Changeset 5896


Ignore:
Timestamp:
Dec 1, 2011, 4:14:56 PM (10 years ago)
Author:
Nicklas Nordborg
Message:

References #1630: Migrate from MySQL to PostgreSQL

Refactored the migration exporter to use a specialized implementation for each column type, which allows more exact calls for reading data from the database (e.g. getInt() instead of getObject()). String escaping has been made more efficient by using lookup arrays instead of regular expressions. Initial tests indicate a performance gain of 20-25%.

A side effect is that it should not be too difficult to make the exporter also generate data files that can be imported into MySQL; e.g. we would only need a few different implementations that format the data a bit differently. The importer may require more work though, so this is not something we are going to do right now.

Progress reporting is now weighted by the number of columns in each table. This seems to better reflect the actual time it takes. The reporter and raw data tables usually take much longer per row since they have more columns than other tables.

Location:
trunk/src/core/net/sf/basedb/core
Files:
16 added
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/core/net/sf/basedb/core/Migration.java

    r5894 r5896  
    4040import java.sql.SQLException;
    4141import java.sql.Statement;
    42 import java.sql.Types;
     42import java.text.DecimalFormat;
     43import java.text.DecimalFormatSymbols;
    4344import java.util.ArrayList;
    44 import java.util.Date;
    4545import java.util.HashMap;
    4646import java.util.HashSet;
     
    6464import net.sf.basedb.core.dbengine.TableInfo.PrimaryKeyInfo;
    6565import net.sf.basedb.core.hibernate.JdbcWork;
     66import net.sf.basedb.core.migrate.ResultWriter;
     67import net.sf.basedb.core.migrate.ResultWriterFactory;
     68import net.sf.basedb.core.migrate.postgres.PostgresResultWriterFactory;
    6669import net.sf.basedb.util.FileCopyRunnable;
    6770import net.sf.basedb.util.FileUtil;
     
    6972import net.sf.basedb.util.RegexpFileFilter;
    7073import net.sf.basedb.util.Values;
    71 import net.sf.basedb.util.formatter.DateFormatter;
    7274
    7375import org.hibernate.Query;
     
    187189  private final Session session;
    188190  private final ProgressReporter progress;
    189   private final DateFormatter dateFormat;
    190   private final DateFormatter dateTimeFormat;
     191  private final DecimalFormat numRowsFormat;
    191192 
    192193  private boolean exportCompressed;
     
    202203    this.session = session;
    203204    this.progress = progress;
    204     this.dateFormat = new DateFormatter("yyyy-MM-dd");
    205     this.dateTimeFormat = new DateFormatter("yyyy-MM-dd HH:mm:ss");
     205    // This formatter is used for progress reporting
     206    DecimalFormatSymbols symbols = new DecimalFormatSymbols();
     207    symbols.setGroupingSeparator(',');
     208    this.numRowsFormat = new DecimalFormat("#,###",symbols);
    206209    this.exportCompressed = false;
    207210    this.fetchSize = 20000;
     
    312315    DatabaseMetaData metaData = connection.getMetaData();
    313316    DbEngine engine = HibernateUtil.getDbEngine();
    314    
     317    ResultWriterFactory rwFactory = new PostgresResultWriterFactory();
     318
    315319    String staticCatalog = null;
    316320    String staticSchema = null;
     
    338342    // and count the number of data rows for each table
    339343    List<TableInfo> tables = new ArrayList<TableInfo>();
    340     long approxTotal = 0;
     344    long approxTotalValues = 0;
     345    long approxTotalRows = 0;
    341346    for (Table table : staticTables)
    342347    {
    343348      TableInfo info = new TableInfo(table, metaData);
    344349      long approxRows = info.getRowCount(engine, connection, true);
    345       approxTotal += approxRows;
     350      approxTotalRows += approxRows;
     351      approxTotalValues += approxRows * info.getColumns().size();
    346352      tables.add(info);
    347353      if (System.currentTimeMillis() > nextTick)
     
    355361      TableInfo info = new TableInfo(table, metaData);
    356362      long approxRows = info.getRowCount(engine, connection, true);
    357       approxTotal += approxRows;
     363      approxTotalRows += approxRows;
     364      approxTotalValues += approxRows * info.getColumns().size();
    358365      tables.add(info);
    359366      if (System.currentTimeMillis() > nextTick || tables.size() == totalTables)
     
    364371    }
    365372   
    366     progress.display(0, "Found approximate " + Values.formatNumber(approxTotal / 1000000f, 2) + " million data rows\n");
     373    progress.display(0, "Found approximate " + Values.formatNumber(approxTotalRows / 1000000f, 2) + " million data rows\n");
    367374   
    368375    // Export data for each table...
    369     long actualExported = 0;
     376    long actualExportedValues = 0;
     377    long actualExportedRows = 0;
    370378    Set<Integer> usedSqlTypes = new HashSet<Integer>();
    371379    Thread compressThread = null;
     
    373381    {
    374382      String tableName = table.getTable().getName();
    375       progress.display((int)((actualExported * 100) / approxTotal),
    376           tableName + ": loading...");
    377       nextTick = System.currentTimeMillis()+TICK_INTERVAL;
    378 
    379383      long approxRows = table.getRowCount(engine, connection, true);
     384      int numColumns = table.getColumns().size();
     385
    380386      File exportFile = getDataFile(table, exportCompressed);
    381387      if (exportFile.exists())
    382388      {
    383389        // Skip this table and move on to the next
    384         actualExported += approxRows;
    385         progress.display((int)((actualExported * 100) / approxTotal),
     390        actualExportedValues += approxRows * numColumns;
     391        progress.display((int)((actualExportedValues * 100) / approxTotalValues),
    386392            tableName + ": skipped (file already exists)\n");
    387393        continue; // with next table
    388394      }
    389395     
     396      progress.display((int)((actualExportedValues * 100) / approxTotalValues),
     397          tableName + ": loading (approx. " + numRows(approxRows) + " rows)...");
     398      nextTick = System.currentTimeMillis()+TICK_INTERVAL;
     399
    390400      // Get columns in sorted order
    391401      SortedSet<ColumnInfo> tmp =  new TreeSet<ColumnInfo>(table.getColumns());
     
    439449        compressThread.start();
    440450      }
    441      
    442451      writer = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"));
     452     
     453      // Setup writer factory and factories
     454      ResultWriter[] writers = new ResultWriter[columns.length];
     455      for (int i = 0; i < columns.length; ++i)
     456      {
     457        writers[i] = rwFactory.create(rows, i+1, writer, columns[i]);
     458      }
     459
    443460      long actualRows = 0;
    444461      while (rows.next())
     
    450467          if (actualRows > approxRows)
    451468          {
    452             approxTotal += actualRows - approxRows;
     469            approxTotalValues += (actualRows - approxRows) * numColumns;
    453470            approxRows = actualRows;
    454471          }
    455           progress.display((int)(((actualExported+actualRows) * 100) / approxTotal),
    456               tableName + ": exporting... (" + actualRows + " of " + approxRows + " rows done)");
     472          progress.display((int)(((actualExportedValues+actualRows*numColumns) * 100) / approxTotalValues),
     473              tableName + ": exporting... (" + numRows(actualRows) + " of " + numRows(approxRows) + " rows done)");
    457474          nextTick = System.currentTimeMillis() + TICK_INTERVAL;
    458475        }
     
    463480        {
    464481          if (index > 0) writer.write("\t");
    465           ColumnInfo c = columns[index];
    466           writer.write(formatAndEscape(rows.getObject(++index), c));
     482          writers[index++].write();
    467483        }
    468484        writer.write("\n");
     
    472488      }
    473489     
    474       actualExported += actualRows;
     490      actualExportedValues += actualRows * numColumns;
     491      actualExportedRows += actualRows;
    475492      // Correct for actual vs. approximate mismatch
    476493      if (actualRows != approxRows)
    477494      {
    478495        // Decrease the approximate total row count
    479         approxTotal += actualRows - approxRows;
     496        approxTotalValues += (actualRows - approxRows) * numColumns;
    480497      }
    481498
     
    485502      tmpFile.renameTo(exportFile);
    486503
    487       progress.display((int)((actualExported * 100) / approxTotal),
    488           tableName + ": complete (" + actualRows + " rows)                      \n");
     504      progress.display((int)((actualExportedValues * 100) / approxTotalValues),
     505          tableName + ": complete (" + numRows(actualRows) + " rows)                      \n");
    489506    }
    490507   
     
    499516      {}     
    500517    }
    501     progress.display(100, "Export complete, total row count: " + actualExported + "\n");
     518    progress.display(100, "Export complete, total row count: " + numRows(actualExportedRows) + "\n");
    502519  }
    503520 
     
    740757  }
    741758 
    742   private static String zero = new String(new char[] { 0 });
    743 
    744   /**
    745     Format the value to a string that PostgreSQL accepts in the
    746     data file for the COPY command. Typical rules:
    747    
    748     <ul>
    749     <li>null value --> \N
    750     <li>strings --> must escape \, newline, tab, etc. with \
    751     <li>dates and times --> ISO format as defined by PostgreSQL (eg. 2011-11-07 08:21:14)
    752     <li>boolean --> 'true' or 'false'
    753     <li>binary --> octal escapes using \ for all character codes <32 and >127 and for 92==\
    754     </ul>
    755    
    756     @param value The value
    757     @param column
    758     @return The formatted and escaped string
    759   */
    760   private String formatAndEscape(Object value, ColumnInfo column)
    761   {
    762     String formatted;
    763     int sqlType = column.getSqlType();
    764     if (value == null)
    765     {
    766       formatted = "\\N";
    767     }
    768     else
    769     {
    770       switch (sqlType)
    771       {
    772         case Types.VARCHAR:
    773         case Types.LONGVARCHAR:
    774         {
    775           String s = value.toString();
    776           s = s.replace("\\", "\\\\");
    777           s = s.replace("\n", "\\n");
    778           s = s.replace("\r", "\\r");
    779           s = s.replace("\t", "\\t");
    780           s = s.replace(zero, ""); // a zero-character can't be stored in PostgreSQL so we simply remove it
    781           formatted = s;
    782           break;
    783         }
    784         case Types.INTEGER:
    785         case Types.SMALLINT:
    786         case Types.BIGINT:
    787         {
    788           formatted = value.toString();
    789           break;
    790         }
    791         case Types.REAL:
    792         case Types.DOUBLE:
    793         case Types.FLOAT:
    794         {
    795           formatted = value.toString();
    796           break;
    797         }
    798         case Types.TIMESTAMP:
    799         {
    800           formatted = dateTimeFormat.format((Date)value);
    801           break;
    802         }
    803         case Types.DATE:
    804         {
    805           formatted = dateFormat.format((Date)value);
    806           break;
    807         }
    808         case Types.BIT:
    809         case Types.BOOLEAN:
    810         {
    811           formatted = value.toString();
    812           break;
    813         }
    814         case Types.BINARY:
    815         case Types.VARBINARY:
    816         case Types.LONGVARBINARY:
    817         {
    818           byte[] bytes = (byte[])value;
    819           StringBuilder sb = new StringBuilder();
    820           for (int i = 0; i < bytes.length; ++i)
    821           {
    822             int b = bytes[i] & 0xff;
    823             if (b < 32 || b > 127)
    824             {
    825               sb.append("\\\\");
    826               String octal = Integer.toOctalString(b);
    827               if (octal.length() == 1)
    828               {
    829                 sb.append("00");
    830               }
    831               else if (octal.length() == 2)
    832               {
    833                 sb.append("0");
    834               }
    835               sb.append(octal);
    836             }
    837             else if (b == 92) // 92 is a '\'
    838             {
    839               sb.append("\\\\\\\\");
    840             }
    841             else
    842             {
    843               sb.append(new String(bytes, i, 1));
    844             }
    845           }
    846           formatted = sb.toString();
    847           break;
    848         }
    849         default:
    850           throw new BaseException("Unhandled SQL column type for column " +
    851               column.getName() + ": " + sqlType);
    852       }
    853     }
    854     return formatted;
     759  private String numRows(long numRows)
     760  {
     761    return numRowsFormat.format(numRows);
    855762  }
    856763 
     
    998905      CopyManager copyManager = new CopyManager((Jdbc4Connection)st.getConnection());
    999906      long rows = copyManager.copyIn(copy.toString(), pin);
    1000       pin.append("; "+rows + " rows imported");
     907      pin.append("; "+numRows(rows) + " rows imported");
    1001908    }
    1002909    finally
     
    1053960
    1054961 
    1055   static class ProgressInputStream
     962  class ProgressInputStream
    1056963    extends FilterInputStream
    1057964  {
     
    1086993      if (numRead > 0)
    1087994      {
    1088         for (int i = offset; i < offset+numRead; ++i)
     995        int end = offset+numRead;
     996        for (int i = offset; i < end; ++i)
    1089997        {
    1090998          if (buffer[i] == '\n') numLines++;
     
    10921000        if (System.currentTimeMillis() > nextTick)
    10931001        {
    1094           append("; " + numLines + " rows...");
     1002          append("; " + numRows(numLines) + " rows...");
    10951003          nextTick = System.currentTimeMillis() + TICK_INTERVAL;
    10961004        }
     
    11101018    }
    11111019  }
    1112  
     1020
    11131021}
    11141022
Note: See TracChangeset for help on using the changeset viewer.