Changeset 5891


Ignore:
Timestamp:
Nov 28, 2011, 8:33:40 AM (10 years ago)
Author:
Nicklas Nordborg
Message:

References #1630: Migrate from MySQL to PostgreSQL

Using a separate thread for gzip compression/decompression. Hopefully this should make the export/import a bit faster on multi-processor machines.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/core/net/sf/basedb/core/Migration.java

    r5889 r5891  
    3131import java.io.OutputStream;
    3232import java.io.OutputStreamWriter;
     33import java.io.PipedInputStream;
     34import java.io.PipedOutputStream;
    3335import java.io.StringWriter;
    3436import java.io.Writer;
     
    6264import net.sf.basedb.core.dbengine.TableInfo.PrimaryKeyInfo;
    6365import net.sf.basedb.core.hibernate.JdbcWork;
     66import net.sf.basedb.util.FileCopyRunnable;
    6467import net.sf.basedb.util.FileUtil;
    6568import net.sf.basedb.util.MD5;
     
    300303    // Load information about all tables in the static and dynamic databases
    301304    progress.display(0, "Counting tables...\n");
    302     final int TICK_INTERVAL = 1000;
     305    final int TICK_INTERVAL = 2000;
    303306    long nextTick = System.currentTimeMillis() + TICK_INTERVAL;
    304307    List<Table> staticTables = TableInfo.listTables(metaData, staticCatalog, staticSchema);
     
    344347    long actualExported = 0;
    345348    Set<Integer> usedSqlTypes = new HashSet<Integer>();
     349    Thread compressThread = null;
    346350    for (TableInfo table : tables)
    347351    {
     
    386390      writer.close();
    387391
     392      // Execute query
     393      ResultSet rows = st.executeQuery(select.toString());
     394     
     395      // Wait for previous output file to be completely written
     396      if (compressThread != null && compressThread.isAlive())
     397      {
     398        try
     399        {
     400          compressThread.join();
     401        }
     402        catch (InterruptedException ex)
     403        {}
     404      }
     405     
    388406      // Create output file
    389407      OutputStream out = getOutputStream(exportFile);
     408     
     409      // Setup streams and threading if using compression
     410      if (exportCompressed)
     411      {
     412        PipedInputStream pipeIn = new PipedInputStream(1024*1024);
     413        compressThread = new Thread(new FileCopyRunnable(pipeIn, out));
     414        out = new PipedOutputStream(pipeIn);
     415        compressThread.start();
     416      }
     417     
    390418      writer = new BufferedWriter(new OutputStreamWriter(out, "UTF-8"));
    391 
    392       ResultSet rows = st.executeQuery(select.toString());
    393419      int actualRows = 0;
    394420      while (rows.next())
     
    431457      // Done with this table...
    432458      progress.display((int)((actualExported * 100) / approxTotal),
    433           tableName + ": complete (" + actualRows + " rows)                   \n");
     459          tableName + ": complete (" + actualRows + " rows)                      \n");
    434460      writer.flush();
    435461      writer.close();
    436       out.flush();
    437       out.close();
     462    }
     463   
     464    // Wait for compress thread to finish
     465    if (compressThread != null && compressThread.isAlive())
     466    {
     467      try
     468      {
     469        compressThread.join();
     470      }
     471      catch (InterruptedException ex)
     472      {}     
    438473    }
    439474    progress.display(100, "Export complete, total row count: " + actualExported + "\n");
     
    918953    copy.append(" WITH ENCODING 'UTF8'");
    919954
     955    // Setup streams
     956    InputStream in = getInputStream(dataFile);
     957   
     958    // Use separate thread for decompression
     959    if (dataFile.getName().endsWith(".gz"))
     960    {
     961      PipedOutputStream out = new PipedOutputStream();
     962      Thread decompressThread = new Thread(new FileCopyRunnable(in, out));
     963      in = new PipedInputStream(out, 1024*1024);
     964      decompressThread.start();
     965    }
     966   
    920967    // Import the data
    921     ProgressInputStream in = new ProgressInputStream(getInputStream(dataFile), progress);
     968    ProgressInputStream pin = new ProgressInputStream(in, progress);
    922969    try
    923970    {
    924971      CopyManager copyManager = new CopyManager((Jdbc4Connection)st.getConnection());
    925       long rows = copyManager.copyIn(copy.toString(), in);
    926       in.append("; "+rows + " rows imported");
     972      long rows = copyManager.copyIn(copy.toString(), pin);
     973      pin.append("; "+rows + " rows imported");
    927974    }
    928975    finally
    929976    {
    930       FileUtil.close(in);
     977      FileUtil.close(pin);
    931978    }
    932979    //st.executeUpdate(copy.toString());
     
    9851032    private long numLines;
    9861033    private long nextTick;
    987     private long TICK_INTERVAL = 1000;
     1034    private long TICK_INTERVAL = 2000;
    9881035    private String lastMsg;
    9891036   
Note: See TracChangeset for help on using the changeset viewer.