Changeset 4078


Ignore:
Timestamp:
Jan 14, 2008, 12:21:58 PM (16 years ago)
Author:
Nicklas Nordborg
Message:

References #636: Kill a running job

  • Signals are now working with job agents and the progress reporter.
  • Lowess and intensity calculator plug-in now support signals
  • Documentation has been written.

More things to do:

  • Add support in rest of core plug-ins
Location:
trunk
Files:
6 added
28 edited

Legend:

Unmodified
Added
Removed
  • trunk/config/dist/base.config

    r4074 r4078  
    8080# Signal receiver class for sending signals to running jobs (to abort them)
    8181jobqueue.internal.signalreceiver.class = net.sf.basedb.core.signal.LocalSignalReceiver
    82 jobqueue.internal.signalreceiver.init  = localhost:0
     82jobqueue.internal.signalreceiver.init  = jobqueue:0
    8383
    8484# If plugins with useInteralJobQueue = false should be executed or not
  • trunk/doc/src/docbook/appendix/base.config.xml

    r3944 r4078  
    379379
    380380    <varlistentry>
     381      <term><property>jobqueue.internal.signalreceiver.class</property></term>
     382      <listitem>
     383        <para>
     384         A class implementing the <interfacename docapi="net.sf.basedb.core.signal"
     385         >SignalReceiver</interfacename>
     386         interface. The class must have a public no-argument constructor. If
     387         no value is specified the default setting is:
     388         <classname docapi="net.sf.basedb.core.signal"
     389         >net.sf.basedb.core.signal.LocalSignalReceiver</classname>.
     390         </para>
     391         <para>
     392         Change to <classname docapi="net.sf.basedb.core.signal"
     393         >net.sf.basedb.core.signal.SocketSignalReceiver</classname>
     394         if the internal job queue must be able to receive signals from outside
     395         this JVM.
     396         </para>
     397      </listitem>
     398    </varlistentry>
     399   
     400    <varlistentry>
     401      <term><property>jobqueue.internal.signalreceiver.init</property></term>
     402      <listitem>
     403        <para>
     404        Initialisation string sent to <methodname>SignalReceiver.init()</methodname>.
     405        The syntax and meaning of the string depends on the actual implementation
     406        that is used. Please see the javadoc for more information.
     407         </para>
     408      </listitem>
     409    </varlistentry>
     410
     411    <varlistentry>
    381412      <term><property>jobqueue.internal.checkinterval</property></term>
    382413      <listitem>
  • trunk/doc/src/docbook/developerdoc/api_overview.xml

    r4076 r4078  
    28062806     
    28072807    </sect2>
     2808   
     2809    <sect2 id="core_api.signals">
     2810      <title>Sending signals (to plug-ins)</title>
     2811   
     2812      <para>
     2813        BASE has a simple system for sending signals between different parts of
     2814        a system. This signalling system was initially developed to be able to
     2815        kill plug-ins that a user for some reason wanted to abort. The signalling
     2816        system as such is not limited to this and it can be used for other purposes
     2817        as well. Signals can of course be handled internally in a single JVM but
     2818        also sent externally to other JVM:s running on the same or a different
     2819        computer. The transport mechanism for signals is decoupled from the actual
     2820        handling of them. If you want to, you could implement a signal transporter
     2821        that sends signal as emails and the target plug-in would never know.
     2822      </para>
     2823     
     2824      <para>
     2825        The remainder of this section will focus mainly on the sending and
     2826        transportation of signals. For more information about handling signals
     2827        on the receiving end, see <xref linkend="plugin_developer.signals" />.
     2828      </para>
     2829     
     2830      <sect3 id="core_api.signals.diagram">
     2831        <title>Diagram of classes and methods</title>
     2832        <figure id="core_api.figures.signals">
     2833          <title>The signalling system</title>
     2834          <screenshot>
     2835            <mediaobject>
     2836              <imageobject>
     2837                <imagedata
     2838                  align="center"
     2839                  scalefit="1" width="100%"
     2840                  fileref="figures/uml/corelayer.signals.png" format="PNG" />
     2841              </imageobject>
     2842            </mediaobject>
     2843          </screenshot>
     2844        </figure>
     2845     
     2846        <para>
     2847          The signalling system is rather simple. An object that wish
     2848          to receieve signals must implement the
     2849          <interfacename docapi="net.sf.basedb.core.signal"
     2850          >SignalTarget</interfacename>. It's only method
     2851          is <methodname>getSignalHandler()</methodname>. A
     2852          <interfacename docapi="net.sf.basedb.core.signal"
     2853          >SignalHandler</interfacename> is an object that
     2854          knows what to do when a signal is delivered to it. The target object
     2855          may implement the <interfacename>SignalHandler</interfacename> itself
     2856          or use one of the existing handlers.
     2857        </para>
     2858       
     2859        <para>
     2860          The difficult part here is to be aware that a signal is usually
     2861          delivered by a separate thread. The target object must be aware
     2862          of this and know how to handle multiple threads. As an example we
     2863          can use the <classname docapi="net.sf.basedb.core.signal"
     2864          >ThreadSignalHandler</classname> which simply
     2865          calls <code>Thread.interrupt()</code> to deliver a signal. The target
     2866          object that uses this signal handler it must know that it should check
     2867          <code>Thread.interrupted()</code> at regular intervals from the main
     2868          thread. If that method returns true, it means that the <constant>ABORT</constant>
     2869          signal has been delivered and the main thread should clean up and exit as
     2870          soon as possible.
     2871        </para>
     2872       
     2873        <para>
     2874          Even if a signal handler could be given directly to the party
     2875          that may be interested in sending a signal to the target this
     2876          is not recommended. This would only work when sending signals
     2877          within the same virtual machine. The signalling system includes
     2878          <interfacename docapi="net.sf.basedb.core.signal"
     2879          >SignalTransporter</interfacename> and
     2880          <interfacename docapi="net.sf.basedb.core.signal"
     2881          >SignalReceiver</interfacename> objects that are used
     2882          to decouple the sending of signals with the handling of signals. The
     2883          implementation usually comes in pairs, for example
     2884          <classname docapi="net.sf.basedb.core.signal"
     2885          >SocketSignalTransporters</classname> and <classname
     2886          docapi="net.sf.basedb.core.signal">SocketSignalReceiver</classname>.
     2887        </para>
     2888       
     2889        <para>
     2890          Setting up the transport mechanism is usually a system responsibility.
     2891          Only the system know what kind of transport that is appropriate for it's current
     2892          setup. Ie. should signals be delievered by TCP/IP sockets, only internally, or
     2893          should a delivery mechanism based on web services be implemented?
     2894          If a system wants to receive signals it must create an appropriate
     2895          <interfacename>SignalReceiver</interfacename> object. Within BASE the
     2896          internal job queue set up it's own signalling system that can be used to
     2897          send signals (eg. kill) running jobs. The job agents do the same but uses
     2898          a different implementation. See <xref linkend="appendix.base.config.jobqueue" />
     2899          for more information about how to configure the internal job queue's
     2900          signal receiver. In both cases, there is only one signal receiver instance
     2901          active in the system.
     2902        </para>
     2903       
     2904        <para>
     2905          Let's take the internal job queue as an example. Here is how it works:
     2906        </para>
     2907       
     2908        <itemizedlist>
     2909        <listitem>
     2910          <para>
     2911          When the internal job queue is started, it will also create a signal
     2912          receiver instance according to the settings in <filename>base.config</filename>.
     2913          The default is to create <classname docapi="net.sf.basedb.core.signal"
     2914          >LocalSignalReceiver</classname>
     2915          which can only be used inside the same JVM. If needed, this can
     2916          be changed to a <classname docapi="net.sf.basedb.core.signal"
     2917          >SocketSignalReceiver</classname> or any other
     2918          user-provided implementation.
     2919          </para>
     2920        </listitem>
     2921       
     2922        <listitem>
     2923          <para>
     2924          When the job queue has found a plug-in to execute it will check if
     2925          it also implements the <interfacename docapi="net.sf.basedb.core.signal"
     2926          >SignalTarget</interfacename>
     2927          interface. If it does, a signal handler is created and registered
     2928          with the signal receiver. This is actually done by the BASE core
     2929          by calling <methodname>PluginExecutionRequest.registerSignalReceiver()</methodname>
     2930          which also makes sure that the the ID returned from the registration is
     2931          stored in the database together with the job item representing the
     2932          plug-in to execute.
     2933          </para>
     2934        </listitem>
     2935       
     2936        <listitem>
     2937          <para>
     2938          Now, when the web client see's a running job which has a non-empty
     2939          signal transporter property, the <guilabel>Abort</guilabel>
     2940          button is activated. If the user clicks this button the BASE core
     2941          uses the information in the database to create
     2942          <interfacename docapi="net.sf.basedb.core.signal"
     2943          >SignalTransporter</interfacename> object. This
     2944          is simply done by calling <code>Job.getSignalTransporter()</code>.
     2945          The created signal transporter knows how to send a signal
     2946          to the signal receiver it was first registered with. When the
     2947          signal arrives at the receiver it will find the handler for it
     2948          and call <code>SignalHandler.handleSignal()</code>. This will in it's turn
     2949          trigger some action in the signal target which soon will abort what
     2950          it is doing and exit.
     2951          </para>
     2952        </listitem>
     2953        </itemizedlist>
     2954       
     2955       
     2956      </sect3>
     2957   
     2958    </sect2>
     2959   
    28082960  </sect1>
    28092961
  • trunk/doc/src/docbook/developerdoc/plugin_developer.xml

    r3958 r4078  
    43654365  </sect1>
    43664366 
     4367  <sect1 id="plugin_developer.signals">
     4368    <title>Enable support for aborting a running a plug-in</title>
     4369   
     4370    <para>
     4371      BASE includes a simple signalling system that can be used to send
     4372      signals to plug-ins. The system was primarly developed to allow a user
     4373      to kill a plug-in when it is executing. Therfore, the focus of this chapter
     4374      will be how to implement a plug-in to make it possible to kill it
     4375      during it's execution.
     4376    </para>
     4377   
     4378    <para>
     4379      Since we don't want to do this by brute force such as destroying the
     4380      process or stopping thread the plug-in executes in, cooperation is needed
     4381      by the plug-in. First, the plug-in must implement the
     4382      <interfacename docapi="net.sf.basedb.core.signal">SignalTarget</interfacename>
     4383      interface. From this, a <interfacename
     4384      docapi="net.sf.basedb.core.signal">SignalHandler</interfacename> can be created.
     4385      A plug-in may choose to implement it's own signal handler or use an existing
     4386      implementation. BASE, for example, provides the <classname docapi="net.sf.basedb.core.signal"
     4387      >ThreadSignalHandler</classname> implementation that supports the <constant>ABORT</constant> signal.
     4388      This is a simple implementation that just calls <code>Thread.interrupt()</code>
     4389      on the plug-in worker thread. This may cause two different effects:
     4390    </para>
     4391   
     4392    <itemizedlist>
     4393    <listitem>
     4394      <para>
     4395      The <code>Thread.interrupted()</code> flag is set. The plug-in must check this
     4396      at regular intervals and if the flag is set it must cleanup, rollback
     4397      open transactions and exit as soon as possible.
     4398      </para>
     4399    </listitem>
     4400   
     4401    <listitem>
     4402      <para>
     4403      If the plug-in is waiting in a blocking call that is interruptable, for
     4404      example <code>Thread.sleep()</code>, an <classname>InterruptedException</classname>
     4405      is thrown. This should cause the same actions as if the flag was set to happen.
     4406      </para>
     4407     
     4408      <warning>
     4409        <title>Not all blocking calls are interruptable</title>
     4410        <para>
     4411        For example calling <code>InputStream.read()</code> may leave the
     4412        plug-in waiting in a non-interruptable state. In this case there
     4413        is nothing BASE can do to wake it up again.
     4414        </para>
     4415      </warning>
     4416    </listitem>
     4417    </itemizedlist>
     4418   
     4419    <para>
     4420      Here is a general outline for a plug-in that uses the
     4421      <classname>ThreadSignalHandler</classname>.
     4422    </para>
     4423    <programlisting language="java">
     4424<![CDATA[
     4425private ThreadSignalHandler signalHandler;
     4426public SignalHandler getSignalHandler()
     4427{
     4428   signalHandler = new ThreadSignalHandler();
     4429   return signalHandler;
     4430}
     4431
     4432public void run(Request request, Response response, ProgressReporter progress)
     4433{
     4434   if (signalHandler != null) signalHandler.setWorkerThread(null);
     4435   beginTransaction();
     4436   boolean done = false;
     4437   boolean interrupted = false;
     4438   while (!done && !interrupted)
     4439   {
     4440      try
     4441      {
     4442         done = doSomeWork(); // NOTE! This must not take forever!
     4443         interrupted = Thread.interrupted();
     4444      }
     4445      catch (InterruptedException ex)
     4446      {
     4447         // NOTE! Try-catch is only needed if thread calls
     4448         // a blocking method that is interruptable
     4449         interrupted = true;
     4450      }
     4451   }
     4452   if (interrupted)
     4453   {
     4454      rollbackTransaction();
     4455      response.setError("Aborted by user", null);
     4456   }
     4457   else
     4458   {
     4459      commitTransaction();
     4460      response.setDone("Done");
     4461   }
     4462}
     4463]]>
     4464</programlisting>
     4465   
     4466    <para>
     4467      Another signal handler implementation is the
     4468      <classname docapi="net.sf.basedb.core.signal">ProgressReporterSignalHandler</classname>.
     4469      See that javadoc for information about how to use it.
     4470      For more information about the signalling system as a whole,
     4471      see <xref linkend="core_api.signals" />.
     4472    </para>
     4473   
     4474  </sect1>
     4475 
    43674476  <sect1 id="plugin_developer.classload">
    43684477    <title>How BASE load plug-in classes</title>
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java

    r4070 r4078  
    251251  private JobAgentServerConnection server;
    252252  private RequestHandler requestHandler;
     253  private AgentSignalReceiver signalReceiver;
    253254  private JobExecutor jobExecutor;
    254255
     
    275276 
    276277  private final Set<JobInfo> activeJobs;
    277  
     278
     279  /**
     280    Timeout to wait for jobs to act on the ABORT signal when stopping.
     281    Default value is 20 seconds.
     282  */
     283  private int closeTimeout = 20000;
     284
    278285  /**
    279286    The group were all job runners are placed.
     
    531538    if (server == null)
    532539    {
     540      this.signalReceiver = new AgentSignalReceiver(this);
     541      this.signalReceiver.init(null);
    533542      this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler;
    534543      this.server = new JobAgentServerConnection(port, this.requestHandler, logServer);
     
    685694    }
    686695    return allow;
     696  }
     697 
     698  /**
     699    Get the signal receiver that is processing signal messages
     700    on behalf of this job agent.
     701    @since 2.6
     702  */
     703  public AgentSignalReceiver getSignalReceiver()
     704  {
     705    return signalReceiver;
    687706  }
    688707 
     
    917936 
    918937  /**
    919     Try to stop running jobs by interrupting the threads thaey are executing in.
     938    Try to stop running jobs by interrupting the threads they are executing in.
    920939  */
    921940  private void maybeStopRunningJobs()
    922941  {
    923942    log.info("Stopping running jobs. " + activeJobs.size() + " job(s) still active.");
    924     // Interrupt all threads. Hopefully they will do as we tell them.
    925     runnersGroup.interrupt();
     943   
     944    // Send ABORT to all jobs, that support signals
     945    signalReceiver.close(closeTimeout);
    926946  }
    927947 
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/DummyJobExecutor.java

    r3857 r4078  
    2424package net.sf.basedb.clients.jobagent.executors;
    2525
     26import java.util.Collections;
     27
    2628import net.sf.basedb.clients.jobagent.Agent;
    2729import net.sf.basedb.clients.jobagent.JobExecutor;
     
    3032import net.sf.basedb.core.JobAgentSettings;
    3133import net.sf.basedb.core.SessionControl;
     34import net.sf.basedb.core.signal.SignalReceiver;
     35import net.sf.basedb.core.signal.ThreadSignalHandler;
    3236import net.sf.basedb.util.Values;
    3337
     
    8993    log.info("Executing job: " + job);
    9094    DbControl dc = null;
     95    boolean aborted = false;
     96    Throwable error = null;
    9197    try
    9298    {
     
    96102      if (wait > 0)
    97103      {
     104        SignalReceiver signalReceiver = agent.getSignalReceiver();
     105        job.setSignalTransporter(signalReceiver.getSignalTransporterClass(),
     106          signalReceiver.registerSignalHandler(new ThreadSignalHandler()));
    98107        job.setProgress(50, "Halfway; waiting " + wait + " seconds");
    99108        dc.commit();
     
    105114          }
    106115        }
     116        catch (InterruptedException ex)
     117        {
     118          aborted = true;
     119        }
    107120        catch (Throwable t)
    108121        {
    109122          log.error(t.getMessage(), t);
     123          error = t;
    110124        }
    111125        dc  = sc.newDbControl();
    112126        dc.reattachItem(job);
    113127      }
    114       job.doneOk("Not really, but used for testing job agent");
     128      if (aborted)
     129      {
     130        job.doneError("Aborted by user");
     131      }
     132      else if (error != null)
     133      {
     134        job.doneError(error.getMessage(), Collections.singleton(error));
     135      }
     136      else
     137      {
     138        job.doneOk("Not really, but used for testing job agent");
     139      }
    115140      dc.commit();
    116141      log.info("Done executing: " + job);
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ProcessJobExecutor.java

    r3675 r4078  
    4242import net.sf.basedb.core.PluginDefinition;
    4343import net.sf.basedb.core.SessionControl;
     44import net.sf.basedb.core.signal.Signal;
     45import net.sf.basedb.core.signal.SignalHandler;
     46import net.sf.basedb.core.signal.SignalReceiver;
     47import net.sf.basedb.core.signal.SignalTransporter;
     48import net.sf.basedb.core.signal.ThreadSignalHandler;
    4449import net.sf.basedb.util.Values;
    4550
     
    101106  private String javaBin;
    102107  private String options;
     108  private SignalReceiver signalReceiver;
     109  private SignalHandler signalHandler;
    103110 
    104111  public ProcessJobExecutor()
     
    120127    if (javaBin == null) javaBin = "java";
    121128    options = Values.getString(agent.getProperty("agent.executor.process.options"), "-server");
     129    signalReceiver = agent.getSignalReceiver();
    122130  }
    123131 
     
    126134  {
    127135    log.info("Executing job: " + job);
    128    
     136
    129137    // Generate command for new process
    130138    List<String> cmd = new ArrayList<String>(10);
     
    164172    cmd.add("-t"); // Thread priority
    165173    cmd.add(Integer.toString(agent.getThreadPriority(job.getEstimatedExecutionTime())));
     174    cmd.add("-x"); // Port the job agent is listening on for remote control
     175    cmd.add(Integer.toString(agent.getPort()));
    166176   
    167177    Process process = null;
     
    201211        new InputStreamReader(process.getInputStream()), result));
    202212      t.start();
    203 
     213     
    204214      try
    205215      {
    206216        log.info("Waiting for process to end");
     217        signalHandler = new ThreadSignalHandler();
     218        signalReceiver.registerSignalHandler(signalHandler);
    207219        int exitCode = process.waitFor();
    208220        if (exitCode != 0)
     
    224236      {
    225237        log.info("Job was interrupted: " + job, ex);
    226         // Kill the process
    227         process.destroy();
     238        // First, send ABORT to the job if it supports it
    228239        dc = sc.newDbControl();
    229240        job = Job.getById(dc, job.getId());
    230         job.doneError(ex.getMessage(), Arrays.asList(ex));
     241        SignalTransporter signalTransporter = job.getSignalTransporter();
     242        if (signalTransporter != null)
     243        {
     244          signalTransporter.send(Signal.ABORT);
     245        }
     246        else
     247        {
     248          job.doneError("Aborted by user", Arrays.asList(ex));
     249          process.destroy();
     250        }
    231251        dc.commit();
     252      }
     253      finally
     254      {
     255        signalReceiver.unregisterSignalHandler(signalHandler);         
    232256      }
    233257    }
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ThreadJobExecutor.java

    r3857 r4078  
    3939import net.sf.basedb.core.SessionControl;
    4040import net.sf.basedb.core.plugin.Response;
     41import net.sf.basedb.core.signal.SignalReceiver;
     42import net.sf.basedb.core.signal.SocketSignalReceiver;
    4143import net.sf.basedb.util.SocketUtil;
    4244import net.sf.basedb.util.Values;
     
    6567    org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor");
    6668 
     69  private SignalReceiver signalReceiver;
    6770 
    6871  public ThreadJobExecutor()
     
    7477  */
    7578  public void init(Agent agent)
    76   {}
     79  {
     80    this.signalReceiver = agent == null ? null : agent.getSignalReceiver();
     81  }
    7782 
    7883  public void executeJob(SessionControl sc, Agent agent, Job job, JobAgentSettings settings,
     
    99104        log.error("Error executing job: " + job, t);
    100105      }
     106      exec.registerSignalReceiver(signalReceiver);
    101107      dc.commit();
    102108
     
    130136  // -------------------------------------------
    131137 
     138  private void setSignalReceiver(SignalReceiver signalReceiver)
     139  {
     140    this.signalReceiver = signalReceiver;
     141  }
     142 
    132143  public static void main(String[] args)
    133144  {
     
    138149    String login = cmdLine.getOption("-u");
    139150    String password = cmdLine.getOption("-p");
     151    String proxyPort = cmdLine.getOption("-x");
    140152    int threadPriority = Values.getInt(cmdLine.getOption("-t"), Thread.NORM_PRIORITY);
    141153
     
    152164      ThreadJobExecutor executor = new ThreadJobExecutor();
    153165     
     166      if (proxyPort != null)
     167      {
     168        SocketSignalReceiver signalReceiver = new SocketSignalReceiver();
     169        signalReceiver.init("proxy=" + Application.getHostName() + ":" + proxyPort);
     170        executor.setSignalReceiver(signalReceiver);
     171        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(signalReceiver)));
     172      }
     173     
    154174      sc = Application.newSessionControl("net.sf.basedb.clients.jobagent",
    155175        SocketUtil.getLocalHost().toString(), null);
     
    188208  }
    189209 
     210  private static class ShutdownHook
     211    implements Runnable
     212  {
     213    private SignalReceiver signalReceiver;
     214   
     215    private ShutdownHook(SignalReceiver signalReceiver)
     216    {
     217      this.signalReceiver = signalReceiver;
     218    }
     219    public void run()
     220    {
     221      signalReceiver.close(5000);
     222    }
     223  }
    190224}
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/DefaultRequestHandler.java

    r3675 r4078  
    6969    registerHandler(new StopRequestHandler(agent), "stop");
    7070    registerHandler(new PauseRequestHandler(agent), "pause");
     71    if (agent.getSignalReceiver() != null)
     72    {
     73      registerHandler(new SignalRequestHandler(agent), "signal");
     74    }
    7175  }
    7276 
     
    8993    String answer = null;
    9094    RequestHandler handler = commandHandlers.get(cmd);
     95    if (handler == null && cmd != null && cmd.startsWith("signal://"))
     96    {
     97      handler = commandHandlers.get("signal");
     98    }
    9199    if (!agent.isAllowedControl(remote, cmd))
    92100    {
  • trunk/src/core/net/sf/basedb/core/Application.java

    r3960 r4078  
    568568    // Stop timer and clean the cache of SessionControl objects
    569569    if (timer != null) timer.cancel();
     570    if (internalJobQueue != null) internalJobQueue.close();
     571    if (secondaryStorageController != null) secondaryStorageController.close();
    570572    timer = null;
    571573    scheduler = null;
    572574    cleanSessionControlCache(true);
    573575    sessionCache = null;
    574     if (internalJobQueue != null) internalJobQueue.close();
    575     if (secondaryStorageController != null) secondaryStorageController.close();
    576576
    577577    Keyring.unload();
  • trunk/src/core/net/sf/basedb/core/InternalJobQueue.java

    r4074 r4078  
    203203  private boolean ignoreUseInternalJobQueueFlag = false;
    204204
     205  /**
     206    Timeout to wait for jobs to act on the ABORT signal when shutting down BASE.
     207    Default value is 20 seconds.
     208  */
     209  private int closeTimeout = 20000;
     210 
    205211  /**
    206212    Use the local signal receiver only.
     
    381387    sc.logout();
    382388
    383     // Interrupt all threads. Hopefully they will do as we tell them.
    384     runnersGroup.interrupt();
     389    // Send ABORT to all jobs that supports signalling
     390    signalReceiver.close(closeTimeout);
    385391  }
    386392 
     
    454460          throw t;
    455461        }
    456         exec.registerSignalReceiver(signalReceiver, false);
     462        exec.registerSignalReceiver(signalReceiver);
    457463        dc.commit();
    458464       
  • trunk/src/core/net/sf/basedb/core/Job.java

    r4074 r4078  
    592592  }
    593593 
     594  /**
     595    Set information needed to create a signal transporter object that can be used
     596    to send signals to the plug-in that is currently executing this job.
     597    @param clazz The signal transporter class
     598    @param initParams Initialisation parameters that will be passed
     599      to {@link SignalTransporter#init(String)}
     600    @throws PermissionDeniedException If the user doesn't have write
     601      permission for the job
     602    @since 2.6
     603  */
    594604  public void setSignalTransporter(Class<? extends SignalTransporter> clazz, String initParams)
    595605  {
     
    600610  }
    601611 
     612  /**
     613    Checks if a signal transporter has been registered for this job or not.
     614    @return TRUE if a signal transporter is registered, FALSE otherwise
     615    @since 2.6
     616  */
    602617  public boolean hasSignalTransporter()
    603618  {
     
    605620  }
    606621 
     622  /**
     623    Create and initialise a signal transporter object that can be used
     624    to send signal to the plug-in running this job. Note! Depending on
     625    the transport mechanism used, it is not certain that the created
     626    transporter can actually send the signal. For example, some transporters
     627    only work if the plug-in is running in the same virtual machine, while others
     628    supports sending signals to other JVM:s on the same or a different
     629    computer.
     630    @return An initialised signal transporter object, or null
     631      if no signal transporter has been registered
     632    @throws PermissionDeniedException If the user doesn't have write
     633      permission for the job
     634    @since 2.6
     635  */
    607636  public SignalTransporter getSignalTransporter()
    608637  {
     638    checkPermission(Permission.WRITE);
    609639    String tmp = getData().getSignalTransporter();
    610640    SignalTransporter transporter = null;
  • trunk/src/core/net/sf/basedb/core/PluginExecutionRequest.java

    r4074 r4078  
    6666  private Job.ProgressReporterImpl progress;
    6767  private SignalReceiver signalReceiver = null;
    68   private boolean forceSignalReceiver = false;
    6968 
    7069  PluginExecutionRequest(SessionControl sc, Plugin plugin, String command,
     
    10099        // Register a signal handler, receiver and transporter
    101100        signalHandler = ((SignalTarget)plugin).getSignalHandler();
    102         if (!forceSignalReceiver && signalHandler != null)
    103         {
    104           SignalReceiver override = signalHandler.getSignalReceiver();
    105           if (override != null) signalReceiver = override;
    106         }
    107101        if (signalReceiver != null && signalHandler != null)
    108102        {
     
    158152   
    159153    @param signalReceiver The signal recevier to use
    160     @param force If true, the specified signal recevier will always be used,
    161       even if the plug-in wants to use another receiver implementation
    162154  */
    163   public void registerSignalReceiver(SignalReceiver signalReceiver, boolean force)
     155  public void registerSignalReceiver(SignalReceiver signalReceiver)
    164156  {
    165157    this.signalReceiver = signalReceiver;
    166     this.forceSignalReceiver = force;
    167158  }
    168159 
  • trunk/src/core/net/sf/basedb/core/signal/AbstractSignalHandler.java

    r4074 r4078  
    5454  protected AbstractSignalHandler()
    5555  {
    56     this.supported = new HashSet<Signal>();
     56    this(null);
    5757  }
    58  
     58
    5959  /**
    6060    Create a new signal handler that supports the given signals.
    61     @param supported A set with the signals that are supported
     61   
     62    @param supported A collection with the signals that are initially supported.
     63      More signals can be added with {@link #addSignal(Signal)}
    6264  */
    63   protected AbstractSignalHandler(Set<Signal> supported)
     65  protected AbstractSignalHandler(Collection<Signal> supported)
    6466  {
    65     this.supported = new HashSet<Signal>(supported);
     67    this.supported = new HashSet<Signal>();
     68    if (supported != null) this.supported.addAll(supported);
    6669  }
    6770 
     
    7881    return supported != null && supported.contains(signal);
    7982  }
    80   public SignalReceiver getSignalReceiver()
    81   {
    82     return null;
    83   }
    8483  // -------------------------------------------
    8584
     
    8887    @param signal The signal to add
    8988  */
    90   public void addSignal(Signal signal)
     89  protected void addSignal(Signal signal)
    9190  {
    9291    if (signal != null) supported.add(signal);
     
    9695    @param signal The signal to remove
    9796  */
    98   public void removeSignal(Signal signal)
     97  protected void removeSignal(Signal signal)
    9998  {
    10099    supported.remove(signal);
  • trunk/src/core/net/sf/basedb/core/signal/AbstractSignalReceiver.java

    r4074 r4078  
    2525
    2626import java.net.URI;
     27import java.util.ArrayList;
    2728import java.util.Collection;
    2829import java.util.Collections;
     
    4243    subclass.
    4344   
    44   <li>The <i>handlerId</i> part is given by calling {@link System#identityHashCode(Object)}
    45     for the signal handler.
     45  <li>The <i>handlerId</i> part is given by calling {@link #getLocalSignalHandlerId(SignalHandler)}.
     46    This method may be overridden by subclasses that needs a different method of
     47    local ID generation. The important thing is that the ID is unique among all
     48    registered signal handlers.
    4649   
    4750  <li>The <i>supportedSignals</i> part is created by joining all signal ID:s
     
    7679  private Map<String, SignalHandler> handlers;
    7780  private String receiverId;
     81  private Thread notifyThread;
    7882 
    7983  /**
     
    99103  }
    100104  /**
    101     Close this signal receiver.
    102   */
    103   public void close()
     105    Close this signal receiver. If a subclass overrides this method it
     106    should call <code>super.close()</code> if it wants to
     107    use the default shutdown notification to signal handlers. This
     108    implementation will start a separate notification thread that sends
     109    the {@link Signal#ABORT} signal to all registered handlers. The current
     110    thread will wait at most the given time. If the number of registered handlers
     111    goes down to 0 before the time has ended the current thread will be awakened
     112    so it can continue. Note that the notification thread will usually finish in
     113    a short time, but it may take longer for all worker thread of each job to
     114    react to the signal.
     115    @param wait Number of milliseconds to wait for registered jobs to abort,
     116      use a negative value to disable notification, and 0 to disable waiting
     117  */
     118  public void close(int wait)
    104119  {
    105120    logger.info("Closing signal receiver: receiver id=" + receiverId);
    106     if (handlers != null) handlers.clear();
     121    if (handlers == null) return;
     122    if (wait >= 0 && handlers.size() > 0)
     123    {
     124      Thread abortAll = new Thread(
     125        new Runnable()
     126        {
     127          public void run()
     128          {
     129            logger.info("Sending ABORT to all handlers");
     130            sendToAll(Signal.ABORT);
     131          }
     132        }
     133      );
     134      abortAll.setDaemon(true);
     135      abortAll.start();
     136      if (wait > 0)
     137      {
     138        try
     139        {
     140          logger.info("Sleeping for " + wait + " milliseconds");
     141          notifyThread = Thread.currentThread();
     142          Thread.sleep(wait);
     143          notifyThread = null;
     144        }
     145        catch (InterruptedException ex)
     146        {
     147          // Good, all handlers got the signal before timeout
     148        }
     149      }
     150      logger.info("Remaining active handlers: " + handlers.size());
     151    }
     152    handlers.clear();
    107153    handlers = null;
    108154  }
     
    115161  public String registerSignalHandler(SignalHandler handler)
    116162  {
     163    if (notifyThread != null) throw new SignalException("Signal receiver is shutting down.");
    117164    String globalId = getGlobalSignalId(handler);
    118165    String localId = getLocalSignalHandlerId(handler);
    119     if (handlers != null) handlers.put(localId, handler);
     166    if (handlers != null)
     167    {
     168      if (handlers.containsKey(localId))
     169      {
     170        throw new SignalException("A handler with ID '" + localId + "' is already registered: "
     171          + handlers.get(localId));
     172      }
     173      handlers.put(localId, handler);
     174    }
     175
    120176    logger.info("Register signal handler: recevier id = " + receiverId +
    121177      "; global id=" + globalId + "; local id=" + localId);
    122     logger.debug("Current number of registered signal handlers: " + handlers.size());
     178    logger.debug("Current number of registered signal handlers: " +
     179      (handlers == null ? 0 : handlers.size()));
    123180    return globalId;
    124181  }
     
    133190    logger.info("Unregister signal handler: recevier id = " + receiverId +
    134191      "; local id=" + localId);
    135     if (handlers != null) handlers.remove(localId);
    136     logger.debug("Current number of registered signal handlers: " + handlers.size());
     192    if (handlers != null)
     193    {
     194      handlers.remove(localId);
     195      // If called from the close() method() this is waiting for
     196      if (notifyThread != null && handlers.size() == 0)
     197      {
     198        logger.info("Interrupting closedown thread");
     199        notifyThread.interrupt();
     200      }
     201    }
     202    logger.debug("Current number of registered signal handlers: " +
     203      (handlers == null ? 0 : handlers.size()));
     204  }
     205 
     206  public void sendToAll(Signal signal)
     207  {
     208    if (handlers == null) return;
     209    logger.info("Sending " + signal + " to " + (handlers.size()) + " handlers");
     210    // Copy the signal handlers to a temporary collection,
     211    // since sending a signal may cause a handler to get
     212    // unregistered, which may fail or block due to synchronization
     213    Collection<SignalHandler> temp = null;
     214    synchronized (handlers)
     215    {
     216      temp = new ArrayList<SignalHandler>(handlers.values());
     217    }
     218    for (SignalHandler handler : temp)
     219    {
     220      if (handler.supports(signal)) handler.handleSignal(signal);
     221    }
    137222  }
    138223  // -------------------------------------------
    139224
     225  /**
     226    Get the receiver ID that was passed to the {@link #init(String)}
     227    method.
     228  */
     229  protected String getReceiverId()
     230  {
     231    return receiverId;
     232  }
     233 
    140234  /**
    141235    Generate a signal ID string. This string is returned by
     
    144238    a transporter object so that it can send signals to the specified handler.
    145239    See the class documentation for a description of the format of the
    146     generated string.
     240    generated string. The string is of the format:
     241    <p>
     242    <code>signal://handlerId@receiverId/?supportedSignals</code>
     243    <p>
     244    See class description for detailed information.
    147245   
    148246    @param handler The signal handler to generate the ID for
     
    154252    sb.append("signal://");
    155253    sb.append(getLocalSignalHandlerId(handler)).append("@");
    156     sb.append(receiverId).append("/?");
     254    sb.append(getReceiverId()).append("/?");
    157255    Collection<Signal> signals = handler.getSupportedSignals();
    158256    if (signals != null)
     
    172270    Get the local signal handler id of the given signal handler.
    173271    This implementation simply return the system hashcode for the
    174     handler.
     272    handler. The returned ID must be unique among the registered
     273    signal handlers.
    175274    @param handler The handler to get the id for
    176275    @return The local handler id
  • trunk/src/core/net/sf/basedb/core/signal/DelegatingSignalHandler.java

    r4074 r4078  
    2626import java.util.Collection;
    2727import java.util.Collections;
    28 import java.util.HashMap;
    2928import java.util.HashSet;
    3029import java.util.Iterator;
     
    4342*/
    4443public class DelegatingSignalHandler
    45   implements SignalHandler
     44  extends AbstractSignalHandler
    4645{
    4746  /**
     
    5756 
    5857  /**
    59     If a specific signal receiver must be used.
    60   */
    61   private SignalReceiver signalReceiver;
    62  
    63   /**
    6458    Create a new delegating signal handler. Signal handlers to
    6559    delegate to should be registered with
     
    6862  public DelegatingSignalHandler()
    6963  {
    70     this(null);
    71   }
    72  
    73   /**
    74     Create a new delegating signal handler using a specified signal
    75     receiver. Signal handlers to delegate to should be registered with
    76     {@link #registerSignalHandler(SignalHandler)}.
    77     @param signalReceiver The signal receiver that should receive the signals,
    78       or null to use the system default signal receiver
    79   */
    80   public DelegatingSignalHandler(SignalReceiver signalReceiver)
    81   {
    82     this.handlers = new HashMap<Signal, Set<SignalHandler>>();
    83     this.signalReceiver = signalReceiver;
     64    super();
    8465  }
    8566 
     
    9273      registered handler
    9374  */
     75  @Override
    9476  public Collection<Signal> getSupportedSignals()
    9577  {
     
    10082    Check if at least one handler has been registered for the given signal.
    10183  */
     84  @Override
    10285  public boolean supports(Signal signal)
    10386  {
     
    126109      handler.handleSignal(signal);
    127110    }
    128   }
    129  
    130   public SignalReceiver getSignalReceiver()
    131   {
    132     return signalReceiver;
    133111  }
    134112  // -------------------------------------------
     
    174152    }
    175153  }
    176  
    177154
    178155}
  • trunk/src/core/net/sf/basedb/core/signal/LocalSignalReceiver.java

    r4074 r4078  
    9090 
    9191  @Override
    92   public void close()
     92  public void close(int wait)
    9393  {
     94    super.close(wait);
    9495    receivers.remove(receiverId);
    95     super.close();
    9696  }
    9797 
  • trunk/src/core/net/sf/basedb/core/signal/SignalHandler.java

    r4074 r4078  
    3434  implementation.
    3535  <p>
    36   Signal handler implementations need to be partly thread safe. Once they have
    37   been registered with a {@link SignalReceiver} they may receive multiple signals
    38   in different threads at the same time.
     36  Signal handler implementations need to be implemented in a thread safe. Once
     37  they have been registered with a {@link SignalReceiver} they may receive
     38  multiple signals in different threads at the same time.
    3939
    4040  @author nicklas
     
    6363  */
    6464  public boolean supports(Signal signal);
    65  
    66   /**
    67     Return a specific signal receiever that must be used with this
    68     signal handler. Null should be returned to let the system select
    69     an appropriate signal receiver. If a non-null value is returned, the
    70     system should use this signal receiver instead of the system default.
    71     @return A signal receiver or null
    72   */
    73   public SignalReceiver getSignalReceiver();
    74  
     65
    7566}
  • trunk/src/core/net/sf/basedb/core/signal/SignalReceiver.java

    r4074 r4078  
    5757    Close the receiver. The receiver should close and cleanup any opened
    5858    resources, unregister itself if neccesary and stop listening for signals.
     59
     60    @param wait If the value is zero or positive, the {@link Signal#ABORT}
     61      should be sent to all registered signal handlers. The signal receiver
     62      should then wait at most the specified number of milliseconds for
     63      all signal handlers to get unregistered. If the timeout expires before
     64      all signal handlers has processed the signal, the signal receiver should
     65      continue it's shutdown.
    5966  */
    60   public void close();
     67  public void close(int wait);
    6168 
    6269  /**
     
    9198  */
    9299  public void unregisterSignalHandler(SignalHandler handler);
     100 
     101  /**
     102    Send a signal to all registered signal handlers that supports
     103    it. The main purpose of this method is to be able to send
     104    the {@link Signal#ABORT} when the system is shutting down.
     105    @param signal The signal to send
     106  */
     107  public void sendToAll(Signal signal);
     108 
    93109}
  • trunk/src/core/net/sf/basedb/core/signal/SocketSignalReceiver.java

    r4074 r4078  
    2929import java.net.ServerSocket;
    3030import java.net.Socket;
     31import java.net.URI;
     32import java.net.URISyntaxException;
    3133import java.net.UnknownHostException;
    3234import java.nio.channels.ClosedByInterruptException;
     
    4951 
    5052  <p>
    51   <code>port=xx&allow=ip-address&allow=ip-address...</code>
     53  <code>port=xx&proxy=host:port&forward=xx&allow=ip-address&allow=ip-address...</code>
    5254
    5355  <p>
    54   where <code>port</code> is the port number the signal receiver will listen on and
    55   the <code>allow</code> parts are the ip name or numbers of hosts that are allowed
    56   to send signals to the receiver. Except for the special case <code>allow=*</code>,
    57   which allows any remote host to send signals an exact match is required. The
    58   local host is always allowed to send signals.
    59   <p>
    60   If no port is given, the signal receiver will randomly choose a free port
    61   <p>
    62   If no <code>allow</code> tags are given only allow connections from the
    63   local host are allowed.
     56  where:
     57
     58  <ul>
     59  <li>
     60    The <code>port</code> is the port number the signal receiver will listen on.
     61    This is optional and if not given a random non-used port will be used.
     62 
     63  <li>
     64    The <code>proxy</code> is the address of a proxy host that will route
     65    incoming messages to this signal receiver. The proxy will automatically
     66    be added to the list of allowed hosts. This value is optional.
     67   
     68  <li>
     69    The <code>forward</code> attribute is a boolean option (1 or 0).
     70    If 1, this signal receiver will act as a proxy and forward signal
     71    messages which are of the form:
     72    <p>
     73    <code><code>signal://handlerID@host:port/?SIGNAL#forward.to.host:port</code>
     74    <p>
     75    Before the message is forwarded it is re-built like this:
     76    <p>
     77    <code>signal://handlerId@forward.to.host:port/?SIGNAL</code>
     78   
     79  <li>
     80    The <code>allow</code> parts are the ip name or numbers of hosts that are
     81    allowed to send signals to the receiver. Except for the special case
     82    <code>allow=*</code>, which allows any remote host to send signals an
     83    exact match is required. The local host is always allowed to send signals.
     84    <p>
     85    If no <code>allow</code> tags are given only allow connections from the
     86    local host (and the proxy if given) are allowed. 
     87  </ul>
    6488 
    6589  @author nicklas
     
    77101
    78102  private InetAddress ip;
     103  private Thread listener;
    79104  private int port;
    80   private Thread listener;
     105  private String proxy;
     106  private boolean allowForward;
    81107  private Set<InetAddress> allow;
    82108  private boolean allowAll;
     
    101127  public void init(String params)
    102128  {
     129    logger.info("Initialising: params=" + params);
     130    parseInitParameters(params);
     131    listenOnSocket();
     132    super.init(Application.getHostName() + ":" + port);
     133  }
     134
     135  /**
     136    @return {@link SocketSignalTransporter}
     137  */
     138  public Class<? extends SignalTransporter> getSignalTransporterClass()
     139  {
     140    return SocketSignalTransporter.class;
     141  }
     142
     143  /**
     144    Close this receiver and the socket it is listening on.
     145  */
     146  public void close(int wait)
     147  {
     148    if (listener != null)
     149    {
     150      logger.info("Interrupting socket signal receiver on port: " + port);
     151      listener.interrupt();
     152    }
     153    super.close(wait);
     154  }
     155  // -------------------------------------------
     156
     157  /*
     158    From the AbstractSignalReceiver interface
     159    -------------------------------------------
     160  */
     161  /**
     162    Override ID generation so we can add proxy information if needed.
     163  */
     164  @Override
     165  protected String getGlobalSignalId(SignalHandler handler)
     166  {
     167    String globalId = super.getGlobalSignalId(handler);
     168    if (proxy != null)
     169    {
     170      try
     171      {
     172        URI uri = new URI(globalId);
     173        globalId = "signal://" + uri.getUserInfo() + "@" + proxy +
     174          "/?" + uri.getQuery() + "#" + uri.getHost();
     175        if (uri.getPort() >= 0) globalId += ":" + uri.getPort();
     176      }
     177      catch (URISyntaxException ex)
     178      {
     179        // Should not happen
     180      }
     181    }
     182    return globalId;
     183  }
     184  /**
     185    Process the incoming message. This class will check if the
     186    message contains any forwarding information. If so, the
     187    message will be forwarde to the new destination. Otherwise,
     188    the message will be processed as normal.
     189  */
     190  @Override
     191  protected void processSignalMessage(String message)
     192  {
     193    logger.info("Processing signal message: " + message);
     194    boolean forwarded = false;
     195    if (allowForward) forwarded = forwardSignalMessageIfNeeded(message);
     196    if (!forwarded) super.processSignalMessage(message);
     197  }
     198  // -------------------------------------------
     199
     200  /**
     201    Parse the initialisation parameters. Called from the {@link #init(String)}
     202    method.
     203    @param params Parameter string passed to {@link #init(String)} method
     204  */
     205  protected void parseInitParameters(String params)
     206  {
    103207    QueryParameters qp = QueryParameters.parseQueryString(params);
    104208    // Get the port number we listen on
    105209    port = Values.getInt(qp.getValue("port"), 0);
     210    logger.info("port=" + port);
     211
     212    // Are we allowed to forward the messages
     213    allowForward = Values.getBoolean(qp.getValue("forward"));
     214    logger.info("forward=" + allowForward);
    106215   
    107216    // Get allowed hosts
     
    113222      for (String ip : allowIp)
    114223      {
     224        logger.info("allow=" + ip);
    115225        if ("*".equals(ip))
    116226        {
     
    133243    }
    134244   
     245    // Get proxy
     246    proxy = qp.getValue("proxy");
     247    logger.info("proxy=" + proxy);
     248    if (proxy != null)
     249    {
     250      try
     251      {
     252        URI proxyURI = new URI("singal://" + proxy);
     253        allow.add(InetAddress.getByName(proxyURI.getHost()));
     254      }
     255      catch (Exception ex)
     256      {
     257        throw new SignalException("Invalid proxy: " + proxy, ex);
     258      }
     259    }
     260  }
     261 
     262  /**
     263    Start listening for incoming signals.
     264  */
     265  protected void listenOnSocket()
     266  {
    135267    logger.info("Starting socket signal receiver on port: " + port);
    136268    try
     
    142274      ip = socket.getInetAddress();
    143275      listener = new Thread(new ListenerThread(channel), "ListenerThread."+port);
     276      listener.setDaemon(true);
    144277      listener.start();
    145278      logger.info("Socket signal receiver is listening on port: " + port);
     
    150283      throw new SignalException(ex);
    151284    }
    152     super.init(Application.getHostName() + ":" + port);
    153   }
    154 
    155   /**
    156     @return {@link SocketSignalTransporter}
    157   */
    158   public Class<? extends SignalTransporter> getSignalTransporterClass()
    159   {
    160     return SocketSignalTransporter.class;
    161   }
    162 
    163   /**
    164     Close this receiver and the socket it is listening on.
    165   */
    166   public void close()
    167   {
    168     logger.info("Interrupting socket signal receiver on port: " + port);
    169     listener.interrupt();
    170   }
    171   // -------------------------------------------
    172 
     285  }
     286 
     287  /**
     288    If the message has forwarding information, forward the message to
     289    the other host.
     290    @return TRUE if the message was forwarded, FALSE otherwise
     291  */
     292  protected boolean forwardSignalMessageIfNeeded(String message)
     293  {
     294    if (message == null) return false;
     295    try
     296    {
     297      URI uri = new URI(message);
     298      String fragment = uri.getFragment();
     299      if (fragment != null)
     300      {
     301        Signal signal = Signal.getSignal(uri.getQuery());
     302        String init = "signal://" + uri.getUserInfo() + "@" + fragment;
     303        logger.info("Forwarding signal message: message=" + message + "; to=" + init);
     304        SignalTransporter forwardTo = new SocketSignalTransporter();
     305        forwardTo.init(init);
     306        forwardTo.send(signal);
     307        return true;
     308      }
     309    }
     310    catch (Exception ex)
     311    {
     312      // Ignore invalid messages
     313      logger.warn("Could not process signal message: " + message, ex);
     314    }
     315    return false;
     316  }
     317 
     318  /**
     319    Checks if the specified remote host is allowed to send
     320    signals to this receiver.
     321    @param remoteHost The remote host that is sending the signal
     322    @return TRUE if the host is allowed, FALSE otherwise
     323  */
     324  protected boolean isAllowedHost(InetAddress remoteHost)
     325  {
     326    return allowAll || SocketUtil.isLocalHost(remoteHost) || allow.contains(remoteHost);
     327  }
     328 
    173329 
    174330  /**
     
    206362         
    207363          // Check if we are allowed to accept connections from the remote host
    208           if (allowAll || SocketUtil.isLocalHost(remoteHost) || allow.contains(remoteHost))
     364          if (isAllowedHost(remoteHost))
    209365          {
    210366            // Read the incoming message
     
    235391    // -------------------------------------------
    236392  }
     393
    237394 
    238395}
  • trunk/src/core/net/sf/basedb/core/signal/SocketSignalTransporter.java

    r4074 r4078  
    6666    int port = uri.getPort();
    6767    String message = generateSignalMessage(signal);
     68    if (uri.getFragment() != null) message += "#" + uri.getFragment(); // For proxy support
    6869    logger.debug("The message is: " + message);
    6970    Socket s = null;
  • trunk/src/core/net/sf/basedb/core/signal/ThreadSignalHandler.java

    r4074 r4078  
    4444  <pre class="code">
    4545// ... code in worker thread
     46threadSignalHandler.setWorkerThread(null);
    4647beginTransaction();
    4748boolean done = false;
     
    9192    received.
    9293  */
    93   private final Thread workerThread;
     94  private Thread workerThread;
     95 
     96  /**
     97    If TRUE, call Thread.stop() instead of Thread.interrupt()
     98  */
     99  private boolean forceStop;
    94100 
    95101  /**
     
    104110  /**
    105111    Create a new thread signal handler.
    106     @param thread The worker thread to interrupt when it receieves the
     112    @param workerThread The worker thread to interrupt when it receieves the
    107113      {@link Signal#ABORT} signal, or null to interrupt the current thread
    108114  */
    109   public ThreadSignalHandler(Thread thread)
     115  public ThreadSignalHandler(Thread workerThread)
    110116  {
    111117    super(supported);
    112     this.workerThread = thread == null ? Thread.currentThread() : thread;
     118    setWorkerThread(workerThread);
    113119  }
    114120 
     
    127133      {@link Signal#ABORT}
    128134  */
     135  @SuppressWarnings("deprecation")
    129136  public void handleSignal(Signal signal)
    130137  {
     
    134141    {
    135142      logger.debug("Sending signal " + signal + " to thread: " + workerThread);
    136       workerThread.interrupt();
     143      if (forceStop)
     144      {
     145        workerThread.stop(new InterruptedException("Received signal: ABORT"));
     146      }
     147      else
     148      {
     149        workerThread.interrupt();
     150      }
    137151    }
    138152    else
     
    144158  // -------------------------------------------
    145159
     160  /**
     161    Set the worker thread that should be interrupted when a signal is
     162    receiver.
     163    @param workerThread The worker thread, or null to use the current thread
     164  */
     165  public void setWorkerThread(Thread workerThread)
     166  {
     167    this.workerThread = workerThread == null ? Thread.currentThread() : workerThread;
     168  }
     169 
     170  /**
     171    Call this method with a true value to make the signal handler
     172    use {@link Thread#stop(Throwable)} instead of {@link Thread#interrupt()}.
     173    Note that the <code>Thread.stop</code> method is deprecated and it is not
     174    recommended that it is used. For some plug-ins though this may be the
     175    only option, if they are mainly executing code outside of their control
     176    which doesn't check the {@link Thread#interrupted()} status. The exception
     177    passed to the thread will be a {@link InterruptedException}.
     178    @param forceStop TRUE to stop the thread by force, FALSE to signal to
     179      it with a flag
     180  */
     181  public void setForceStop(boolean forceStop)
     182  {
     183    this.forceStop = forceStop;
     184  }
     185 
    146186}
  • trunk/src/core/net/sf/basedb/util/QueryParameters.java

    r4074 r4078  
    6363    if (query != null)
    6464    {
    65       Pattern p = Pattern.compile("(.*)=(.*)&?");
     65      Pattern p = Pattern.compile("([^=]+)=([^&]+)&?");
    6666      Matcher m = p.matcher(query);
    6767      while (m.find())
  • trunk/src/plugins/core/net/sf/basedb/plugins/AbstractFlatFileImporter.java

    r4074 r4078  
    413413  private int skippedLines;
    414414  private ClassMapErrorHandler errorHandler;
     415  private ThreadSignalHandler signalHandler;
    415416
    416417  public AbstractFlatFileImporter()
     
    449450  public void run(Request request, Response response, ProgressReporter progress)
    450451  {
     452    if (signalHandler != null) signalHandler.setWorkerThread(null);
    451453    String command = request.getCommand();
    452454    if (command.equals(Request.COMMAND_EXECUTE))
     
    678680  public SignalHandler getSignalHandler()
    679681  {
    680     return new ThreadSignalHandler();
     682    signalHandler = new ThreadSignalHandler();
     683    return signalHandler;
    681684  }
    682685  // -------------------------------------------
  • trunk/src/plugins/core/net/sf/basedb/plugins/IntensityCalculatorPlugin.java

    r3820 r4078  
    5757import net.sf.basedb.core.query.Hql;
    5858import net.sf.basedb.core.query.Orders;
     59import net.sf.basedb.core.signal.ProgressReporterSignalHandler;
     60import net.sf.basedb.core.signal.Signal;
     61import net.sf.basedb.core.signal.SignalHandler;
     62import net.sf.basedb.core.signal.SignalReceivedException;
     63import net.sf.basedb.core.signal.SignalTarget;
    5964import net.sf.basedb.util.IntensityCalculator;
    6065import net.sf.basedb.util.IntensityCalculatorUtil;
     
    8792public class IntensityCalculatorPlugin
    8893  extends AbstractPlugin
    89   implements InteractivePlugin 
     94  implements InteractivePlugin, SignalTarget
    9095{
    9196
     
    127132
    128133  private RequestInformation configureJob;
     134  private ProgressReporterSignalHandler signalHandler;
    129135 
    130136  /**
     
    180186  public void run(Request request, Response response, ProgressReporter progress)
    181187  {
     188    if (signalHandler != null)
     189    {
     190      signalHandler.forwardTo(progress);
     191      progress = signalHandler;
     192    }
    182193    DbControl dc = sc.newDbControl();
    183194    try
     
    263274      if (progress != null) progress.display(100, "Done");
    264275      response.setDone("Done");
     276    }
     277    catch (SignalReceivedException ex)
     278    {
     279      response.setError("Aborted by user", Arrays.asList(ex));
    265280    }
    266281    catch (Throwable t)
     
    412427    }
    413428  }
     429  // -------------------------------------------
     430  /*
     431    From the SignalTarget interface
     432    -------------------------------------------
     433  */
     434  public SignalHandler getSignalHandler()
     435  {
     436    signalHandler = new ProgressReporterSignalHandler(Collections.singleton(Signal.ABORT));
     437    return signalHandler;
     438  }
     439  // -------------------------------------------
    414440
    415441 
  • trunk/src/plugins/core/net/sf/basedb/plugins/LowessNormalization.java

    r4069 r4078  
    6767import net.sf.basedb.core.query.SqlResult;
    6868import net.sf.basedb.core.query.WhenStatement;
     69import net.sf.basedb.core.signal.ProgressReporterSignalHandler;
     70import net.sf.basedb.core.signal.Signal;
     71import net.sf.basedb.core.signal.SignalHandler;
     72import net.sf.basedb.core.signal.SignalReceivedException;
     73import net.sf.basedb.core.signal.SignalTarget;
    6974import net.sf.basedb.util.Values;
    7075
     
    8792public class LowessNormalization
    8893  extends AbstractAnalysisPlugin
    89   implements InteractivePlugin
     94  implements InteractivePlugin, SignalTarget
    9095{
    9196
     
    165170  private RequestInformation configureJob;
    166171
     172  private ProgressReporterSignalHandler signalHandler;
     173 
    167174  /*
    168175    From the Plugin interface
     
    201208    if (command.equals(Request.COMMAND_EXECUTE))
    202209    {
     210      if (signalHandler != null)
     211      {
     212        signalHandler.forwardTo(progress);
     213        progress = signalHandler;
     214      }
    203215      DbControl dc = null;
    204216      try
     
    224236        response.setDone(normalizedSpots + " spots normalized, " + (source.getNumSpots() - normalizedSpots) + " spots removed");
    225237      }
     238      catch (SignalReceivedException ex)
     239      {
     240        response.setError("Aborted by user", Arrays.asList(ex));
     241      }
    226242      catch (Throwable ex)
    227243      {
     
    305321      }
    306322    }
     323    catch (SignalReceivedException ex)
     324    {
     325      response.setError("Aborted by user", Arrays.asList(ex));
     326    }
    307327    catch (Throwable ex)
    308328    {
    309329      response.setError(ex.getMessage(), Arrays.asList(ex));
    310330    }
     331  }
     332  // -------------------------------------------
     333
     334  /*
     335    From the SignalTarget interface
     336    -------------------------------------------
     337  */
     338  public SignalHandler getSignalHandler()
     339  {
     340    signalHandler = new ProgressReporterSignalHandler(Collections.singleton(Signal.ABORT));
     341    return signalHandler;
    311342  }
    312343  // -------------------------------------------
  • trunk/www/views/jobs/view_job.jsp

    r4074 r4078  
    108108 
    109109  // Check if the plug-in supports the "Abort" signal
    110   boolean supportsAbort = status == Job.Status.WAITING;
    111   if (status == Job.Status.EXECUTING)
     110  boolean supportsAbort = status == Job.Status.WAITING && writePermission;
     111  if (status == Job.Status.EXECUTING && writePermission)
    112112  {
    113113    try
     
    116116      Collection<Signal> supportedSignals = signalTransporter != null ?
    117117        signalTransporter.getSupportedSignals() : null;
    118       supportsAbort = supportedSignals == null || supportedSignals.contains(Signal.ABORT);
     118      supportsAbort = signalTransporter != null &&
     119        (supportedSignals == null || supportedSignals.contains(Signal.ABORT));
    119120    }
    120121    catch (Exception ex)
     
    172173  </base:head>
    173174  <base:body onload="autoUpdate()">
    174    
    175175    <h3 class="docked"><%=title%> <base:help tabcontrol="main" /></h3>
    176176    <t:tabcontrol id="main" active="<%=tab%>" position="bottom" contentstyle="<%="height: "+(int)(scale*320)+"px;"%>">
Note: See TracChangeset for help on using the changeset viewer.