Changeset 625


Ignore:
Timestamp:
Mar 11, 2008, 2:36:19 PM (13 years ago)
Author:
Nicklas Nordborg
Message:

References #102: Intensity calculator for Illumina SNP data

Added error handling. Improved performance by preloading reporters.

Location:
plugins/base2/net.sf.basedb.illumina/trunk/src/net/sf/basedb/illumina/plugins
Files:
1 added
1 edited

Legend:

Unmodified
Added
Removed
  • plugins/base2/net.sf.basedb.illumina/trunk/src/net/sf/basedb/illumina/plugins/SnpIntensityCalculator.java

    r624 r625  
    2525
    2626import java.io.IOException;
     27import java.sql.SQLException;
    2728import java.util.ArrayList;
    2829import java.util.Arrays;
    2930import java.util.Collections;
     31import java.util.HashMap;
     32import java.util.HashSet;
    3033import java.util.List;
     34import java.util.Map;
    3135import java.util.Set;
    3236import java.util.regex.Pattern;
     
    3842import net.sf.basedb.core.Config;
    3943import net.sf.basedb.core.DbControl;
     44import net.sf.basedb.core.DynamicReporterQuery;
     45import net.sf.basedb.core.DynamicResultIterator;
    4046import net.sf.basedb.core.Experiment;
    4147import net.sf.basedb.core.File;
     
    4551import net.sf.basedb.core.InvalidUseOfNullException;
    4652import net.sf.basedb.core.Item;
     53import net.sf.basedb.core.ItemNotFoundException;
    4754import net.sf.basedb.core.ItemParameterType;
    4855import net.sf.basedb.core.ItemQuery;
     
    5461import net.sf.basedb.core.RawBioAssay;
    5562import net.sf.basedb.core.RawDataType;
    56 import net.sf.basedb.core.ReporterBatcher;
     63import net.sf.basedb.core.Reporter;
    5764import net.sf.basedb.core.RequestInformation;
     65import net.sf.basedb.core.SimpleAbsoluteProgressReporter;
    5866import net.sf.basedb.core.SpotBatcher;
    5967import net.sf.basedb.core.StringParameterType;
     68import net.sf.basedb.core.StringUtil;
    6069import net.sf.basedb.core.Transformation;
    6170import net.sf.basedb.core.data.ReporterData;
     
    6877import net.sf.basedb.core.plugin.Request;
    6978import net.sf.basedb.core.plugin.Response;
     79import net.sf.basedb.core.query.Dynamic;
    7080import net.sf.basedb.core.query.Hql;
    7181import net.sf.basedb.core.query.Orders;
     82import net.sf.basedb.core.query.SqlResult;
    7283import net.sf.basedb.core.signal.SignalHandler;
    7384import net.sf.basedb.core.signal.SignalTarget;
     
    7586import net.sf.basedb.illumina.Illumina;
    7687import net.sf.basedb.plugins.util.Parameters;
    77 import net.sf.basedb.util.ChainedProgressReporter;
    7888import net.sf.basedb.util.IntensityCalculatorUtil;
    7989import net.sf.basedb.util.NumberFormatUtil;
     
    104114      "as follows:\n" +
    105115      " * Channel 1 = GType (AA = 1.0, AB = 0.0, BB = -1.0, NC = null\n" +
    106       " * Channel 2 = B Allele Freq\n" +
    107       " * Channel 3 = Log R Ratio",
     116      " * Channel 2 = Log R Ratio\n" +
     117      " * Channel 3 = B Allele Freq",
    108118      Illumina.VERSION,
    109119      Illumina.COPYRIGHT,
     
    123133 
    124134 
     135  private static final PluginParameter<String> missingReporterErrorParameter = new PluginParameter<String>(
     136    "missingReporterError",
     137    "Reporter not found",
     138    "How to handle errors that are caused by a reporter not beeing present in the datbase. If not specified the " +
     139      "default error handling is used.\n\n"+
     140      "null = Import the data but set the reporter to null\n"+
     141      "skip = Skip the current data line and continue\n"+
     142      "fail = Stop with an error message",
     143      new StringParameterType(255, null, false, 1, 0, 0,
     144          Arrays.asList( new String[] { "null", "skip", "fail"} ))
     145    );
     146
    125147  private RequestInformation configureJob;
    126148  private ThreadSignalHandler signalHandler;
     
    175197      // rawBioAssays parameter
    176198      List<RawBioAssay> sources = (List<RawBioAssay>)job.getValues("rawBioAssays");
     199      long totalProgressTicks = 0;
    177200      if (sources == null || sources.size() == 0)
    178201      {
     
    182205      {
    183206        dc.reattachItem(rba);
     207        totalProgressTicks += rba.getNumFileSpots();
    184208      }
    185209
     
    197221      dc.saveItem(rootBas);
    198222
    199       ChainedProgressReporter chainedProgress = progress == null ?
    200           null : new ChainedProgressReporter(progress);
    201       float percentPerStep = 100.0f / (1 + sources.size());
    202       int lowLimit = 0;
    203       int highLimit = (int)percentPerStep;
    204 
    205       // Create bioassays for each source
     223      ArrayDesign design = sources.get(0).getArrayDesign();
     224      totalProgressTicks += 3 * design.getNumFileFeatures();
     225      SimpleAbsoluteProgressReporter simpleProgress = progress == null ?
     226        null : new SimpleAbsoluteProgressReporter(progress, totalProgressTicks);
     227     
     228      // 1. Preload all reporters from the SNP manifest file
     229      Map<String, ReporterProxy> preloaded = null;
     230      preloaded = preloadReporters(dc, design, simpleProgress);
     231     
     232      // 2. Load reporter ID:s from the database
     233      mapReporterIds(dc, preloaded, simpleProgress);
     234     
     235      // 3. Insert position data into the database
     236      PositionBatcher posBatcher = rootBas.getPositionBatcher();
     237      Set<Integer> skipped = insertPosData(dc, posBatcher, preloaded, simpleProgress);
     238      posBatcher.close();
     239     
     240      // 4. Create bioassays for each raw bioassay source
    206241      SpotBatcher spotBatcher = rootBas.getSpotBatcher();
     242      int numInserted = 0;
    207243      for (RawBioAssay rba : sources)
    208244      {
    209         if (chainedProgress != null)
    210         {
    211           chainedProgress.setRange(lowLimit, highLimit);
    212           lowLimit = highLimit;
    213           highLimit += percentPerStep;
    214         }
    215         insertSpotData(dc, rba, rootBas, spotBatcher, chainedProgress);
     245        numInserted += insertSpotData(dc, rba, rootBas, spotBatcher, skipped, simpleProgress);
    216246      }
    217247      spotBatcher.close();     
    218      
    219       // Get the manifest file from the array design and
    220       // batch position, reporter and addressID
    221       if (chainedProgress != null)
    222       {
    223         chainedProgress.setRange(lowLimit, highLimit);
    224         lowLimit = highLimit;
    225         highLimit += percentPerStep;
    226       }
    227       PositionBatcher posBatcher = rootBas.getPositionBatcher();
    228       ReporterBatcher reporters = ReporterBatcher.getNew(dc);
    229       ArrayDesign design = sources.get(0).getArrayDesign();
    230       insertPosData(dc, design, posBatcher, reporters, chainedProgress);
    231       posBatcher.close();
    232       reporters.close();
    233  
     248      if (progress != null) progress.display(100, "Committing transactio...");
    234249      dc.commit();
    235250      if (progress != null) progress.display(100, "Done");
     251      String message = numInserted + " spots inserted";
     252      if (!skipped.isEmpty()) message += "; " + skipped.size() + " positions skipped (missing reporter)";
    236253      response.setDone("Done");
    237254    }
     
    342359        storeValue(job, request, ri.getParameter(Parameters.CHARSET_PARAMETER));
    343360        storeValue(job, request, ri.getParameter(Parameters.DECIMAL_SEPARATOR_PARAMETER));
     361       
     362        // Error handling
     363        storeValue(job, request, ri.getParameter(Parameters.DEFAULT_ERROR));
     364        storeValue(job, request, ri.getParameter(Parameters.NUMBER_FORMAT_ERROR));
     365        storeValue(job, request, ri.getParameter(missingReporterErrorParameter.getName()));
    344366
    345367        Job.ExecutionTime execTime = Job.ExecutionTime.MEDIUM; 
     
    382404    if (charset == null) charset = Config.getCharset();
    383405    return charset;
     406  }
     407 
     408  private String getErrorOption(String parameterName)
     409  {
     410    Object option = null;
     411    option = job.getValue(parameterName);
     412    if (option == null) option = job.getValue(Parameters.DEFAULT_ERROR);
     413    return option == null ? "fail" : option.toString();
    384414  }
    385415 
     
    420450        parameters.add(Parameters.charsetParameter(null, null, null));
    421451        parameters.add(Parameters.decimalSeparatorParameter(null, null, null));
     452       
     453        parameters.add(Parameters.errorSection(null, null));
     454        parameters.add(Parameters.defaultError(null, null, "fail"));
     455        parameters.add(Parameters.numberFormatError(null, null, null));
     456        parameters.add(missingReporterErrorParameter);
    422457      }
    423458      finally
     
    436471  }
    437472 
    438   /**
    439     Use the manifest file on the array design to map each adress ID to
    440     a reporter.
    441   */
    442   private void insertPosData(DbControl dc, ArrayDesign design, PositionBatcher posBatcher,
    443     ReporterBatcher reporters, ProgressReporter progress)
    444     throws IOException
    445   {
    446    
     473  private File getManifestFile(DbControl dc, ArrayDesign design)
     474  {
    447475    File manifest = FileStoreUtil.getDataFile(dc, design, Illumina.SNP_MANIFEST_FILE_ID);
    448476    if (manifest == null)
    449477    {
    450478      throw new BaseException("Can't find manifest file on array design: " + design);
    451     }
    452    
     479    }   
     480    return manifest;
     481  }
     482 
     483  private FlatFileParser getManifestParser(File manifest)
     484    throws IOException
     485  {
    453486    FlatFileParser ffp = new FlatFileParser();
    454487    ffp.setDataSplitterRegexp(Pattern.compile(","));
     
    461494      throw new BaseException("Can't find SNP data in file: " + manifest);
    462495    }
    463    
    464     Mapper reporterMapper = ffp.getMapper("\\IlmnID\\");
    465     Mapper addressMapper = ffp.getMapper("\\AddressA_ID\\");
    466    
    467     int numSpots = design.getNumFileFeatures();
    468     int currentSpot = 0;
    469     int interval = numSpots / 100;
    470     float progressFactor = 100.0f / numSpots;
    471     while (ffp.hasMoreData())
    472     {
    473       checkInterrupted();
    474       currentSpot++;
    475       FlatFileParser.Data data = ffp.nextData();
    476       String reporterId = reporterMapper.getValue(data);
    477       int addressId = addressMapper.getInt(data);
    478       ReporterData reporter = reporters.getByExternalId(reporterId);
    479       posBatcher.insert(addressId, reporter);
    480      
    481       if (currentSpot % interval == 0 && progress != null)
    482       {
    483         progress.display((int)(progressFactor * currentSpot),
    484           "Mapping positions from array design. " +
    485           currentSpot + " of " + numSpots + " done.");
    486       }
    487     }
    488     posBatcher.flush();
    489    
    490   }
    491  
    492   private void insertSpotData(DbControl dc, RawBioAssay rba, BioAssaySet root,
    493     SpotBatcher spotBatcher, ProgressReporter progress)
     496    return ffp;
     497  }
     498 
     499
     500  private int insertSpotData(DbControl dc, RawBioAssay rba, BioAssaySet root,
     501    SpotBatcher spotBatcher, Set<Integer> skipped, SimpleAbsoluteProgressReporter progress)
    494502    throws IOException
    495503  {
     
    520528      throw new BaseException("Can't find SNP data in file: " + dataFile);
    521529    }
     530
     531    String invalidNumber = getErrorOption(Parameters.NUMBER_FORMAT_ERROR);
     532    boolean skipLine = "skip".equals(invalidNumber);
     533    boolean useNull = "null".equals(invalidNumber);
    522534   
    523535    BioAssay ba = root.newRootBioAssay(Collections.singleton(rba));
     
    528540    Mapper addressMapper = ffp.getMapper("\\Address\\");
    529541    Mapper gTypeMapper = new GTypeMapper(ffp.getColumnHeaderIndex("GType"));
    530     Mapper bAlleleMapper = new Null2NaNMapper(ffp.getMapper("\\B Allele Freq\\", true));
    531     Mapper ratioMapper = new Null2NaNMapper(ffp.getMapper("\\Log R Ratio\\", true));
    532 
     542    Mapper bAlleleMapper = new Null2NaNMapper(ffp.getMapper("\\B Allele Freq\\", useNull));
     543    Mapper ratioMapper = new Null2NaNMapper(ffp.getMapper("\\Log R Ratio\\", useNull));
     544 
    533545    int numSpots = rba.getNumFileSpots();
     546    long baseProgress = progress == null ? 0 : progress.getCompleted();
     547    int currentSpot = 0;
     548    int numInserted = 0;
     549    int interval = numSpots / 100;
     550    float[] snpdata = new float[3];
     551    FlatFileParser.Data dataLine = null;
     552    try
     553    {
     554      while (ffp.hasMoreData())
     555      {         
     556        checkInterrupted();
     557        currentSpot++;
     558        dataLine = ffp.nextData();
     559        int addressId = addressMapper.getInt(dataLine);
     560        if (skipped.contains(addressId)) continue;
     561       
     562        try
     563        {
     564          snpdata[0] = gTypeMapper.getFloat(dataLine);
     565          snpdata[1] = ratioMapper.getFloat(dataLine);
     566          snpdata[2] = bAlleleMapper.getFloat(dataLine);
     567        }
     568        catch (RuntimeException ex)
     569        {
     570          if (skipLine) continue;
     571          throw ex;
     572        }
     573        numInserted++;
     574        spotBatcher.insert(baCol, addressId, snpdata);
     575       
     576        if (currentSpot % interval == 0 && progress != null)
     577        {
     578          progress.displayAbsolute(baseProgress + currentSpot,
     579            "Inserting spot data for raw bioassay '" + rba.getName() + "'. " +
     580            currentSpot + " of " + numSpots + " done.");
     581        }
     582      }
     583      spotBatcher.flush();
     584    }
     585    catch (Exception ex)
     586    {
     587      throw new BaseException(ex.getMessage() +
     588          " on line " + (dataLine == null ? "-1" : dataLine.lineNo() + " in file '" +
     589          dataFile.getName() + "': " +
     590          StringUtil.trimString(dataLine.line(), 20)), ex);     
     591    }
     592    return numInserted;
     593  }
     594 
     595  /**
     596    Parse the SNP manifest file attached to the array design. Pick the
     597    AddressA_ID --> position and IlmnID --> Reporter.externalId and
     598    put those in a map which is returned.
     599  */
     600  private Map<String, ReporterProxy> preloadReporters(DbControl dc, ArrayDesign design,
     601    SimpleAbsoluteProgressReporter progress)
     602    throws IOException
     603  {
     604    int numSpots = design.getNumFileFeatures();
     605    long baseProgress = progress == null ? 0 : progress.getCompleted();
     606    int currentSpot = 0;
     607    int numInserted = 0;
     608    int interval = numSpots / 100;
     609   
     610    Map<String, ReporterProxy> preloaded =
     611      new HashMap<String, ReporterProxy>(numSpots);
     612   
     613    // Scan the file and create proxies for all reporters
     614    File manifest = getManifestFile(dc, design);
     615    String mfFilename = manifest.getName();
     616    FlatFileParser ffp = getManifestParser(manifest);
     617    Mapper reporterMapper = ffp.getMapper("\\IlmnID\\");
     618    Mapper addressMapper = ffp.getMapper("\\AddressA_ID\\");
     619    while (ffp.hasMoreData())
     620    {
     621      checkInterrupted();
     622      currentSpot++;
     623      FlatFileParser.Data dataLine = ffp.nextData();
     624      String externalId = reporterMapper.getValue(dataLine);
     625      Integer addressId = addressMapper.getInt(dataLine);
     626      if (externalId == null || addressId == null) continue;
     627     
     628      ReporterProxy proxy = preloaded.get(externalId);
     629      if (proxy == null)
     630      {
     631        proxy = new ReporterProxy();
     632        preloaded.put(externalId, proxy);
     633      }
     634      proxy.addAddressId(addressId);
     635
     636      if (currentSpot % interval == 0 && progress != null)
     637      {
     638        progress.displayAbsolute(baseProgress + currentSpot,
     639          "Loading reporters from file '" + mfFilename + "': " +
     640          currentSpot + " of " + numSpots + " done.");
     641      }
     642    }
     643    return preloaded;
     644  }
     645 
     646  /**
     647    Query the reporters table and fill in the Reporter.id value for
     648    all reporters that have been loaded from the SNP manifest file.
     649  */
     650  private void mapReporterIds(DbControl dc, Map<String, ReporterProxy> preloaded,
     651    SimpleAbsoluteProgressReporter progress)
     652    throws SQLException
     653  {
     654    int numSpots = preloaded.size();
     655    long baseProgress = progress == null ? 0 : progress.getCompleted();
    534656    int currentSpot = 0;
    535657    int interval = numSpots / 100;
    536     float progressFactor = 100.0f / numSpots;
    537     float[] snpdata = new float[3];
    538     while (ffp.hasMoreData())
    539     {         
     658
     659    // Fill the proxies with ID:s for the reporters
     660    DynamicReporterQuery query = Reporter.getDynamicQuery();
     661    query.select(Dynamic.selectReporter("id"));
     662    query.select(Dynamic.selectReporter("externalId"));
     663    DynamicResultIterator result = query.iterate(dc);
     664    while (result.hasNext())
     665    {
     666      SqlResult reporter = result.next();
     667      ReporterProxy proxy = preloaded.get(reporter.getString(2));
     668      if (proxy != null)
     669      {
     670        checkInterrupted();
     671        currentSpot++;
     672        proxy.setTheId(reporter.getInt(1));
     673       
     674        if (currentSpot % interval == 0 && progress != null)
     675        {
     676          progress.displayAbsolute(baseProgress + currentSpot,
     677            "Loading reporters from database: " +
     678            currentSpot + " of " + numSpots + " done.");
     679        }
     680      }
     681    }
     682    result.close();
     683   
     684    if (progress != null)
     685    {
     686      progress.displayAbsolute(baseProgress+numSpots,
     687        "Loading reporters from database: " +
     688        numSpots + " of " + numSpots + " done.");
     689    }
     690  }
     691
     692  /**
     693    Insert position -> reporter mapping in the database. Data is taken
     694    from the preloaded map. The returned set contains the addressId:s
     695    of positions that has been skipped because no reporter was found.
     696  */
     697  private Set<Integer> insertPosData(DbControl dc, PositionBatcher posBatcher,
     698    Map<String, ReporterProxy> preloaded, SimpleAbsoluteProgressReporter progress)
     699  {
     700    // Insert pos -> reporter mapping into database
     701    String missingReporter = getErrorOption(missingReporterErrorParameter.getName());
     702    boolean skipLine = "skip".equals(missingReporter);
     703    boolean useNull = "null".equals(missingReporter);
     704    Set<Integer> skipped = new HashSet<Integer>();
     705
     706    int numSpots = preloaded.size();
     707    long baseProgress = progress == null ? 0 : progress.getCompleted();
     708    int currentSpot = 0;
     709    int interval = numSpots / 100;
     710   
     711    for (Map.Entry<String, ReporterProxy> entry : preloaded.entrySet())
     712    {
    540713      checkInterrupted();
    541       FlatFileParser.Data data = ffp.nextData();
    542       snpdata[0] = gTypeMapper.getFloat(data);
    543       snpdata[1] = bAlleleMapper.getFloat(data);
    544       snpdata[2] = ratioMapper.getFloat(data);
    545       int addressId = addressMapper.getInt(data);
    546       spotBatcher.insert(baCol, addressId, snpdata);
     714      currentSpot++;
     715      ReporterProxy proxy = entry.getValue();
     716      Integer addressId = proxy.getAddressId();
     717      ReporterData reporter = proxy.getReporter();
     718     
     719      // Missing reporter?
     720      if (reporter == null)
     721      {
     722        if (skipLine)
     723        {
     724          skipped.add(addressId);
     725          if (proxy.hasMultiple()) skipped.addAll(proxy.getMultipleAddressIds());
     726          continue;
     727        }
     728        if (!useNull) throw new ItemNotFoundException("Reporter[id=" + entry.getKey() +"]");
     729      }
     730      posBatcher.insert(addressId, reporter);
     731      if (proxy.hasMultiple())
     732      {
     733        for (int aId : proxy.getMultipleAddressIds())
     734        {
     735          posBatcher.insert(addressId, reporter);
     736        }
     737      }
    547738     
    548739      if (currentSpot % interval == 0 && progress != null)
    549740      {
    550         progress.display((int)(progressFactor * currentSpot),
    551           "Inserting spot data for raw bioassay '" + rba.getName() + "'. " +
     741        checkInterrupted();
     742        progress.displayAbsolute(baseProgress + currentSpot,
     743          "Mapping reporters to positions: " +
    552744          currentSpot + " of " + numSpots + " done.");
    553       }
    554     }
    555     spotBatcher.flush();
    556 
     745      }     
     746    }
     747    return skipped;
    557748  }
    558749 
Note: See TracChangeset for help on using the changeset viewer.