Changeset 5405


Ignore:
Timestamp:
Sep 10, 2010, 1:09:05 PM (13 years ago)
Author:
Nicklas Nordborg
Message:

References #1503: Add support for resuming jobs after a system re-start

It is now possible to re-execute a job if the plug-in responds with Response.setContinue(). The Lowess normalization plug-in has implemented this(sort of... it simply rollback everything and restart from the beginning).

The current solution puts the job in WAITING state again so no changes in the job agents or internal job queue are needed. The job agent information is kept so it will restart on the same agent as before.

Location:
trunk/src
Files:
2 added
39 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/clients/web/net/sf/basedb/clients/web/plugins/SimpleExport.java

    r5404 r5405  
    365365      query.setParameter("_excludes_", Arrays.asList(Values.getInt(exclude.split(","))), Type.INT);
    366366    }
    367     checkInterrupted();
     367    ThreadSignalHandler.checkInterrupted();
    368368    exportProperties(dc, out, format, exportedProperties, itemType, query, progress);
    369369  }
     
    575575      do
    576576      {
    577         checkInterrupted();
     577        ThreadSignalHandler.checkInterrupted();
    578578        result = queryWrapper.iterate();
    579579        while (result.hasNext())
    580580        {
    581           checkInterrupted();
     581          ThreadSignalHandler.checkInterrupted();
    582582          numExported++;
    583583          Object item = result.next();
  • trunk/src/core/net/sf/basedb/core/Install.java

    r5404 r5405  
    115115    method.
    116116  */
    117   public static final int NEW_SCHEMA_VERSION = Integer.valueOf(82).intValue();
     117  public static final int NEW_SCHEMA_VERSION = Integer.valueOf(83).intValue();
    118118 
    119119  public static synchronized void createTables(boolean update, final ProgressReporter progress)
  • trunk/src/core/net/sf/basedb/core/Job.java

    r5329 r5405  
    894894  {
    895895    checkPermission(Permission.WRITE);
    896     if (getStatus() != Status.WAITING && getStatus() != Status.UNCONFIGURED)
     896    Job.Status status = getStatus();
     897    if (status != Status.WAITING && status != Status.UNCONFIGURED)
    897898    {
    898899      throw new PermissionDeniedException("Can't prepare a job with status '"+getStatus()+"': "+toString());
     
    10981099    data.setServer(null);
    10991100    data.setJobAgentId(null);
     1101    data.setSignalTransporter(null);
    11001102    if (useLatestConfiguration)
    11011103    {
     
    11091111  }
    11101112
     1113  /**
     1114    This method is used to indicate that a job had to be aborted due
     1115    to a system shutdown. The job can be restarted one the
     1116    system is up and running again.
     1117    <p>
     1118    This method will reset the job status to WAITING. The execute
     1119    command parameter can be set to allow the plug-in to tell the
     1120    difference between a first-time execution and resuming after
     1121    a system shutdown.
     1122   
     1123    @param executeCommand The command to send the next time the
     1124      job is started, if not given {@link Request#COMMAND_EXECUTE}
     1125      is used
     1126    @since 2.16
     1127  */
     1128  void continueLater(String executeCommand)
     1129  {
     1130    JobData data = getData();
     1131    data.setStatus(Job.Status.WAITING.getValue());
     1132    data.setPercentComplete(0);
     1133    data.setStarted(null);
     1134    data.setEnded(null);
     1135    data.setExecuteCommand(executeCommand);
     1136    data.setStatusMessage(null);
     1137    data.setStackTrace(null);
     1138    data.setSignalTransporter(null);
     1139  }
    11111140 
    11121141  /**
     
    12161245    ProgressReporterImpl jobProgress =
    12171246      new ProgressReporterImpl(this, server, agent, progress);
    1218    
     1247    String command = getData().getExecuteCommand();
     1248    if (command == null) command = Request.COMMAND_EXECUTE;
    12191249    PluginExecutionRequest request =
    12201250      new PluginExecutionRequest(
    1221         sc, plugin, Request.COMMAND_EXECUTE,
     1251        sc, plugin, command,
    12221252        config, config == null ? null : config.getParameterValuesImpl(getParameterVersion()),
    12231253        this, parameters, pd
     
    16971727        Job job = jobId == 0 ? this.job : Job.getById(dc, jobId);
    16981728        job.getData().setDryRun(response.isDryRun());
    1699 
    1700         if (response.getStatus() == Response.Status.ERROR)
     1729        Response.Status status = response.getStatus();
     1730        if (status == Response.Status.ERROR)
    17011731        {
    17021732          job.doneError(response.getMessage(), response.getErrorList());
     1733        }
     1734        else if (status == Response.Status.CONTINUE)
     1735        {
     1736          // The job will be continue again when the system restarts
     1737          job.continueLater(response.getNextCommand());
    17031738        }
    17041739        else
  • trunk/src/core/net/sf/basedb/core/PluginResponse.java

    r5384 r5405  
    120120 
    121121  /**
     122    Get the next command sent via {@link net.sf.basedb.core.plugin.Response.Status#CONTINUE}.
     123    @return The next command or null
     124    @since 2.16
     125  */
     126  public String getNextCommand()
     127  {
     128    return nextCommand;
     129  }
     130 
     131  /**
    122132    Get a <code>PluginRequest</code> object that handles the next request,
    123133    if status is {@link net.sf.basedb.core.plugin.Response.Status#CONTINUE}.
     
    228238      throws PermissionDeniedException
    229239    {
    230       if (Request.COMMAND_EXECUTE.equals(request.getCommand()))
    231       {
    232         throw new PermissionDeniedException("Not allowed with command '"+nextCommand+"' after execute");
    233       }
    234240      reset(Response.Status.CONTINUE);
    235241      PluginResponse.this.nextCommand = nextCommand;
  • trunk/src/core/net/sf/basedb/core/Update.java

    r5390 r5405  
    901901    </td>
    902902  </tr>
     903  <tr>
     904    <td>83</td>
     905    <td>
     906      Added {@link JobData#getExecuteCommand()}. No special update is needed.
     907      Only increase the schema version.
     908    </td>
     909  </tr>
    903910  </table>
    904911
     
    12251232      }
    12261233     
    1227       // Schemaversion 81-82 only updates the version number
    1228       if (schemaVersion < 82)
    1229       {
    1230         if (progress != null) progress.display((int)(81*progress_factor), "--Updating schema version: " + schemaVersion + " -> 82...");
    1231         schemaVersion = setSchemaVersionInTransaction(session, 82);
     1234      // Schemaversion 81-83 only updates the version number
     1235      if (schemaVersion < 83)
     1236      {
     1237        if (progress != null) progress.display((int)(82*progress_factor), "--Updating schema version: " + schemaVersion + " -> 83...");
     1238        schemaVersion = setSchemaVersionInTransaction(session, 83);
    12321239      }
    12331240   
  • trunk/src/core/net/sf/basedb/core/data/JobData.java

    r4889 r5405  
    144144  }
    145145 
    146  
    147146  /**
    148147    The maximum allowed length of the plugin version.
     
    240239  {
    241240    this.dryRun = dryRun;
     241  }
     242 
     243  /**
     244    The maximum allowed length of the execute command.
     245    @since 2.16
     246  */
     247  public static final int MAX_COMMAND_LENGTH = 255;
     248  private String executeCommand;
     249  /**
     250    The command to send to the plug-in when starting to
     251    execute it. If null, a default values is used. See
     252    Request.COMMAND_EXECUTE. This is usually only used when
     253    automatically restarting a failed job.
     254    @hibernate.property column="`exec_command`" type="string" length="255" not-null="false"
     255    @since 2.16
     256  */
     257  public String getExecuteCommand()
     258  {
     259    return executeCommand;
     260  }
     261  public void setExecuteCommand(String command)
     262  {
     263    this.executeCommand = command;
    242264  }
    243265 
  • trunk/src/core/net/sf/basedb/core/plugin/AbstractPlugin.java

    r5241 r5405  
    3333import net.sf.basedb.core.SessionControl;
    3434import net.sf.basedb.core.PluginParameter;
    35 import net.sf.basedb.core.signal.SignalException;
     35import net.sf.basedb.core.signal.EnhancedThreadSignalHandler;
    3636import net.sf.basedb.core.signal.ThreadSignalHandler;
    3737
     
    357357    Check if the current thread has been interrupted and throw
    358358    a SignalException if it has. Subclasses that use the {@link ThreadSignalHandler}
    359     to implement signal handling should regularly call this method.
     359    or {@link EnhancedThreadSignalHandler} to implement signal handling should
     360    regularly call this method.
    360361    @since 2.6
    361   */
     362    @deprecated In 2.16, use {@link ThreadSignalHandler#checkInterrupted()} instead
     363  */
     364  @Deprecated
    362365  protected void checkInterrupted()
    363366  {
    364     if (Thread.interrupted())
    365     {
    366       throw new SignalException("Aborted by user");
    367     }
     367    ThreadSignalHandler.checkInterrupted();
    368368  }
    369369 
  • trunk/src/core/net/sf/basedb/core/plugin/Response.java

    r4889 r5405  
    2626import net.sf.basedb.core.Job;
    2727import net.sf.basedb.core.ProgressReporter;
     28import net.sf.basedb.core.signal.Signal;
    2829
    2930import java.util.List;
     
    3940{
    4041  /**
    41     This method will continue the configuration of the job with
    42     a new command to the plugin. This response is only allowed
    43     while configuring a plugin or a job, not after executing the plugin.
    44    
    45     @param nextCommand the next command to the plugin
    46     @throws PermissionDeniedException If another command isn't allowed,
    47       ie. after an {@link Request#COMMAND_EXECUTE} command
     42    This method will continue the configuration or execution of the
     43    job with a new command to the plugin. If the plug-in is currently
     44    being configured the configuration will continue with the next
     45    command. If the plug-in is currently executing the job will
     46    be returned to job queue. The intention is that plug-ins can use
     47    this to survive a temporary shutdown of the system after
     48    catching the {@link Signal#SHUTDOWN} signal.
     49     
     50    @param nextCommand The next command to the plugin
     51    @throws PermissionDeniedException If another command isn't allowed
    4852  */
    4953  public void setContinue(String nextCommand)
  • trunk/src/core/net/sf/basedb/core/signal/EnhancedThreadSignalHandler.java

    r5399 r5405  
    3131  An extension to the thread signal handler that supports any number of
    3232  signals. When a signal is recieved it will call {@link Thread#interrupt()}
    33   on the worker thread. When the worker thread becomes aware of the notification
    34   it should call {@link #getReceivedSignals()} or {@link #hasReceived(Signal)}
     33  on the worker thread. This signal handler is also registered as an interrupt
     34  handler with {@link ThreadSignalHandler#setInterruptHandler(InterruptHandler)},
     35  and will throw a {@link SignalReceivedException} when interrupted.
     36  <p>
     37  When the worker thread becomes aware of the notification  it should call
     38  {@link #getReceivedSignals()} or {@link #hasReceived(Signal)}
    3539  to find out which signal that was sent and then take the proper action.
     40  <p>
     41 
     42  There are usually three different ways to get a notification about
     43  the signal:
     44  <ul>
     45  <li>An {@link InterruptedException} is thrown. This usually happens when the
     46    code involves blocking IO operations or other blocking synchronization
     47    functions.
     48  <li>A {@link SignalReceivedException} is thrown. This is usually
     49    thrown from the BASE core when it detects that the thread has been
     50    interrupted.
     51  <li>Check the Thread.interrupted() flag. But we recommend that
     52    {@link ThreadSignalHandler#checkInterrupted()} is used instead.
     53  </ul>
    3654
    3755 
    3856  <pre class="code">
    3957// ... code in worker thread
    40 threadSignalHandler.setWorkerThread(null);
     58threadSignalHandler.setWorkerThread();
    4159beginTransaction();
    4260boolean done = false;
     
    4765   {
    4866      done = doSomeWork(); // NOTE! This must not take forever!
    49       interrupted = Thread.interrupted();
     67      ThreadSignalHandler.checkInterrupted();
     68   }
     69   catch (SignalReceivedException ex)
     70   {
     71    // This exception can be thrown from BASE core when a thread is interrupted
     72    interrupted = true;
    5073   }
    5174   catch (InterruptedException ex)
     
    7699public class EnhancedThreadSignalHandler
    77100  extends AbstractSignalHandler
     101  implements InterruptHandler
    78102{
    79103
     
    102126    super(supported);
    103127    this.received = Collections.synchronizedList(new ArrayList<Signal>());
     128    setWorkerThread();
    104129  }
    105130
     
    113138  {
    114139    this(Arrays.asList(supported));
    115   }
    116  
    117   /**
    118     Create a new thread signal handler.
    119     @param workerThread The worker thread to interrupt when it receieves the
    120       {@link Signal#ABORT} signal, or null to interrupt the current thread
    121   */
    122   public EnhancedThreadSignalHandler(Thread workerThread, Collection<Signal> supported)
    123   {
    124     this(supported);
    125     setWorkerThread(workerThread);
    126140  }
    127141 
     
    160174  // -------------------------------------------
    161175
    162   /**
    163     Set the worker thread that should be interrupted when a signal is
    164     receiver.
    165     @param workerThread The worker thread, or null to use the current thread
    166   */
    167   public void setWorkerThread(Thread workerThread)
    168   {
    169     this.workerThread = workerThread == null ? Thread.currentThread() : workerThread;
     176  /*
     177    From the InterruptHandler interface
     178    -----------------------------------
     179  */
     180  /**
     181    Throws a {@link SignalReceivedException} if one ore more
     182    signals has been received.
     183  */
     184  @Override
     185  public void handleInterrupt()
     186  {
     187    if (received.size() > 0)
     188    {
     189      throw new SignalReceivedException(received);
     190    }
     191
     192  }
     193  // -----------------------------------
     194
     195  /**
     196    Set the worker thread to the current thread. This will also register
     197    this object as an interrupt handler with {@link
     198    ThreadSignalHandler#setInterruptHandler(InterruptHandler)}.
     199  */
     200  public void setWorkerThread()
     201  {
     202    this.workerThread = Thread.currentThread();
     203    ThreadSignalHandler.setInterruptHandler(this);
    170204  }
    171205 
     
    218252    return received.contains(signal);
    219253  }
    220  
    221 }
     254
     255}
  • trunk/src/core/net/sf/basedb/core/signal/ThreadSignalHandler.java

    r4516 r5405  
    7878
    7979  /**
    80     Utility method to check if the current thread has been interrupted and throws
    81     a SignalException if it has.
     80    Keep per-thread interupt handlers. The default handler is
     81    {@link ExceptionInterruptHandler} which simply throws a
     82    {@link SignalException} if the current thread has been
     83    interrupted.
     84    @since 2.16
     85  */
     86  private static ThreadLocal<InterruptHandler> interruptHandler =
     87    new ThreadLocal<InterruptHandler>()
     88    {
     89      @Override
     90      protected InterruptHandler initialValue()
     91      {
     92        return new ExceptionInterruptHandler("Aborted by user");
     93      }
     94    };
     95 
     96  /**
     97    Utility method to check if the current thread has been interrupted
     98    and (may) throw a SignalException if it has.
     99    NOTE! Since BASE 2.16 the actual behaviour of this method is
     100    determined by the {@link InterruptHandler} that has been registered
     101    for the current thread by {@link #setInterruptHandler(InterruptHandler)}.
     102    The default handler always throws a {@link SignalException}.
    82103  */
    83104  public static void checkInterrupted()
    84105  {
    85     if (Thread.interrupted()) throw new SignalException("Aborted by user");
    86   }
    87  
     106    if (Thread.interrupted()) interruptHandler.get().handleInterrupt();
     107  }
     108 
     109  /**
     110    Register an interrupt handler with the current thread. The handler
     111    is used by the {@link #checkInterrupted()} method when the thread
     112    has been interrupted. If no handler has been registered with a
     113    thread the default action is to throw a {@link SignalException}.
     114
     115    @param handler An interrupt handler or null to remove a previously
     116      registered handler
     117    @since 2.16
     118  */
     119  public static void setInterruptHandler(InterruptHandler handler)
     120  {
     121    if (handler != null)
     122    {
     123      interruptHandler.set(handler);
     124    }
     125    else
     126    {
     127      interruptHandler.remove();
     128    }
     129  }
    88130 
    89131  /**
  • trunk/src/core/net/sf/basedb/util/basefile/BaseFileParser.java

    r5374 r5405  
    3535import net.sf.basedb.core.ProgressReporter;
    3636import net.sf.basedb.core.signal.SignalException;
     37import net.sf.basedb.core.signal.ThreadSignalHandler;
    3738import net.sf.basedb.util.Values;
    3839import net.sf.basedb.util.parser.FlatFileParser;
     
    199200    Checks if the currently executing thread has been interrupted and
    200201    throws a {@link SignalException} if it has.
    201   */
     202    @deprecated In 2.16, use {@link ThreadSignalHandler#checkInterrupted()} instead
     203  */
     204  @Deprecated
    202205  public void checkInterrupted()
    203206  {
    204     if (Thread.interrupted()) throw new SignalException("Aborted by user");
     207    ThreadSignalHandler.checkInterrupted();
    205208  }
    206209 
  • trunk/src/core/net/sf/basedb/util/export/spotdata/AbstractBioAssaySetExporter.java

    r5319 r5405  
    5252import net.sf.basedb.core.query.SqlResult;
    5353import net.sf.basedb.core.signal.SignalException;
     54import net.sf.basedb.core.signal.ThreadSignalHandler;
    5455import net.sf.basedb.core.snapshot.SnapshotManager;
    5556import net.sf.basedb.util.formatter.Formatter;
     
    682683    Checks if the currently executing thread has been interrupted and
    683684    throws a {@link SignalException} if it has.
    684   */
     685    @deprecated In 2.16, use {@link ThreadSignalHandler#checkInterrupted()} instead
     686  */
     687  @Deprecated
    685688  protected void checkInterrupted()
    686689  {
    687     if (Thread.interrupted()) throw new SignalException("Aborted by user");
     690    ThreadSignalHandler.checkInterrupted();
    688691  }
    689692 
  • trunk/src/core/net/sf/basedb/util/export/spotdata/BaseFileExporter.java

    r5384 r5405  
    4747import net.sf.basedb.core.query.Selects;
    4848import net.sf.basedb.core.query.SqlResult;
     49import net.sf.basedb.core.signal.ThreadSignalHandler;
    4950import net.sf.basedb.util.Values;
    5051import net.sf.basedb.util.basefile.BaseFileWriter;
     
    463464      while (it.hasNext())
    464465      {
    465         checkInterrupted();
     466        ThreadSignalHandler.checkInterrupted();
    466467        SqlResult result = it.next();
    467468       
  • trunk/src/core/net/sf/basedb/util/export/spotdata/BfsExporter.java

    r5384 r5405  
    4343import net.sf.basedb.core.Type;
    4444import net.sf.basedb.core.query.SqlResult;
     45import net.sf.basedb.core.signal.ThreadSignalHandler;
    4546import net.sf.basedb.util.ChainedProgressReporter;
    4647import net.sf.basedb.util.Values;
     
    567568      while (it.hasNext())
    568569      {
    569         checkInterrupted();
     570        ThreadSignalHandler.checkInterrupted();
    570571        SqlResult result = it.next();
    571572       
     
    643644    for (BioAssay ba : assays)
    644645    {
    645       checkInterrupted();
     646      ThreadSignalHandler.checkInterrupted();
    646647      index = 0;
    647648      for (int i = 1; i <= numFields; ++i)
     
    709710      while (it.hasNext())
    710711      {
    711         checkInterrupted();
     712        ThreadSignalHandler.checkInterrupted();
    712713        SqlResult result = it.next();
    713714       
  • trunk/src/core/net/sf/basedb/util/export/spotdata/SerialBaseFileExporter.java

    r5373 r5405  
    2929import net.sf.basedb.core.DynamicSpotQuery;
    3030import net.sf.basedb.core.Type;
     31import net.sf.basedb.core.signal.ThreadSignalHandler;
    3132import net.sf.basedb.util.basefile.BaseFileWriter;
    3233
     
    102103    long count = countQuery.count(dc);
    103104    spotCount += count;
    104     checkInterrupted();
     105    ThreadSignalHandler.checkInterrupted();
    105106    exportSpotSectionHeaders(Arrays.asList(current), count);
    106107  }
  • trunk/src/core/net/sf/basedb/util/importer/spotdata/BaseFileImporter.java

    r5384 r5405  
    5252import net.sf.basedb.core.query.SqlResult;
    5353import net.sf.basedb.core.signal.SignalException;
     54import net.sf.basedb.core.signal.ThreadSignalHandler;
    5455import net.sf.basedb.util.ChainedProgressReporter;
    5556import net.sf.basedb.util.basefile.BaseFileParser;
     
    262263    Checks if the currently executing thread has been interrupted and
    263264    throws a {@link SignalException} if it has.
    264   */
     265    @deprecated In 2.16, use {@link ThreadSignalHandler#checkInterrupted()} instead
     266  */
     267  @Deprecated
    265268  protected void checkInterrupted()
    266269  {
    267     if (Thread.interrupted()) throw new SignalException("Aborted by user");
     270    ThreadSignalHandler.checkInterrupted();
    268271  }
    269272
  • trunk/src/core/net/sf/basedb/util/importer/spotdata/BfsImporter.java

    r5329 r5405  
    4141import net.sf.basedb.core.Transformation;
    4242import net.sf.basedb.core.signal.SignalException;
     43import net.sf.basedb.core.signal.ThreadSignalHandler;
    4344import net.sf.basedb.util.ChainedProgressReporter;
    4445import net.sf.basedb.util.Values;
     
    240241    Checks if the currently executing thread has been interrupted and
    241242    throws a {@link SignalException} if it has.
    242   */
     243    @deprecated In 2.16, use {@link ThreadSignalHandler#checkInterrupted()} instead
     244  */
     245  @Deprecated
    243246  protected void checkInterrupted()
    244247  {
    245     if (Thread.interrupted()) throw new SignalException("Aborted by user");
     248    ThreadSignalHandler.checkInterrupted();
    246249  }
    247250 
  • trunk/src/core/net/sf/basedb/util/importer/spotdata/FirstPassSectionSpotsParser.java

    r5096 r5405  
    3131import net.sf.basedb.core.BioAssaySet;
    3232import net.sf.basedb.core.DbControl;
     33import net.sf.basedb.core.signal.ThreadSignalHandler;
    3334import net.sf.basedb.util.Values;
    3435import net.sf.basedb.util.basefile.BaseFileParser;
     
    222223    while ((data = ffp.nextData()) != null)
    223224    {
    224       parser.checkInterrupted();
     225      ThreadSignalHandler.checkInterrupted();
    225226     
    226227      long parsedBytes = ffp.getParsedBytes();
  • trunk/src/core/net/sf/basedb/util/importer/spotdata/SecondPassSectionSpotsParser.java

    r5384 r5405  
    3232import net.sf.basedb.core.SpotBatcher;
    3333import net.sf.basedb.core.SpotExtraValueBatcher;
     34import net.sf.basedb.core.signal.ThreadSignalHandler;
    3435import net.sf.basedb.util.Values;
    3536import net.sf.basedb.util.basefile.BaseFileParser;
     
    139140    while ((data = ffp.nextData()) != null)
    140141    {
    141       parser.checkInterrupted();
     142      ThreadSignalHandler.checkInterrupted();
    142143      int parsedLines = ffp.getParsedLines();
    143144      if (parsedLines >= nextProgressReport)
  • trunk/src/core/net/sf/basedb/util/importer/spotdata/SectionReporterListParser.java

    r5096 r5405  
    3232import net.sf.basedb.core.Transformation;
    3333import net.sf.basedb.core.data.ReporterData;
     34import net.sf.basedb.core.signal.ThreadSignalHandler;
    3435import net.sf.basedb.util.Values;
    3536import net.sf.basedb.util.basefile.BaseFileParser;
     
    105106    while ((data = ffp.nextData()) != null)
    106107    {
    107       parser.checkInterrupted();
     108      ThreadSignalHandler.checkInterrupted();
    108109      long parsedBytes = ffp.getParsedBytes();
    109110      if (parsedBytes >= nextProgressReport)
  • trunk/src/plugins/core/net/sf/basedb/plugins/AbstractFlatFileImporter.java

    r5384 r5405  
    642642          }
    643643        }
    644         checkInterrupted();
     644        ThreadSignalHandler.checkInterrupted();
    645645        FlatFileParser.LineType result = ffp.parseHeaders();
    646646        FlatFileParser.Line line = null;
     
    671671            }
    672672          }
    673           checkInterrupted();
     673          ThreadSignalHandler.checkInterrupted();
    674674        }
    675675        catch (Throwable t)
     
    724724              }
    725725              // In case the server is shutting down... throw exception, rollback and quit
    726               checkInterrupted();
     726              ThreadSignalHandler.checkInterrupted();
    727727            }
    728728          }
  • trunk/src/plugins/core/net/sf/basedb/plugins/AnnotationFlatFileImporter.java

    r5384 r5405  
    8080import net.sf.basedb.core.query.Orders;
    8181import net.sf.basedb.core.query.Restrictions;
     82import net.sf.basedb.core.signal.ThreadSignalHandler;
    8283import net.sf.basedb.plugins.util.Parameters;
    8384import net.sf.basedb.util.error.ClassMapErrorHandler;
     
    911912            numReplaced += na.getNumReplaced();
    912913            numRemoved += na.getNumRemoved();
    913             checkInterrupted();
     914            ThreadSignalHandler.checkInterrupted();
    914915          }
    915916        }
  • trunk/src/plugins/core/net/sf/basedb/plugins/Base1PluginExecuter.java

    r5384 r5405  
    664664          if (chainedProgress != null) chainedProgress.setRange(0, 20);
    665665          stdin = exportData(dc, workingDirectory, chainedProgress);
    666           checkInterrupted();
     666          ThreadSignalHandler.checkInterrupted();
    667667          dc.commit();
    668668        }
     
    683683        try
    684684        {
    685           checkInterrupted();
     685          ThreadSignalHandler.checkInterrupted();
    686686          tmpDir = getExecDirectory();
    687687          copy(stdin, tmpDir);
     
    763763        {
    764764          dc = sc.newDbControl();
    765           checkInterrupted();
     765          ThreadSignalHandler.checkInterrupted();
    766766          progress.display(60, "Importing data from plugin.");
    767767          BioAssaySet parentBas = getSourceBioAssaySet(dc);
     
    14751475  private void associateFiles(DbControl dc, Transformation trans, Directory homeDirectory)
    14761476  {
    1477     checkInterrupted();
     1477    ThreadSignalHandler.checkInterrupted();
    14781478    String root = homeDirectory.getPath().toString();
    14791479    AnyToAny ata = AnyToAny.getNew(dc, trans, homeDirectory, "./", false);
     
    15061506          try
    15071507          {
    1508             checkInterrupted();
     1508            ThreadSignalHandler.checkInterrupted();
    15091509            newFile.upload(new FileInputStream(f), true);
    15101510          }
  • trunk/src/plugins/core/net/sf/basedb/plugins/CdfFileReporterImporter.java

    r5384 r5405  
    190190      File cdfFile = (File)job.getValue("file");
    191191      cdfFile = File.getById(dc, cdfFile.getId());
    192       checkInterrupted();
     192      ThreadSignalHandler.checkInterrupted();
    193193      FusionCDFData cdf = new CdfFileHandler().loadCdfFile(cdfFile);
    194194 
     
    480480  private int importFromCdf(DbControl dc, FusionCDFData cdf, ProgressReporter progress)
    481481  {
    482     checkInterrupted();
     482    ThreadSignalHandler.checkInterrupted();
    483483
    484484    ReporterBatcher batcher = ReporterBatcher.getNew(dc);
  • trunk/src/plugins/core/net/sf/basedb/plugins/FormulaFilter.java

    r5319 r5405  
    171171      Restriction filter = BioAssaySetUtil.createJepRestriction(dc, expression, source.getSpotData());
    172172     
    173       checkInterrupted();
     173      ThreadSignalHandler.checkInterrupted();
    174174
    175175      if (includeLimit != null || excludeLimit != null)
  • trunk/src/plugins/core/net/sf/basedb/plugins/GalExporter.java

    r4888 r5405  
    341341      signalHandler.setWorkerThread(null);
    342342    }
    343     checkInterrupted();
     343    ThreadSignalHandler.checkInterrupted();
    344344    if (progress != null) progress.display(0, "Loading blocks...");
    345345
     
    374374    // Start data output
    375375    writeDataHeader(writer);
    376     checkInterrupted();
     376    ThreadSignalHandler.checkInterrupted();
    377377    if (progress != null) progress.display(5, "Loading features...");
    378378   
     
    401401      if (done % progressInterval == 0)
    402402      {
    403         checkInterrupted();
     403        ThreadSignalHandler.checkInterrupted();
    404404        if (progress != null)
    405405        {
  • trunk/src/plugins/core/net/sf/basedb/plugins/HelpExporter.java

    r4889 r5405  
    326326
    327327    //Get helpitems from database
    328     checkInterrupted();
     328    ThreadSignalHandler.checkInterrupted();
    329329    long numHelpItems = helpTextQuery.count(dc);
    330330    float percentPerItem = 100.0f / numHelpItems;
    331331
    332     checkInterrupted();
     332    ThreadSignalHandler.checkInterrupted();
    333333    result = helpTextQuery.iterate(dc);
    334334    while (result.hasNext())
    335335    {
    336       checkInterrupted();
     336      ThreadSignalHandler.checkInterrupted();
    337337      //Create a new helpitem element         
    338338      Help helpItem = result.next();
  • trunk/src/plugins/core/net/sf/basedb/plugins/HelpImporter.java

    r5384 r5405  
    356356        for (int i = 0; i < helpElements.size(); i++)
    357357        {
    358           checkInterrupted();
     358          ThreadSignalHandler.checkInterrupted();
    359359          Element helpElement = helpElements.get(i);
    360360          String name = helpElement.getChildText("name");
  • trunk/src/plugins/core/net/sf/basedb/plugins/IntensityCalculatorPlugin.java

    r5060 r5405  
    245245      Job thisJob = Job.getById(dc, jobId);
    246246
    247       checkInterrupted();
     247      ThreadSignalHandler.checkInterrupted();
    248248      BioAssaySet rootBas = null;
    249249      if (IntensityCalculatorUtil.checkSameArrayDesign(sources, false))
  • trunk/src/plugins/core/net/sf/basedb/plugins/JepExtraValueCalculator.java

    r5319 r5405  
    178178
    179179      if (progress != null) progress.display(10, "Calculating extra value ("+source.getNumSpots()+" total)...");
    180       checkInterrupted();
     180      ThreadSignalHandler.checkInterrupted();
    181181      extraBatcher.insert(query);
    182182
  • trunk/src/plugins/core/net/sf/basedb/plugins/JepIntensityTransformer.java

    r5319 r5405  
    202202      // Create the batchers we need
    203203      if (progress != null) progress.display(10, "Transforming intensities ("+source.getNumSpots()+" total)...");
    204       checkInterrupted();
     204      ThreadSignalHandler.checkInterrupted();
    205205      SpotBatcher spotBatcher = child.getSpotBatcher();
    206206      spotBatcher.insert(query);
  • trunk/src/plugins/core/net/sf/basedb/plugins/LowessNormalization.java

    r5384 r5405  
    6666import net.sf.basedb.core.query.SqlResult;
    6767import net.sf.basedb.core.query.WhenStatement;
     68import net.sf.basedb.core.signal.EnhancedThreadSignalHandler;
     69import net.sf.basedb.core.signal.Signal;
    6870import net.sf.basedb.core.signal.SignalException;
    6971import net.sf.basedb.core.signal.SignalHandler;
    70 import net.sf.basedb.core.signal.SignalReceivedException;
    7172import net.sf.basedb.core.signal.SignalTarget;
    7273import net.sf.basedb.core.signal.ThreadSignalHandler;
     
    180181  private RequestInformation configureJob;
    181182
    182   private ThreadSignalHandler signalHandler;
     183  private EnhancedThreadSignalHandler signalHandler;
    183184 
    184185  /*
     
    218219    if (command.equals(Request.COMMAND_EXECUTE))
    219220    {
    220       if (signalHandler != null)
    221       {
    222         signalHandler.setWorkerThread(null);
    223       }
     221      if (signalHandler != null) signalHandler.setWorkerThread();
    224222      DbControl dc = null;
    225223      try
     
    245243        response.setDone(normalizedSpots + " spots normalized, " + (source.getNumSpots() - normalizedSpots) + " spots removed");
    246244      }
     245      catch (SignalException ex)
     246      {
     247        if (signalHandler != null && signalHandler.hasReceived(Signal.SHUTDOWN))
     248        {
     249          response.setContinue(command);
     250        }
     251        else
     252        {
     253          response.setError("Aborted by user", Arrays.asList(ex));
     254        }
     255      }
    247256      catch (Throwable ex)
    248257      {
     
    330339      }
    331340    }
    332     catch (SignalReceivedException ex)
    333     {
    334       response.setError("Aborted by user", Arrays.asList(ex));
    335     }
    336341    catch (Throwable ex)
    337342    {
     
    347352  public SignalHandler getSignalHandler()
    348353  {
    349     signalHandler = new ThreadSignalHandler();
     354    if (signalHandler == null)
     355    {
     356      signalHandler = new EnhancedThreadSignalHandler(Signal.ABORT, Signal.SHUTDOWN);
     357    }
    350358    return signalHandler;
    351359  }
     
    467475      while (numDone < numAssays)
    468476      {
    469         checkInterrupted();
     477        ThreadSignalHandler.checkInterrupted();
    470478
    471479        // Get data for the next bioassay unless the working queue
     
    522530    catch (InterruptedException ex)
    523531    {
    524       throw new SignalException("Aborted by user");
     532      if (signalHandler != null)
     533      {
     534        signalHandler.checkForSignals();
     535      }
     536      throw new SignalException("Aborted by user", ex);
    525537    }
    526538    finally
     
    552564    while (it.hasNext())
    553565    {
    554       checkInterrupted();
     566      ThreadSignalHandler.checkInterrupted();
    555567      SqlResult r = it.next();
    556568      SpotData spot = new SpotData(bioassayColumn, r.getInt(positionIndex),
  • trunk/src/plugins/core/net/sf/basedb/plugins/MedianRatioNormalization.java

    r5103 r5405  
    459459      for (BioAssay assay : assays)
    460460      {
    461         checkInterrupted();
     461        ThreadSignalHandler.checkInterrupted();
    462462
    463463        // query spot data for this bioassay
     
    567567    DynamicSpotQuery query = source.getSpotData();
    568568    query.restrictPermanent(intensityRestriction);
    569     checkInterrupted();
     569    ThreadSignalHandler.checkInterrupted();
    570570    long numSpots = query.count(dc);
    571571    query.reset();
     
    588588      for (BioAssay assay : assays)
    589589      {
    590         checkInterrupted();
     590        ThreadSignalHandler.checkInterrupted();
    591591 
    592592        // Prepare list for holding data
     
    607607        while (it.hasNext())
    608608        {
    609           checkInterrupted();
     609          ThreadSignalHandler.checkInterrupted();
    610610          SqlResult r = it.next();
    611611          SpotData spot = new SpotData(r.getInt(positionIndex),
     
    632632        while (fromBlock <= maxBlock)
    633633        {
    634           checkInterrupted();
     634          ThreadSignalHandler.checkInterrupted();
    635635          // Find start and end index for current block range
    636636          int toBlock = fromBlock + blockGroupSize - 1;
     
    664664              for (int j = fromIndex; j < toIndex; ++j)
    665665              {
    666                 checkInterrupted();
     666                ThreadSignalHandler.checkInterrupted();
    667667                SpotData spot = data.get(j);
    668668                float newCh1 = (float)(spot.ch1 * invSqrtMedRatio);
  • trunk/src/plugins/core/net/sf/basedb/plugins/PackedFileExporter.java

    r5329 r5405  
    391391    for (Nameable item : itemsToPack)
    392392    {
    393       checkInterrupted();
     393      ThreadSignalHandler.checkInterrupted();
    394394      currentFile++;
    395395      String entryPath = null;
     
    582582      for (Integer fileId : files)
    583583      {
    584         checkInterrupted();
     584        ThreadSignalHandler.checkInterrupted();
    585585        try
    586586        {
     
    603603      for (Integer dirId : directories)
    604604      {
    605         checkInterrupted();
     605        ThreadSignalHandler.checkInterrupted();
    606606        try
    607607        {
     
    630630    for (File f : fileQuery.list(dc))
    631631    {
    632       checkInterrupted();
     632      ThreadSignalHandler.checkInterrupted();
    633633      if (f.getLocation() == Location.PRIMARY)
    634634      {
     
    646646    for (Directory dir : dirQuery.list(dc))
    647647    {
    648       checkInterrupted();
     648      ThreadSignalHandler.checkInterrupted();
    649649      loadedFiles.add(dir);
    650650      loadFiles(dc, loadedFiles, dir);
  • trunk/src/plugins/core/net/sf/basedb/plugins/PlateMappingExporter.java

    r4523 r5405  
    295295        for (int column = 0; column < destGeometry.getColumns(); ++column)
    296296        {
    297           checkInterrupted();
     297          ThreadSignalHandler.checkInterrupted();
    298298          MappingCoordinate destination = new MappingCoordinate(plate, row, column);
    299299          MappingCoordinate source = mapping.getSourceCoordinate(destination);
  • trunk/src/plugins/core/net/sf/basedb/plugins/PluginConfigurationExporter.java

    r4889 r5405  
    268268      );
    269269      configQuery.setParameter("_selected_", selectedItems, Type.INT);
    270       checkInterrupted();
     270      ThreadSignalHandler.checkInterrupted();
    271271      ItemResultIterator<PluginConfiguration> configurations = configQuery.iterate(dc);
    272272     
     
    280280      while (configurations.hasNext())
    281281      {
    282         checkInterrupted();
     282        ThreadSignalHandler.checkInterrupted();
    283283        PluginConfiguration pluginConfig = configurations.next();       
    284284       
  • trunk/src/plugins/core/net/sf/basedb/plugins/PluginConfigurationImporter.java

    r5384 r5405  
    377377      for (Object obj : configurations)
    378378      {
    379         checkInterrupted();
     379        ThreadSignalHandler.checkInterrupted();
    380380        Element configuration = (Element)obj;
    381381        String name = configuration.getChildText("configname");
  • trunk/src/plugins/core/net/sf/basedb/plugins/TarFileUnpacker.java

    r4513 r5405  
    4949import net.sf.basedb.core.plugin.About;
    5050import net.sf.basedb.core.plugin.AboutImpl;
     51import net.sf.basedb.core.signal.ThreadSignalHandler;
    5152import net.sf.basedb.util.FileUtil;
    5253import net.sf.basedb.util.InputStreamTracker;
     
    163164    while ((entry = TarUtil.getNextEntry(tarStream)) != null)
    164165    {
    165       checkInterrupted();
     166      ThreadSignalHandler.checkInterrupted();
    166167      String subPath = entry.getName();
    167168      String completePath = rootPath + "/" + subPath;
  • trunk/src/plugins/core/net/sf/basedb/plugins/executor/ExternalProgramExecutor.java

    r5384 r5405  
    285285      // Run the exporter plug-in
    286286      if (chainedProgress != null) chainedProgress.setRange(5, 40);
    287       checkInterrupted();
     287      ThreadSignalHandler.checkInterrupted();
    288288      runExporter(requestWrapper, responseWrapper, chainedProgress);
    289289      if (responseWrapper.hasError())
     
    297297      String cmdLine = (String)configuration.getValue(PARAMETER_EXTERNAL_CMDLINE);
    298298      if (progress != null) progress.display(50, "Executing external program: " + externalProgram);
    299       checkInterrupted();
     299      ThreadSignalHandler.checkInterrupted();
    300300      runExternalProgram(externalProgram, cmdLine, workDir, "stdin.txt", "stdout.txt", progress);
    301301     
     
    303303      if (chainedProgress != null) chainedProgress.setRange(60, 95);
    304304      responseWrapper = new ResponseWrapper();
    305       checkInterrupted();
     305      ThreadSignalHandler.checkInterrupted();
    306306      runImporter(requestWrapper, responseWrapper, chainedProgress);
    307307      if (responseWrapper.hasError())
Note: See TracChangeset for help on using the changeset viewer.