Changeset 4606


Ignore:
Timestamp:
Oct 27, 2008, 4:20:39 PM (15 years ago)
Author:
Martin Svensson
Message:

Fixes #1152 Improve transaction handling in Base1PluginExecuter. There is only one transaction open each time and there are three separate transaction now: one for exporting data, one to import files and the last to import data and do some clean-up.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/plugins/core/net/sf/basedb/plugins/Base1PluginExecuter.java

    r4605 r4606  
    44 Copyright (C) 2006 Johan Enell, Jari Hakkinen, Nicklas Nordborg
    55 Copyright (C) 2007 Johan Enell, Nicklas Nordborg
     6 Copyright (C) 2008 Martin Svensson
    67 
    78 This file is part of BASE - BioArray Software Environment.
     
    278279  */
    279280  private java.io.File execDirectory;
    280 
    281   private int pluginDirectoryId = -1;
    282281 
    283282  private ThreadSignalHandler signalHandler;
     
    311310        {
    312311          dc = sc.newDbControl();
    313           getManualConfigureParameters();
     312          //getManualConfigureParameters();
    314313 
    315314          int channels = bas.getRawDataType().getChannels();
    316           int maxChannels = Values.getInt(String.valueOf(configuration.getValue(maxChannelsParameter.getName())), 0);
    317           int minChannels = Values.getInt(String.valueOf(configuration.getValue(minChannelsParameter.getName())), 0);
     315          int maxChannels = Values.getInt(String.valueOf(configuration.getValue("maxChannels")), 0);
     316          int minChannels = Values.getInt(String.valueOf(configuration.getValue("minChannels")), 0);
    318317          if (channels < minChannels || ( maxChannels != 0 && channels > maxChannels))
    319318          {
     
    325324            extraValues.add(ev.getExtraValueType().getExternalId());
    326325          }
    327           String[] usedFields = String.valueOf(configuration.getValue(usedFieldsParameter.getName())).split("\\\\t");
     326          String[] usedFields = String.valueOf(configuration.getValue("usedFields")).split("\\\\t");
    328327          for (String field : usedFields)
    329328          {
     
    485484  }
    486485 
    487  
    488   private int createPluginDirectory()
    489   {
    490     DbControl dc = null;
    491     int id = -1;
    492     try
    493     {
    494       dc = sc.newDbControl();
    495       String pathName = (String) job.getValue(pluginDirectoryParameter.getName()) + "/" + job.getId();
    496       Path path = new Path(pathName, Path.Type.DIRECTORY);
    497       Directory jobDirectory = null;
    498       try
    499       {
    500         // Load existing...
    501         jobDirectory = Directory.getByPath(dc, path);
    502         // ...and remove files and sub directories in it
    503         emptyDir(dc, path);
    504       }
    505       catch(ItemNotFoundException e)
    506       {
    507         // Create a new directory
    508         jobDirectory = Directory.getNew(dc, path);
    509         dc.saveItem(jobDirectory);
    510       }
    511       dc.commit();
    512       id = jobDirectory.getId();
    513     }
    514     finally
    515     {
    516       if (dc != null) dc.close();
    517     }
    518     return id;
    519   }
    520  
    521486  /**
    522487    Deletes all files and sub directories under dir.
     
    542507  }
    543508 
    544  
    545   /**
    546     Deletes all files and sub directories under dir. The directory given
    547     itself is not deleted.
    548    
    549     @param dc
    550     @param dir
    551     @return true if all deletions were successful
    552   */
    553   private boolean emptyDir(DbControl dc, Path dir)
    554   {
    555     if (dir.getType() == Path.Type.DIRECTORY)
    556     {
    557       Directory d = Directory.getByPath(dc, dir);
    558       for (File f : d.getFiles().list(dc))
    559       {
    560         dc.deleteItem(f);
    561       }
    562       for (Directory subD : d.getSubDirectories().list(dc))
    563       {
    564         if (!emptyDir(dc, subD.getPath()))
    565         {
    566           return false;
    567         }
    568         dc.deleteItem(subD);
    569       }
    570       return true;
    571     }
    572     return false;
    573   }
    574 
    575509  @Override
    576510  public Collection<Permissions> getPermissions()
     
    628562      File stdin = null;
    629563      File stdout = null;
    630       Transformation trans = null;
     564      Transformation trans = null;
     565     
     566      Directory workingDirectory = null;
     567
     568      //Export
    631569      try
    632570      {
    633571        dc = sc.newDbControl();
    634         getManualConfigureParameters();
    635         getConfigureJobParameters();
    636  
    637         //Export
     572        String workingPath = (String) job.getValue("pluginDirectory") + "/" + job.getId();
     573        workingDirectory = getPluginDirectory(dc, workingPath);
     574        progress.display(0, "Exporting data to be used by plugin.");
     575        stdin = exportData(dc, workingDirectory);
     576        checkInterrupted();
     577        dc.commit();
     578      }
     579      catch(Exception e)
     580      {
     581        response.setError("Error during export: " + e.getMessage(), Arrays.asList(e));
     582        return;
     583      }
     584      finally
     585      {
     586        if (dc != null) dc.close();
     587      }
     588     
     589      //Execution
     590      Process externalProcess = null;
     591      try
     592      {
     593        checkInterrupted();
     594        copy(stdin, getExecDirectory());
     595        progress.display(10, "Running on remote computation server.");
     596        externalProcess = Runtime.getRuntime().exec(getExecLine(), null, getExecDirectory());
     597
     598        ByteArrayOutputStream err = new ByteArrayOutputStream();
     599        StreamHandler errorStream = new StreamHandler(
     600            new BufferedInputStream(externalProcess.getErrorStream()),
     601            new BufferedOutputStream(err));
     602
     603        FileOutputStream out =
     604          new FileOutputStream(new java.io.File(getExecDirectory(), "stdout.txt"));
     605        StreamHandler inputStream = new StreamHandler(
     606          new BufferedInputStream(externalProcess.getInputStream()),
     607          new BufferedOutputStream(out));
     608       
     609        FileInputStream in =
     610          new FileInputStream(new java.io.File(getExecDirectory(), "stdin.txt"));
     611        StreamHandler outputStream = new StreamHandler(
     612          new BufferedInputStream(in),
     613          new BufferedOutputStream(externalProcess.getOutputStream()));
     614       
     615        inputStream.start();
     616        errorStream.start();
     617        outputStream.start();
     618        int exitValue = 0;
    638619        try
    639620        {
    640           BioAssaySet parentBas = getSourceBioAssaySet(dc);
    641           trans = parentBas.newTransformation(Job.getById(dc, job.getId()));
    642           trans.setName(PluginConfiguration.getById(dc, configuration.getId()).getName());
    643           dc.saveItem(trans);
    644           progress.display(0, "Exporting data to be used by plugin.");
    645           stdin = File.getById(dc, exportData());
    646           checkInterrupted();
    647           copy(stdin, getExecDirectory());
    648         }
    649         catch(Exception e)
    650         {
    651           response.setError("Error during export: " + e.getMessage(), Arrays.asList(e));
     621          exitValue = externalProcess.waitFor();
     622        }
     623        catch (InterruptedException ex)
     624        {
     625          // Aborted by user
     626          externalProcess.destroy();
     627          throw new SignalException("Aborted by user", ex);
     628        }
     629
     630        progress.display(50, "Reading from error stream.");
     631        for(int i = 0; errorStream.isAlive(); ++i)
     632        {
     633          Thread.sleep(1000);
     634          if (i > 300) throw new BaseException("Waited 5 minutes for the errormessage. I give up.");
     635        }
     636                 
     637        err.flush();
     638        msg = err.toString();
     639        out.flush();
     640       
     641        if (exitValue != 0)
     642        {
     643          response.setError(msg, null);
    652644          return;
    653645        }
     646      }
     647      catch (Exception e)
     648      {
     649        response.setError("Error during execution: " + e.getMessage(), Arrays.asList(e));
     650        return;
     651      }
     652     
     653      // Import analysis files created by the BASE1 plug-in
     654      try
     655      {
     656        //Copy files
     657        dc = sc.newDbControl();
     658        progress.display(60, "Copying files to server.");
     659        importTempFiles(dc, getExecDirectory().listFiles(), workingDirectory);
     660        dc.commit();
     661        if (!deleteDir(getExecDirectory()))
     662        {
     663          response.setError("Could not remove execution directory: "+getExecDirectory().getAbsolutePath(), null);
     664        }       
     665      }
     666      finally
     667      {
     668        if (dc != null) dc.close();
     669      }
     670           
     671      //Import data and clean up
     672      try
     673      {
     674        dc = sc.newDbControl();
     675        checkInterrupted();
     676        progress.display(70, "Importing data from plugin.");
     677        BioAssaySet parentBas = getSourceBioAssaySet(dc);
     678        trans = parentBas.newTransformation(Job.getById(dc, job.getId()));
     679        trans.setName(PluginConfiguration.getById(dc, configuration.getId()).getName());
     680        dc.saveItem(trans);
     681        stdout = File.getFile(dc, workingDirectory, "stdout.txt", true);
     682        associateFiles(dc, trans, workingDirectory);
     683        importData(dc, stdout, trans);
    654684       
    655         //Execution
    656         Process externalProcess = null;
    657         try
    658         {
    659           checkInterrupted();
    660           progress.display(10, "Running on remote computation server.");
    661           externalProcess = Runtime.getRuntime().exec(getExecLine(), null, getExecDirectory());
    662  
    663           ByteArrayOutputStream err = new ByteArrayOutputStream();
    664           StreamHandler errorStream = new StreamHandler(
    665               new BufferedInputStream(externalProcess.getErrorStream()),
    666               new BufferedOutputStream(err));
    667  
    668           FileOutputStream out =
    669             new FileOutputStream(new java.io.File(getExecDirectory(), "stdout.txt"));
    670           StreamHandler inputStream = new StreamHandler(
    671             new BufferedInputStream(externalProcess.getInputStream()),
    672             new BufferedOutputStream(out));
    673          
    674           FileInputStream in =
    675             new FileInputStream(new java.io.File(getExecDirectory(), "stdin.txt"));
    676           StreamHandler outputStream = new StreamHandler(
    677             new BufferedInputStream(in),
    678             new BufferedOutputStream(externalProcess.getOutputStream()));
    679          
    680           inputStream.start();
    681           errorStream.start();
    682           outputStream.start();
    683           int exitValue = 0;
    684           try
    685           {
    686             exitValue = externalProcess.waitFor();
    687           }
    688           catch (InterruptedException ex)
    689           {
    690             // Aborted by user
    691             externalProcess.destroy();
    692             throw new SignalException("Aborted by user", ex);
    693           }
    694 
    695           progress.display(50, "Reading from error stream.");
    696           for(int i = 0; errorStream.isAlive(); ++i)
    697           {
    698             Thread.sleep(1000);
    699             if (i > 300) throw new BaseException("Waited 5 minutes for the errormessage. I give up.");
    700           }
    701                    
    702           err.flush();
    703           msg = err.toString();
    704           out.flush();
    705          
    706           if (exitValue != 0)
    707           {
    708             response.setError(msg, null);
    709             return;
    710           }
    711         }
    712         catch (Exception e)
    713         {
    714           response.setError("Error during execution: " + e.getMessage(), Arrays.asList(e));
    715           return;
    716         }
    717         finally
    718         {
    719           //Copy files
    720           progress.display(60, "Copying files to server.");
    721           importFiles(dc, trans);
    722           if (!deleteDir(getExecDirectory()))
    723           {
    724             response.setError("Could not remove execution directory: "+getExecDirectory().getAbsolutePath(), null);
    725           }
    726           stdout = File.getFile(dc, getPluginDirectory(dc), "stdout.txt", true);
    727         }
    728        
    729         //Import data
    730         try
    731         {
    732           checkInterrupted();
    733           progress.display(70, "Importing data from plugin.");
    734           importData(dc, stdout, trans);
    735         }
    736         catch (Exception ex)
    737         {
    738           response.setError(ex.getMessage(), Arrays.asList(ex));
    739           return;
    740         }
    741         dc.commit();
    742       }
    743       finally
    744       {
    745         if (dc != null) dc.close();
    746       }
    747      
    748       //Clean up
    749       try
    750       {
    751         dc = sc.newDbControl();
    752         if (!Values.getBoolean(String.valueOf(configuration.getValue(leaveStdinParameter.getName()))))
     685        if (!Values.getBoolean(String.valueOf(configuration.getValue("leaveStdin"))))
    753686        {
    754687          if (stdin != null)
     
    764697          }
    765698        }
    766         if (!Values.getBoolean(String.valueOf(configuration.getValue(leaveStdoutParameter.getName()))))
     699        if (!Values.getBoolean(String.valueOf(configuration.getValue("leaveStdout"))))
    767700        {
    768701          if (stdout != null)
     
    778711          }
    779712        }
     713       
    780714        dc.commit();
     715      }
     716      catch (Exception ex)
     717      {
     718        response.setError(ex.getMessage(), Arrays.asList(ex));
     719        return;
    781720      }
    782721      finally
     
    805744  /**
    806745    Export the bioassay set to a file called 'stdin.txt' in the plugin directory.
    807    
    808     @return The ID of the file where the data is exported to.
    809   */
    810   private int exportData()
    811   {
    812     DbControl dc = null;
     746    @param dc DbControl used to access the database
     747    @param workingDirectory Where in base' file system the exported BASEFile should be stored.
     748    @return The the file where the data is exported to.
     749  */
     750  private File exportData(DbControl dc, Directory workingDirectory)
     751  {
    813752    try
    814753    {
    815       dc = sc.newDbControl();
    816       Directory d = getPluginDirectory(dc);
    817       File file = File.getFile(dc, d, "stdin.txt", true);
     754      File file = File.getFile(dc, workingDirectory, "stdin.txt", true);
    818755      file.setMimeTypeAuto("text/plain", null);
    819756      dc.saveItem(file);
     
    822759     
    823760      Map<String, String> nameMap = loadNameMap(dc, bas);
    824       String usedColumns = (String) configuration.getValue(usedColumnsParameter.getName());
     761      String usedColumns = (String) configuration.getValue("usedColumns");
    825762      List<String> reporterFields = usedColumns == null ?
    826763        null : convertNames(nameMap, usedColumns.split("\\\\t"));
    827764     
    828       String usedFields = (String) configuration.getValue(usedFieldsParameter.getName());
     765      String usedFields = (String) configuration.getValue("usedFields");
    829766      List<String> spotFields = usedFields == null ?
    830767        null : convertNames(nameMap, usedFields.split("\\\\t"));
     
    840777      }
    841778     
    842       Boolean mergeReporters = (Boolean) configuration.getValue(geneAveragesParameter.getName());
    843      
    844       TreeMap<String, JobParameter> base1parametertype = getJobParameterObjectsFromXML(String.valueOf(configuration.getValue(jobParametersParameter.getName())), true);
     779      Boolean mergeReporters = (Boolean) configuration.getValue("geneAverages");
     780     
     781      TreeMap<String, JobParameter> base1parametertype = getJobParameterObjectsFromXML(String.valueOf(configuration.getValue("jobParameters")), true);
    845782      ListMap<String, String> parameters = new ListMap<String, String>();
    846       for (PluginParameter<?> pp : getJobParametersFromXML(String.valueOf(configuration.getValue(jobParametersParameter.getName())), true))
     783      for (PluginParameter<?> pp : getJobParametersFromXML(String.valueOf(configuration.getValue("jobParameters")), true))
    847784      {
    848785        String name = pp.getName();
     
    866803      checkInterrupted();
    867804      BioAssaySetExporter exporter = new BioAssaySetExporter();
    868       if ((Boolean) configuration.getValue(serialFormatParameter.getName()))
     805      if ((Boolean) configuration.getValue("serialFormat"))
    869806      {
    870807        exporter.exportBaseFileSerial(bas, file, parameters, reporterFields, spotFields, mergeReporters);
     
    874811        exporter.exportBaseFileMatrix(bas, file, parameters, reporterFields, spotFields, mergeReporters);
    875812      }
    876       dc.commit();
    877       return file.getId();
     813      return file;
    878814    }
    879815    catch (IOException e)
     
    884820    {
    885821      throw new BaseException(e);
    886     }
    887     finally
    888     {
    889       if (dc != null) dc.close();
    890822    }
    891823  }
     
    12861218  private String getExecLine()
    12871219  {
    1288     String execPath = (String) configuration.getValue(execPathParameter.getName());
    1289     String execName = (String) configuration.getValue(execNameParameter.getName());
     1220    String execPath = (String) configuration.getValue("execPath");
     1221    String execName = (String) configuration.getValue("execName");
    12901222    String s = java.io.File.separator;
    12911223    if (execPath.endsWith(s) || execName.startsWith(s))
     
    13781310    Get a {@link Directory directory}
    13791311  */
    1380   private Directory getPluginDirectory(DbControl dc)
    1381   {
    1382     if (pluginDirectoryId == -1)
    1383     {
    1384       pluginDirectoryId = createPluginDirectory();
    1385     }
    1386     return Directory.getById(dc, pluginDirectoryId);
     1312  private Directory getPluginDirectory(DbControl dc, String pathName)
     1313  {
     1314    Path path = new Path(pathName, Path.Type.DIRECTORY);
     1315    Directory jobDirectory = null;
     1316    try
     1317    {
     1318      // Load existing...
     1319      jobDirectory = Directory.getByPath(dc, path);
     1320    }
     1321    catch(ItemNotFoundException e)
     1322    {
     1323      // Create a new directory if no existing were found
     1324      jobDirectory = Directory.getNew(dc, path);
     1325      dc.saveItem(jobDirectory);
     1326    }
     1327    return jobDirectory;
    13871328  }
    13881329 
     
    19471888  }
    19481889 
    1949   private void importFiles(DbControl dc, Transformation trans)
    1950   {
    1951     DbControl mydc = null;
    1952     try
    1953     {
    1954       mydc = sc.newDbControl();
    1955       Directory homeDirectory = getPluginDirectory(mydc);
    1956       importTempFiles(mydc, getExecDirectory().listFiles(), homeDirectory);
    1957       mydc.commit();
    1958     }
    1959     finally
    1960     {
    1961       if (mydc != null)
    1962         mydc.close();
    1963     }
    1964    
     1890  // Associate files in a directory with transformation
     1891  private void associateFiles(DbControl dc, Transformation trans, Directory homeDirectory)
     1892  {
    19651893    checkInterrupted();
    1966     Directory homeDirectory = getPluginDirectory(dc);
    19671894    for(File f : homeDirectory.getFiles().list(dc))
    19681895    {
     
    19771904  }
    19781905
    1979 
     1906  // Import external files to a directory in BASE
    19801907  private void importTempFiles(DbControl dc, java.io.File[] files, Directory d)
    19811908  {
Note: See TracChangeset for help on using the changeset viewer.