Changeset 7022
- Timestamp:
- Feb 6, 2023, 2:45:22 PM
- Location:
- extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie
- Files:
- 7 edited
Legend:
- Unmodified lines have no marker
- Added lines are marked with +
- Removed lines are marked with -
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/converter/DaysSinceRefDateConverter.java
r4570 → r7022:

      private static long MILLIS_IN_A_DAY = 24 * MILLIS_IN_AN_HOUR;

-     private Long refTime;
+     // Need ThreadLocal so a single instance can handle
+     // multiple threads at the same time
+     private final ThreadLocal<Long> refTime;

      public DaysSinceRefDateConverter()
-     {}
+     {
+        this.refTime = new ThreadLocal<>();
+     }

   ...

      public Integer convert(Date value)
      {
-        return value == null || refTime == null ? null : (int)((value.getTime() - refTime) / MILLIS_IN_A_DAY);
+        Long refT = refTime.get();
+        return value == null || refT == null ? null : (int)((value.getTime() - refT) / MILLIS_IN_A_DAY);
      }

   ...

         // The calculations rely on rounding down to return the correct result but
         // it is important that no time part is present in the date values
-        this.refTime = refDate == null ? null : refDate.getTime() - MILLIS_IN_AN_HOUR;
+        this.refTime.set(refDate == null ? null : refDate.getTime() - MILLIS_IN_AN_HOUR);
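The ThreadLocal is what lets a single converter instance serve several exporter threads: each thread only ever sees the reference time it set itself. As a standalone illustration of the pattern (a hypothetical PerThreadConverter class, not part of Reggie):

   import java.util.Date;

   public class PerThreadConverter
   {
      private static final long MILLIS_IN_A_DAY = 24L * 60 * 60 * 1000;

      // One slot per thread; reads and writes need no synchronization
      private final ThreadLocal<Long> refTime = new ThreadLocal<>();

      public void setRefDate(Date refDate)
      {
         refTime.set(refDate == null ? null : refDate.getTime());
      }

      public Integer convert(Date value)
      {
         Long refT = refTime.get(); // never observes another thread's value
         return value == null || refT == null
            ? null
            : (int)((value.getTime() - refT) / MILLIS_IN_A_DAY);
      }

      public static void main(String[] args) throws InterruptedException
      {
         PerThreadConverter shared = new PerThreadConverter();
         Runnable worker = () ->
         {
            Date ref = new Date();
            shared.setRefDate(ref);
            // 10 days after this thread's own reference date
            Date later = new Date(ref.getTime() + 10 * MILLIS_IN_A_DAY);
            System.out.println(Thread.currentThread().getName() + ": " + shared.convert(later));
         };
         Thread t1 = new Thread(worker);
         Thread t2 = new Thread(worker);
         t1.start(); t2.start();
         t1.join(); t2.join();
      }
   }

One caveat worth noting: a ThreadLocal value lives as long as its thread, so code running in pooled threads may want to call refTime.remove() once a task is done.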
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/CohortItem.java
r7010 → r7022:

         @since 4.21
      */
-     public CohortItem(ReleaseExporter exporter, DbControl dc, SnapshotManager manager, QueryManager queryManager, int biosourceId, ReleaseWriterOptions options)
+     public CohortItem(ReleaseExporter exporter, DbControl dc, SnapshotManager manager, QueryManager queryManager, int biosourceId)
      {
         this.exporter = exporter;
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/CohortWriter.java
r7010 → r7022:

      Subclasses must implement the {@link #writeCohortData(CohortItem)}
      method and make sure the header line is written first.
+
+     Implementing classes need to be thread-safe since they are
+     used with multiple exporter threads at the same time.
+     See {@link ReleaseExporter#exportCohortData()}.

      @author nicklas
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/IncaWriter.java
r5415 → r7022:

      private final Annotationtype[] incaTypes;
-     private DaysSinceRefDateConverter daysSinceRefDate;
-     private ValueConverter<?, ?>[] incaConverters;
+     private final DaysSinceRefDateConverter daysSinceRefDate;
+     private final ValueConverter<?, ?>[] incaConverters;

+     @SuppressWarnings("rawtypes")
      public IncaWriter(DbControl dc, ReleaseWriterOptions options)
      {
   ...
            index++;
         }
+
+        incaConverters = new ValueConverter[incaTypes.length];
+        daysSinceRefDate = new DaysSinceRefDateConverter();
      }
   ...
      @Override
-     @SuppressWarnings("rawtypes")
      public List<CohortTypeDef> getTypeDefsInJSON()
      {
         DbControl dc = getDbControl();
         CohortTypeDefFactory incaFactory = new CohortTypeDefFactory(dc, Item.SAMPLE, "Case");
-        incaConverters = new ValueConverter[incaTypes.length];
-        daysSinceRefDate = new DaysSinceRefDateConverter();
         Unit days = UnitUtil.getUnit(dc, Quantity.TIME, "d");
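The point of this change is initialization order rather than locking: daysSinceRefDate and incaConverters used to be assigned lazily inside getTypeDefsInJSON(), which becomes a race once several threads share one IncaWriter. Moving the assignments into the constructor lets the fields be final, and the Java memory model guarantees that final fields are safely published to every thread that obtains the object after construction. A schematic sketch of the two variants (hypothetical classes, not Reggie code):

   // Lazy assignment: unsafe when the instance is shared between threads
   class UnsafeLazy
   {
      private int[] cache; // may be seen as null, or half-constructed

      int[] get()
      {
         if (cache == null) cache = build(); // race: two threads may both build
         return cache;
      }

      private int[] build() { return new int[] { 1, 2, 3 }; }
   }

   // Constructor assignment: final field => safe publication, no locking needed
   class SafeEager
   {
      private final int[] cache;

      SafeEager() { this.cache = new int[] { 1, 2, 3 }; }

      int[] get() { return cache; }
   }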
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/JsonWriter.java
r5320 → r7022:

      public void writeJsonData(CohortItem item)
      {
+        // This method may be called by multiple threads.
+        // The CohortWriter implementations are assumed to be thread-safe.
         JSONArray json = new FilteredJSONArray(new NotNullFilter<>(false));
         for (CohortWriter cw : itemWriters)
   ...
         }

-        String itemName = item.getName();
-        ExportOutputStream out = null;
-        try
-        {
-           String file = "/json/"+itemName+".json";
-           jsonFiles.add(file);
-           out = location.getOutputStream(file, false);
-           out.setCharacterSet("UTF-8");
-           out.setMimeType("application/json");
-
-           Writer writer = new OutputStreamWriter(out, Charset.forName("UTF-8"));
-           json.writeJSONString(writer);
-           writer.flush();
-           out.flush();
-           writer.close();
-        }
-        catch (IOException ex)
-        {
-           throw new RuntimeException(ex);
-        }
-        finally
-        {
-           FileUtil.close(out);
+        // This part needs to be synchronized since the OutputLocation
+        // implementation may not be thread-safe
+        synchronized (this)
+        {
+           String itemName = item.getName();
+           ExportOutputStream out = null;
+           try
+           {
+              String file = "/json/"+itemName+".json";
+              jsonFiles.add(file);
+              out = location.getOutputStream(file, false);
+              out.setCharacterSet("UTF-8");
+              out.setMimeType("application/json");
+
+              Writer writer = new OutputStreamWriter(out, Charset.forName("UTF-8"));
+              json.writeJSONString(writer);
+              writer.flush();
+              out.flush();
+              writer.close();
+           }
+           catch (IOException ex)
+           {
+              throw new RuntimeException(ex);
+           }
+           finally
+           {
+              FileUtil.close(out);
+           }
         }
      }
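The restructuring keeps the expensive part of writeJsonData() (running all registered CohortWriter instances) fully parallel and serializes only the section that touches the shared OutputLocation. A minimal sketch of that shape (a hypothetical SerializedSink class, not the Reggie API):

   public class SerializedSink
   {
      private final StringBuilder sink = new StringBuilder(); // not thread-safe by itself

      public void write(String itemName, String payload)
      {
         // Parallel part: anything expensive that only uses local state
         String record = itemName + ": " + payload + "\n";

         // Serialized part: one thread at a time may touch the shared sink
         synchronized (this)
         {
            sink.append(record);
         }
      }
   }

Threads only contend for the short append step, so as long as assembling the payload dominates, throughput scales with the number of exporter threads.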
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/ReleaseExporter.java
r7007 → r7022:

   import java.util.List;
   import java.util.Map;
+  import java.util.Queue;
   import java.util.Set;
   import java.util.TreeMap;
+  import java.util.concurrent.ConcurrentLinkedQueue;
+  import java.util.concurrent.ExecutorService;
+  import java.util.concurrent.Executors;
+  import java.util.concurrent.ThreadFactory;
+  import java.util.concurrent.TimeUnit;

   import net.sf.basedb.core.Application;
...
   import net.sf.basedb.core.ItemQuery;
   import net.sf.basedb.core.ProgressReporter;
+  import net.sf.basedb.core.SessionControl;
   import net.sf.basedb.core.SimpleProgressReporter;
   import net.sf.basedb.core.plugin.ExportOutputStream;
   import net.sf.basedb.core.query.Hql;
   import net.sf.basedb.core.query.Orders;
-  import net.sf.basedb.core.signal.ThreadSignalHandler;
+  import net.sf.basedb.core.signal.SignalException;
   import net.sf.basedb.core.snapshot.SnapshotManager;
   import net.sf.basedb.reggie.Reggie;
...
   private OutputLocation outputLocation;

-  private ReleaseWriterOptions options;
+  private final ReleaseWriterOptions options;

   private ItemList list;
...
   private void exportCohortData(DbControl dc, QueryManager queryManager, List<BioSource> biosources, ProgressReporter progress)
   {
-     progress.display(15, "Exporting cohort data for " + biosources.size() + " biosources...");
-
-     // Counters used for progress reporting
-     int totalCount = biosources.size();
-     int count = 0;
-
-     DbControl readerDc = null;
-     SnapshotManager readerManager = null;
-     try
-     {
-        ScriptWriter scriptWriter = null;
-        if (options.exportFileSyncScripts())
-        {
-           scriptWriter = new ScriptWriter(dc, outputLocation, options);
-        }
-
-        JsonWriter jsonWriter = new JsonWriter(dc, outputLocation, options);
-        jsonWriter.registerItemWriter(new PatientWriter(dc, options));
-        ... (the remaining 21 registerItemWriter calls, identical to the ones added below) ...
-
-        jsonWriter.writeIndex(list, biosources);
-        jsonWriter.writeTypeDefs();
-
-        readerDc = dc.getSessionControl().newDbControl(dc.getName());
-        readerManager = new SnapshotManager();
-        for (BioSource bs : biosources)
-        {
-           count++;
-           if (count % 10 == 0)
-           {
-              ThreadSignalHandler.checkInterrupted();
-              if (count % 100 == 0)
-              {
-                 // Rollback and create new DbControl and snapshot
-                 // manager to allow GC to reclaim memory
-                 readerDc.close();
-                 readerDc = dc.getSessionControl().newDbControl(dc.getName());
-                 readerManager = new SnapshotManager();
-              }
-
-              progress.display(15 + ((80*count)/totalCount),
-                 "Exporting cohort data (" + count + " of " + totalCount + ")...");
-           }
-
-           CohortItem item = new CohortItem(this, readerDc, readerManager, queryManager, bs.getId(), options);
-           jsonWriter.writeJsonData(item);
-        }
-
-        jsonWriter.writeFiles();
-        jsonWriter.writeBatchIndexLookupFiles(batchIndexProxies);
-        if (scriptWriter != null) scriptWriter.writeScripts();
-
-        progress.display(98, "Exported cohort data (" + count + " items)");
-     }
-     finally
-     {
-        if (readerDc != null) readerDc.close();
-     }
+     // Use 1/3 of the available processors
+     int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors()/3);
+     // Counters used for progress reporting
+     int totalCount = biosources.size();
+     progress.display(15, "Exporting cohort data for " + totalCount + " biosources ("+numThreads+" threads)...");
+
+     ScriptWriter scriptWriter = null;
+     if (options.exportFileSyncScripts())
+     {
+        scriptWriter = new ScriptWriter(dc, outputLocation, options);
+     }
+
+     JsonWriter jsonWriter = new JsonWriter(dc, outputLocation, options);
+     jsonWriter.registerItemWriter(new PatientWriter(dc, options));
+     jsonWriter.registerItemWriter(new RetractWriter(dc, options));
+     jsonWriter.registerItemWriter(new NotAskedWriter(dc, options));
+     jsonWriter.registerItemWriter(new NoWriter(dc, options));
+     jsonWriter.registerItemWriter(new BloodWriter(dc, options));
+     jsonWriter.registerItemWriter(new BloodDnaWriter(dc, options));
+     jsonWriter.registerItemWriter(new CaseWriter(dc, options));
+     jsonWriter.registerItemWriter(new IncaWriter(dc, options));
+     jsonWriter.registerItemWriter(new SpecimenWriter(dc, options, outputLocation, scriptWriter));
+     jsonWriter.registerItemWriter(new NoSpecimenWriter(dc, options));
+     jsonWriter.registerItemWriter(new LysateWriter(dc, options));
+     jsonWriter.registerItemWriter(new RnaWriter(dc, options));
+     jsonWriter.registerItemWriter(new DnaWriter(dc, options));
+     jsonWriter.registerItemWriter(new FlowThroughWriter(dc, options));
+     jsonWriter.registerItemWriter(new LibraryWriter(dc, options));
+     jsonWriter.registerItemWriter(new MergedWriter(dc, options, scriptWriter));
+     jsonWriter.registerItemWriter(new AlignedWriter(dc, options, scriptWriter));
+     jsonWriter.registerItemWriter(new StringTieWriter(dc, options, scriptWriter));
+     jsonWriter.registerItemWriter(new CufflinksWriter(dc, options, scriptWriter));
+     jsonWriter.registerItemWriter(new VariantCallingWriter(dc, options, scriptWriter));
+     jsonWriter.registerItemWriter(new MethylationWriter(dc, options, scriptWriter));
+     jsonWriter.registerItemWriter(new OncoarrayWriter(dc, options, scriptWriter));
+
+     jsonWriter.writeIndex(list, biosources);
+     jsonWriter.writeTypeDefs();
+
+     Queue<BioSource> bioSourceQueue = new ConcurrentLinkedQueue<>(biosources);
+
+     CallableExporter[] exporters = new CallableExporter[numThreads];
+     ExecutorService executor = Executors.newFixedThreadPool(numThreads, new ExporterThreadFactory());
+
+     int count = 0;
+     try
+     {
+        // Start the exporter threads
+        for (int threadNo = 0; threadNo < numThreads; threadNo++)
+        {
+           CallableExporter exp = new CallableExporter(this, dc, jsonWriter, bioSourceQueue);
+           exporters[threadNo] = exp;
+           executor.execute(exp);
+        }
+
+        // Wait in this loop until all biosources have been processed
+        while (count < totalCount)
+        {
+           try
+           {
+              Thread.sleep(2000);
+           }
+           catch (InterruptedException ex)
+           {
+              throw new SignalException("Aborted by user");
+           }
+           // Important loop for progress reporting AND for detecting if one of
+           // the exporter threads has failed (getCount() throws an exception)
+           count = 0;
+           for (CallableExporter exp : exporters)
+           {
+              count += exp.getCount();
+           }
+           progress.display(15 + ((80*count)/totalCount),
+              "Exporting cohort data ("+numThreads+" threads; " + count + " of " + totalCount + ")...");
+        }
+     }
+     finally
+     {
+        try
+        {
+           // This will send an interrupt to all running threads.
+           // There should be none if everything went ok, but if one
+           // thread fails, getCount() will throw an exception and we
+           // will ask the other threads to abort. We give them 10 seconds...
+           executor.shutdownNow();
+           executor.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException ex)
+        {}
+     }
+
+     jsonWriter.writeFiles();
+     jsonWriter.writeBatchIndexLookupFiles(batchIndexProxies);
+     if (scriptWriter != null) scriptWriter.writeScripts();
+
+     progress.display(98, "Exported cohort data (" + count + " items)");
   }
...
      @param batchKey Typically a date identifying the "batch"
   */
-  public String getBatchIndexProxy(String batchName, String batchKey)
+  public synchronized String getBatchIndexProxy(String batchName, String batchKey)
   {
      Map<String, String> batch = batchIndexProxies.get(batchName);
...
   }

+  /**
+     Implementation for multi-threading support. Any number of instances can be created with
+     the same parameters and started at the same time. Each instance will take biosource items
+     one by one from the queue and will continue processing as long as there are items left.
+
+     The main code should call getCount() at regular intervals to get information about the
+     progress. This will also check if there has been an error during the processing and
+     re-throw the same error.
+
+     @since 4.44
+  */
+  public static class CallableExporter
+     implements Runnable
+  {
+     private final ReleaseExporter exporter;
+     private final DbControl writerDc;
+     private final JsonWriter jsonWriter;
+     private final Queue<BioSource> biosources;
+
+     private volatile int count;
+     private volatile RuntimeException error;
+
+     CallableExporter(ReleaseExporter exporter, DbControl writerDc, JsonWriter jsonWriter, Queue<BioSource> biosources)
+     {
+        this.exporter = exporter;
+        this.writerDc = writerDc;
+        this.jsonWriter = jsonWriter;
+        this.biosources = biosources;
+     }
+
+     /**
+        Get the number of biosources that have been processed so far by this
+        instance. If there has been an error during the processing, this
+        method will re-throw that error.
+     */
+     public int getCount()
+     {
+        if (error != null) throw error;
+        return count;
+     }
+
+     @Override
+     public void run()
+     {
+        DbControl readerDc = null;
+        SnapshotManager manager = null;
+        QueryManager queryManager = null;
+
+        try
+        {
+           SessionControl sc = writerDc.getSessionControl();
+           readerDc = sc.newDbControl(writerDc.getName());
+           manager = new SnapshotManager();
+           queryManager = new QueryManager(readerDc);
+
+           do
+           {
+              BioSource bs = biosources.poll();
+              // Exit if there are no more biosources or
+              // if this thread has been interrupted (eg. due to
+              // aborting or an error in another thread)
+              if (bs == null || Thread.interrupted()) break;
+
+              if (count % 100 == 0)
+              {
+                 // Rollback and create new DbControl and snapshot
+                 // manager to allow GC to reclaim memory
+                 readerDc.close();
+                 readerDc = sc.newDbControl(writerDc.getName());
+                 manager = new SnapshotManager();
+                 queryManager = new QueryManager(readerDc);
+              }
+
+              CohortItem item = new CohortItem(exporter, readerDc, manager, queryManager, bs.getId());
+              jsonWriter.writeJsonData(item);
+              count++;
+           } while (true);
+        }
+        catch (RuntimeException ex)
+        {
+           error = ex;
+        }
+        finally
+        {
+           if (readerDc != null) readerDc.close();
+        }
+     }
+  }
+
+  /**
+     Thread factory for exporter threads.
+  */
+  public static class ExporterThreadFactory
+     implements ThreadFactory
+  {
+     private final ThreadGroup grp;
+     private int nThreads;
+
+     public ExporterThreadFactory()
+     {
+        this.grp = new ThreadGroup("ReleaseExporterThread");
+     }
+
+     @Override
+     public Thread newThread(Runnable r)
+     {
+        nThreads++;
+        return new Thread(grp, r, grp.getName()+"."+nThreads);
+     }
+  }

   }
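Taken together, CallableExporter and the monitoring loop form a queue-draining worker pool: the workers pull items off a shared ConcurrentLinkedQueue, and the coordinator both reports progress and detects failures by summing per-worker counters. A condensed, runnable sketch of the same coordination pattern (hypothetical QueueDrainDemo and Worker names, not Reggie code):

   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   public class QueueDrainDemo
   {
      static class Worker implements Runnable
      {
         private final Queue<Integer> queue;
         private volatile int count;
         private volatile RuntimeException error;

         Worker(Queue<Integer> queue) { this.queue = queue; }

         // Progress probe; re-throws a worker failure on the caller's thread
         int getCount()
         {
            if (error != null) throw error;
            return count;
         }

         @Override
         public void run()
         {
            try
            {
               Integer item;
               while ((item = queue.poll()) != null && !Thread.interrupted())
               {
                  process(item);
                  count++;
               }
            }
            catch (RuntimeException ex)
            {
               error = ex; // picked up by the coordinator via getCount()
            }
         }

         private void process(Integer item) { /* per-item work goes here */ }
      }

      public static void main(String[] args) throws InterruptedException
      {
         int total = 1000;
         Queue<Integer> queue = new ConcurrentLinkedQueue<>();
         for (int i = 0; i < total; i++) queue.add(i);

         int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 3);
         Worker[] workers = new Worker[numThreads];
         ExecutorService executor = Executors.newFixedThreadPool(numThreads);
         try
         {
            for (int i = 0; i < numThreads; i++)
            {
               workers[i] = new Worker(queue);
               executor.execute(workers[i]);
            }
            int done = 0;
            while (done < total)
            {
               Thread.sleep(100);
               done = 0;
               for (Worker w : workers) done += w.getCount(); // throws if a worker failed
            }
         }
         finally
         {
            executor.shutdownNow(); // interrupts stragglers if a worker failed
            executor.awaitTermination(10, TimeUnit.SECONDS);
         }
      }
   }

Because getCount() re-throws a failure on the coordinator's thread, the shutdownNow() in the finally block doubles as the abort path: it interrupts the remaining workers, which exit at their Thread.interrupted() check.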
extensions/net.sf.basedb.reggie/trunk/src/net/sf/basedb/reggie/plugins/release/ScriptWriter.java
r5196 → r7022:

      Add a directory that should be created as part of this release.
   */
-  public void addMkDir(String path)
+  public synchronized void addMkDir(String path)
   {
      if (path != null)
...
      @param jsonFiles
   */
-  public void addFiles(String releasePath, String sourcePath, String fileList, JSONArray jsonFiles)
+  public synchronized void addFiles(String releasePath, String sourcePath, String fileList, JSONArray jsonFiles)
   {
      addMkDir(releasePath);
...
      @since 4.21
   */
-  public void addLinkToExistingFile(String releasePath, String filename, String releasedVersion)
+  public synchronized void addLinkToExistingFile(String releasePath, String filename, String releasedVersion)
   {
      addMkDir(releasePath);
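One ScriptWriter instance is created in exportCohortData() and handed to several of the item writers, so with multiple exporter threads its mutators must serialize on the instance monitor, which is what the added synchronized keywords do. A minimal sketch of the same idea (a hypothetical SharedCollector class):

   import java.util.LinkedHashSet;
   import java.util.Set;

   public class SharedCollector
   {
      private final Set<String> dirs = new LinkedHashSet<>(); // not thread-safe by itself

      public synchronized void addMkDir(String path)
      {
         if (path != null) dirs.add(path);
      }

      public synchronized void addFile(String releasePath, String filename)
      {
         addMkDir(releasePath); // intrinsic locks are reentrant, so this is safe
         dirs.add(releasePath + "/" + filename);
      }
   }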