Changeset 7022


Timestamp:
Feb 6, 2023, 2:45:22 PM
Author:
Nicklas Nordborg
Message:

References #1453: Make the release exporter multi-threaded

Use 1/3 of the available CPUs for the export. The ReleaseExporter will start multiple threads, so it is important that all *Writer implementations are thread-safe.
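
For context, a minimal standalone sketch of the thread-count rule described above (the actual code in ReleaseExporter.exportCohortData() below also passes a custom ExporterThreadFactory to the pool):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadCountDemo
    {
      public static void main(String[] args)
      {
        // Use 1/3 of the available CPUs, but always at least one thread
        int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 3);
        System.out.println("Exporter threads: " + numThreads);

        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        executor.shutdown();
      }
    }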

Location:
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie
Files:
7 edited

  • extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/converter/DaysSinceRefDateConverter.java

    r4570 → r7022

      private static long MILLIS_IN_A_DAY = 24 * MILLIS_IN_AN_HOUR;

    - private Long refTime;
    + // Need ThreadLocal so a single instance can handle
    + // multiple threads at the same time
    + private final ThreadLocal<Long> refTime;

      public DaysSinceRefDateConverter()
    - {}
    + {
    +   this.refTime = new ThreadLocal<>();
    + }

      /*
    ...
      public Integer convert(Date value)
      {
    -   return value == null || refTime == null ? null : (int)((value.getTime() - refTime) / MILLIS_IN_A_DAY);
    +   Long refT = refTime.get();
    +   return value == null || refT == null ? null : (int)((value.getTime() - refT) / MILLIS_IN_A_DAY);
      }
      // ----------------------------------
    ...
        // The calculations rely on rounding down to return the correct result but
        // it is important that no time part is present in the date values
    -   this.refTime = refDate == null ? null : refDate.getTime() - MILLIS_IN_AN_HOUR;
    +   this.refTime.set(refDate == null ? null : refDate.getTime() - MILLIS_IN_AN_HOUR);
      }
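
The change above replaces a plain Long field with a ThreadLocal<Long> so that one shared converter instance can carry a different reference time per thread. A minimal standalone sketch of that pattern (class and variable names here are illustrative, not from the changeset):

    public class ThreadLocalDemo
    {
      // Each thread gets its own slot; the instance itself can be shared
      private static final ThreadLocal<Long> refTime = new ThreadLocal<>();

      public static void main(String[] args) throws InterruptedException
      {
        Runnable task = () -> {
          refTime.set(System.nanoTime());
          // get() returns the value set by this thread, never another thread's
          System.out.println(Thread.currentThread().getName() + ": " + refTime.get());
        };
        Thread t1 = new Thread(task, "exporter-1");
        Thread t2 = new Thread(task, "exporter-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
      }
    }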
  • extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/CohortItem.java

    r7010 → r7022

        @since 4.21
      */
    - public CohortItem(ReleaseExporter exporter, DbControl dc, SnapshotManager manager, QueryManager queryManager, int biosourceId, ReleaseWriterOptions options)
    + public CohortItem(ReleaseExporter exporter, DbControl dc, SnapshotManager manager, QueryManager queryManager, int biosourceId)
      {
        this.exporter = exporter;
  • extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/CohortWriter.java

    r7010 → r7022

      Subclasses must implement the {@link #writeCohortData(CohortItem)}
      method and make sure the header line is written first.
    +
    + Implementing classes need to be thread-safe since they are
    + used with multiple exporter threads at the same time.
    + See {@link ReleaseExporter#exportCohortData()}.

      @author nicklas
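
To illustrate the new thread-safety requirement in the javadoc above, one way a writer can satisfy it is to keep per-item work in local variables and guard any shared mutable state with a lock. A hypothetical sketch, not an actual CohortWriter subclass from the changeset:

    import java.util.ArrayList;
    import java.util.List;

    public class ExampleThreadSafeWriter // hypothetical
    {
      // Shared mutable state: every access is guarded by the same lock
      private final List<String> headerAndLines = new ArrayList<>();

      public void writeCohortData(String item) // simplified stand-in signature
      {
        String line = "data for " + item; // per-item work uses locals only
        synchronized (this)
        {
          if (headerAndLines.isEmpty()) headerAndLines.add("header");
          headerAndLines.add(line);
        }
      }
    }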
  • extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/IncaWriter.java

    r5415 → r7022

      private final Annotationtype[] incaTypes;
    - private DaysSinceRefDateConverter daysSinceRefDate;
    - private ValueConverter<?, ?>[] incaConverters;
    + private final DaysSinceRefDateConverter daysSinceRefDate;
    + private final ValueConverter<?, ?>[] incaConverters;

    + @SuppressWarnings("rawtypes")
      public IncaWriter(DbControl dc, ReleaseWriterOptions options)
      {
    ...
          index++;
        }
    +
    +   incaConverters = new ValueConverter[incaTypes.length];
    +   daysSinceRefDate = new DaysSinceRefDateConverter();
      }
    ...
      @Override
    - @SuppressWarnings("rawtypes")
      public List<CohortTypeDef> getTypeDefsInJSON()
      {
        DbControl dc = getDbControl();
        CohortTypeDefFactory incaFactory = new CohortTypeDefFactory(dc, Item.SAMPLE, "Case");
    -   incaConverters = new ValueConverter[incaTypes.length];
    -   daysSinceRefDate = new DaysSinceRefDateConverter();
        Unit days = UnitUtil.getUnit(dc, Quantity.TIME, "d");
  • extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/JsonWriter.java

    r5320 → r7022

      public void writeJsonData(CohortItem item)
      {
    +   // This method may be called by multiple threads
    +   // The CohortWriter implementations are assumed to be thread-safe
        JSONArray json = new FilteredJSONArray(new NotNullFilter<>(false));
        for (CohortWriter cw : itemWriters)
    ...
        }

    +   // This part need to be synchronized since the OutputLocation
    +   // implementation may not be thread-safe
    +   synchronized (this)
    +   {
          String itemName = item.getName();
          ExportOutputStream out = null;
          try
          {
            String file = "/json/"+itemName+".json";
            jsonFiles.add(file);
            out = location.getOutputStream(file, false);
            out.setCharacterSet("UTF-8");
            out.setMimeType("application/json");

            Writer writer = new OutputStreamWriter(out, Charset.forName("UTF-8"));
            json.writeJSONString(writer);
            writer.flush();
            out.flush();
            writer.close();
          }
          catch (IOException ex)
          {
            throw new RuntimeException(ex);
          }
          finally
          {
            FileUtil.close(out);
          }
    +   }
      }

    (The body inside the new synchronized block is unchanged apart from indentation.)
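
The shape of the change above: JSON generation can run concurrently, while the write to the shared OutputLocation (which may not be thread-safe) happens under a lock. A self-contained sketch with hypothetical names:

    public class SynchronizedOutputDemo // hypothetical
    {
      // Stand-in for the shared, possibly non-thread-safe output location
      private final StringBuilder output = new StringBuilder();

      public void export(String itemName, String json)
      {
        // Building the payload is thread-local work and needs no lock
        String entry = itemName + ": " + json + "\n";
        synchronized (this)
        {
          // Only one thread at a time touches the shared destination
          output.append(entry);
        }
      }
    }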
  • extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/ReleaseExporter.java

    r7007 → r7022

      import java.util.List;
      import java.util.Map;
    + import java.util.Queue;
      import java.util.Set;
      import java.util.TreeMap;
    + import java.util.concurrent.ConcurrentLinkedQueue;
    + import java.util.concurrent.ExecutorService;
    + import java.util.concurrent.Executors;
    + import java.util.concurrent.ThreadFactory;
    + import java.util.concurrent.TimeUnit;

      import net.sf.basedb.core.Application;
    ...
      import net.sf.basedb.core.ItemQuery;
      import net.sf.basedb.core.ProgressReporter;
    + import net.sf.basedb.core.SessionControl;
      import net.sf.basedb.core.SimpleProgressReporter;
      import net.sf.basedb.core.plugin.ExportOutputStream;
      import net.sf.basedb.core.query.Hql;
      import net.sf.basedb.core.query.Orders;
    - import net.sf.basedb.core.signal.ThreadSignalHandler;
    + import net.sf.basedb.core.signal.SignalException;
      import net.sf.basedb.core.snapshot.SnapshotManager;
      import net.sf.basedb.reggie.Reggie;
    ...
      private OutputLocation outputLocation;

    - private ReleaseWriterOptions options;
    + private final ReleaseWriterOptions options;

      private ItemList list;
    ...
      private void exportCohortData(DbControl dc, QueryManager queryManager, List<BioSource> biosources, ProgressReporter progress)
      {
    -   progress.display(15, "Exporting cohort data for " + biosources.size() + " biosources...");
    -
    +   // Use 1/3 of the available processors
    +   int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors()/3);
        // Counters used for progress reporting
        int totalCount = biosources.size();
    +   progress.display(15, "Exporting cohort data for " + totalCount + " biosources ("+numThreads+" threads)...");
    +
    +   ScriptWriter scriptWriter = null;
    +   if (options.exportFileSyncScripts())
    +   {
    +     scriptWriter = new ScriptWriter(dc, outputLocation, options);
    +   }
    +
    +   JsonWriter jsonWriter = new JsonWriter(dc, outputLocation, options);
    +   jsonWriter.registerItemWriter(new PatientWriter(dc, options));
    +   jsonWriter.registerItemWriter(new RetractWriter(dc, options));
    +   jsonWriter.registerItemWriter(new NotAskedWriter(dc, options));
    +   jsonWriter.registerItemWriter(new NoWriter(dc, options));
    +   jsonWriter.registerItemWriter(new BloodWriter(dc, options));
    +   jsonWriter.registerItemWriter(new BloodDnaWriter(dc, options));
    +   jsonWriter.registerItemWriter(new CaseWriter(dc, options));
    +   jsonWriter.registerItemWriter(new IncaWriter(dc, options));
    +   jsonWriter.registerItemWriter(new SpecimenWriter(dc, options, outputLocation, scriptWriter));
    +   jsonWriter.registerItemWriter(new NoSpecimenWriter(dc, options));
    +   jsonWriter.registerItemWriter(new LysateWriter(dc, options));
    +   jsonWriter.registerItemWriter(new RnaWriter(dc, options));
    +   jsonWriter.registerItemWriter(new DnaWriter(dc, options));
    +   jsonWriter.registerItemWriter(new FlowThroughWriter(dc, options));
    +   jsonWriter.registerItemWriter(new LibraryWriter(dc, options));
    +   jsonWriter.registerItemWriter(new MergedWriter(dc, options, scriptWriter));
    +   jsonWriter.registerItemWriter(new AlignedWriter(dc, options, scriptWriter));
    +   jsonWriter.registerItemWriter(new StringTieWriter(dc, options, scriptWriter));
    +   jsonWriter.registerItemWriter(new CufflinksWriter(dc, options, scriptWriter));
    +   jsonWriter.registerItemWriter(new VariantCallingWriter(dc, options, scriptWriter));
    +   jsonWriter.registerItemWriter(new MethylationWriter(dc, options, scriptWriter));
    +   jsonWriter.registerItemWriter(new OncoarrayWriter(dc, options, scriptWriter));
    +
    +   jsonWriter.writeIndex(list, biosources);
    +   jsonWriter.writeTypeDefs();
    +
    +   Queue<BioSource> bioSourceQueue = new ConcurrentLinkedQueue<>(biosources);
    +
    +   CallableExporter[] exporters = new CallableExporter[numThreads];
    +   ExecutorService executor = Executors.newFixedThreadPool(numThreads, new ExporterThreadFactory());
    +
        int count = 0;
    -
    -   DbControl readerDc = null;
    -   SnapshotManager readerManager = null;
        try
        {
    -     ScriptWriter scriptWriter = null;
    -     if (options.exportFileSyncScripts())
    -     {
    -       scriptWriter = new ScriptWriter(dc, outputLocation, options);
    -     }
    -
    -     JsonWriter jsonWriter = new JsonWriter(dc, outputLocation, options);
    -     ... (the same block of registerItemWriter calls, writeIndex() and
    -          writeTypeDefs() shown above, previously inside the try block)
    -
    -     readerDc = dc.getSessionControl().newDbControl(dc.getName());
    -     readerManager = new SnapshotManager();
    -     for (BioSource bs : biosources)
    -     {
    -       count++;
    -       if (count % 10 == 0)
    -       {
    -         ThreadSignalHandler.checkInterrupted();
    -         if (count % 100 == 0)
    -         {
    -           // Rollback and create new DbControl and snapshot
    -           // manager to allow GC to reclaim memory
    -           readerDc.close();
    -           readerDc = dc.getSessionControl().newDbControl(dc.getName());
    -           readerManager = new SnapshotManager();
    -         }
    -
    -         progress.display(15 + ((80*count)/totalCount),
    -           "Exporting cohort data (" + count + " of " + totalCount + ")...");
    -       }
    -
    -       CohortItem item = new CohortItem(this, readerDc, readerManager, queryManager, bs.getId(), options);
    -       jsonWriter.writeJsonData(item);
    -     }
    -
    -     jsonWriter.writeFiles();
    -     jsonWriter.writeBatchIndexLookupFiles(batchIndexProxies);
    -     if (scriptWriter != null) scriptWriter.writeScripts();
    -
    -     progress.display(98, "Exported cohort data (" + count + " items)");
    +     // Start the exporter threads
    +     for (int threadNo = 0; threadNo < numThreads; threadNo++)
    +     {
    +       CallableExporter exp = new CallableExporter(this, dc, jsonWriter, bioSourceQueue);
    +       exporters[threadNo] = exp;
    +       executor.execute(exp);
    +     }
    +
    +     // Wait in this loop until all biosources have been processed
    +     while (count < totalCount)
    +     {
    +       try
    +       {
    +         Thread.sleep(2000);
    +       }
    +       catch (InterruptedException ex)
    +       {
    +         throw new SignalException("Aborted by user");
    +       }
    +       // Important loop for progress reporting AND for detecting if one of
    +       // the exporter threads have failed (getCount() throws an exception)
    +       count = 0;
    +       for (CallableExporter exp : exporters)
    +       {
    +         count += exp.getCount();
    +       }
    +       progress.display(15 + ((80*count)/totalCount),
    +         "Exporting cohort data ("+numThreads+" threads; " + count + " of " + totalCount + ")...");
    +     }
        }
        finally
        {
    -     if (readerDc != null) readerDc.close();
    +     try
    +     {
    +       // This will send an interrupt to all running threads
    +       // There should be none if everything went ok, but if one
    +       // thread fails, getCount() will throw an exception and we
    +       // will ask the other threads to abort. We give them 10 seconds...
    +       executor.shutdownNow();
    +       executor.awaitTermination(10, TimeUnit.SECONDS);
    +     }
    +     catch (InterruptedException ex)
    +     {}
        }
    +
    +   jsonWriter.writeFiles();
    +   jsonWriter.writeBatchIndexLookupFiles(batchIndexProxies);
    +   if (scriptWriter != null) scriptWriter.writeScripts();
    +
    +   progress.display(98, "Exported cohort data (" + count + " items)");
      }
    ...
        @param batchKey Typically a date identifying the "batch"
      */
    - public String getBatchIndexProxy(String batchName, String batchKey)
    + public synchronized String getBatchIndexProxy(String batchName, String batchKey)
      {
        Map<String, String> batch = batchIndexProxies.get(batchName);
    ...
      }

    + /**
    +   Implementation for multi-threading support. Any number of instances can be created with
    +   the same parameters and started at the same time. Each instance will take biosource items
    +   one by one from the queue and will continue processing as long as there are items left.
    +
    +   The main code should call getCount() at regular intervals to get information about the
    +   progress. This will also check if there has been an error during the processing and
    +   re-throw the same error.
    +
    +   @since 4.44
    + */
    + public static class CallableExporter
    +   implements Runnable
    + {
    +   private final ReleaseExporter exporter;
    +   private final DbControl writerDc;
    +   private final JsonWriter jsonWriter;
    +   private final Queue<BioSource> biosources;
    +
    +   private volatile int count;
    +   private volatile RuntimeException error;
    +
    +   CallableExporter(ReleaseExporter exporter, DbControl writerDc, JsonWriter jsonWriter, Queue<BioSource> biosources)
    +   {
    +     this.exporter = exporter;
    +     this.writerDc = writerDc;
    +     this.jsonWriter = jsonWriter;
    +     this.biosources = biosources;
    +   }
    +
    +   /**
    +     Get the number of biosources that have been processed so far by this
    +     instance. If there has been an error during the processing, this
    +     method will re-throw that error.
    +   */
    +   public int getCount()
    +   {
    +     if (error != null) throw error;
    +     return count;
    +   }
    +
    +   @Override
    +   public void run()
    +   {
    +     DbControl readerDc = null;
    +     SnapshotManager manager = null;
    +     QueryManager queryManager = null;
    +
    +     try
    +     {
    +       SessionControl sc = writerDc.getSessionControl();
    +       readerDc = sc.newDbControl(writerDc.getName());
    +       manager = new SnapshotManager();
    +       queryManager = new QueryManager(readerDc);
    +
    +       do
    +       {
    +         BioSource bs = biosources.poll();
    +         // Exit if there are no more biosources or
    +         // if this thread has been interrupted (eg. due to
    +         // aborting or an error in another thread)
    +         if (bs == null || Thread.interrupted()) break;
    +
    +         if (count % 100 == 0)
    +         {
    +           // Rollback and create new DbControl and snapshot
    +           // manager to allow GC to reclaim memory
    +           readerDc.close();
    +           readerDc = sc.newDbControl(writerDc.getName());
    +           manager = new SnapshotManager();
    +           queryManager = new QueryManager(readerDc);
    +         }
    +
    +         CohortItem item = new CohortItem(exporter, readerDc, manager, queryManager, bs.getId());
    +         jsonWriter.writeJsonData(item);
    +         count++;
    +       } while (true);
    +     }
    +     catch (RuntimeException ex)
    +     {
    +       error = ex;
    +     }
    +     finally
    +     {
    +       if (readerDc != null) readerDc.close();
    +     }
    +   }
    + }
    +
    + /**
    +   Thread factory for exporter threads.
    + */
    + public static class ExporterThreadFactory
    +   implements ThreadFactory
    + {
    +   private final ThreadGroup grp;
    +   private int nThreads;
    +
    +   public ExporterThreadFactory()
    +   {
    +     this.grp = new ThreadGroup("ReleaseExporterThread");
    +   }
    +
    +   @Override
    +   public Thread newThread(Runnable r)
    +   {
    +     nThreads++;
    +     return new Thread(grp, r, grp.getName()+"."+nThreads);
    +   }
    + }
    }
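
The core of the new CallableExporter is a work-queue pattern: workers poll a shared ConcurrentLinkedQueue until it is empty, while the main thread sums the per-worker counters. A runnable standalone sketch of just the queue/pool mechanics (names illustrative, not from the changeset):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class WorkQueueDemo
    {
      public static void main(String[] args) throws InterruptedException
      {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 20; i++) queue.add(i);

        int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 3);
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (int t = 0; t < numThreads; t++)
        {
          executor.execute(() ->
          {
            Integer item;
            // poll() returns null when the queue is empty, so the worker exits
            while ((item = queue.poll()) != null)
            {
              System.out.println(Thread.currentThread().getName() + " processed " + item);
            }
          });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
      }
    }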
  • extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/ScriptWriter.java

    r5196 → r7022

        Add a directory that should be created as part of this release.
      */
    - public void addMkDir(String path)
    + public synchronized void addMkDir(String path)
      {
        if (path != null)
    ...
        @param jsonFiles
      */
    - public void addFiles(String releasePath, String sourcePath, String fileList, JSONArray jsonFiles)
    + public synchronized void addFiles(String releasePath, String sourcePath, String fileList, JSONArray jsonFiles)
      {
        addMkDir(releasePath);
    ...
        @since 4.21
      */
    - public void addLinkToExistingFile(String releasePath, String filename, String releasedVersion)
    + public synchronized void addLinkToExistingFile(String releasePath, String filename, String releasedVersion)
      {
        addMkDir(releasePath);