Changeset 4078
- Timestamp:
- Jan 14, 2008, 12:21:58 PM (16 years ago)
- Location:
- trunk
- Files:
-
- 6 added
- 28 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/config/dist/base.config
r4074 r4078 80 80 # Signal receiver class for sending signals to running jobs (to abort them) 81 81 jobqueue.internal.signalreceiver.class = net.sf.basedb.core.signal.LocalSignalReceiver 82 jobqueue.internal.signalreceiver.init = localhost:082 jobqueue.internal.signalreceiver.init = jobqueue:0 83 83 84 84 # If plugins with useInteralJobQueue = false should be executed or not -
trunk/doc/src/docbook/appendix/base.config.xml
r3944 r4078 379 379 380 380 <varlistentry> 381 <term><property>jobqueue.internal.signalreceiver.class</property></term> 382 <listitem> 383 <para> 384 A class implementing the <interfacename docapi="net.sf.basedb.core.signal" 385 >SignalReceiver</interfacename> 386 interface. The class must have a public no-argument constructor. If 387 no value is specified the default setting is: 388 <classname docapi="net.sf.basedb.core.signal" 389 >net.sf.basedb.core.signal.LocalSignalReceiver</classname>. 390 </para> 391 <para> 392 Change to <classname docapi="net.sf.basedb.core.signal" 393 >net.sf.basedb.core.signal.SocketSignalReceiver</classname> 394 if the internal job queue must be able to receive signals from outside 395 this JVM. 396 </para> 397 </listitem> 398 </varlistentry> 399 400 <varlistentry> 401 <term><property>jobqueue.internal.signalreceiver.init</property></term> 402 <listitem> 403 <para> 404 Initialisation string sent to <methodname>SignalReceiver.init()</methodname>. 405 The syntax and meaning of the string depends on the actual implementation 406 that is used. Please see the javadoc for more information. 407 </para> 408 </listitem> 409 </varlistentry> 410 411 <varlistentry> 381 412 <term><property>jobqueue.internal.checkinterval</property></term> 382 413 <listitem> -
trunk/doc/src/docbook/developerdoc/api_overview.xml
r4076 r4078 2806 2806 2807 2807 </sect2> 2808 2809 <sect2 id="core_api.signals"> 2810 <title>Sending signals (to plug-ins)</title> 2811 2812 <para> 2813 BASE has a simple system for sending signals between different parts of 2814 a system. This signalling system was initially developed to be able to 2815 kill plug-ins that a user for some reason wanted to abort. The signalling 2816 system as such is not limited to this and it can be used for other purposes 2817 as well. Signals can of course be handled internally in a single JVM but 2818 also sent externally to other JVM:s running on the same or a different 2819 computer. The transport mechanism for signals is decoupled from the actual 2820 handling of them. If you want to, you could implement a signal transporter 2821 that sends signal as emails and the target plug-in would never know. 2822 </para> 2823 2824 <para> 2825 The remainder of this section will focus mainly on the sending and 2826 transportation of signals. For more information about handling signals 2827 on the receiving end, see <xref linkend="plugin_developer.signals" />. 2828 </para> 2829 2830 <sect3 id="core_api.signals.diagram"> 2831 <title>Diagram of classes and methods</title> 2832 <figure id="core_api.figures.signals"> 2833 <title>The signalling system</title> 2834 <screenshot> 2835 <mediaobject> 2836 <imageobject> 2837 <imagedata 2838 align="center" 2839 scalefit="1" width="100%" 2840 fileref="figures/uml/corelayer.signals.png" format="PNG" /> 2841 </imageobject> 2842 </mediaobject> 2843 </screenshot> 2844 </figure> 2845 2846 <para> 2847 The signalling system is rather simple. An object that wish 2848 to receieve signals must implement the 2849 <interfacename docapi="net.sf.basedb.core.signal" 2850 >SignalTarget</interfacename>. It's only method 2851 is <methodname>getSignalHandler()</methodname>. A 2852 <interfacename docapi="net.sf.basedb.core.signal" 2853 >SignalHandler</interfacename> is an object that 2854 knows what to do when a signal is delivered to it. The target object 2855 may implement the <interfacename>SignalHandler</interfacename> itself 2856 or use one of the existing handlers. 2857 </para> 2858 2859 <para> 2860 The difficult part here is to be aware that a signal is usually 2861 delivered by a separate thread. The target object must be aware 2862 of this and know how to handle multiple threads. As an example we 2863 can use the <classname docapi="net.sf.basedb.core.signal" 2864 >ThreadSignalHandler</classname> which simply 2865 calls <code>Thread.interrupt()</code> to deliver a signal. The target 2866 object that uses this signal handler it must know that it should check 2867 <code>Thread.interrupted()</code> at regular intervals from the main 2868 thread. If that method returns true, it means that the <constant>ABORT</constant> 2869 signal has been delivered and the main thread should clean up and exit as 2870 soon as possible. 2871 </para> 2872 2873 <para> 2874 Even if a signal handler could be given directly to the party 2875 that may be interested in sending a signal to the target this 2876 is not recommended. This would only work when sending signals 2877 within the same virtual machine. The signalling system includes 2878 <interfacename docapi="net.sf.basedb.core.signal" 2879 >SignalTransporter</interfacename> and 2880 <interfacename docapi="net.sf.basedb.core.signal" 2881 >SignalReceiver</interfacename> objects that are used 2882 to decouple the sending of signals with the handling of signals. The 2883 implementation usually comes in pairs, for example 2884 <classname docapi="net.sf.basedb.core.signal" 2885 >SocketSignalTransporters</classname> and <classname 2886 docapi="net.sf.basedb.core.signal">SocketSignalReceiver</classname>. 2887 </para> 2888 2889 <para> 2890 Setting up the transport mechanism is usually a system responsibility. 2891 Only the system know what kind of transport that is appropriate for it's current 2892 setup. Ie. should signals be delievered by TCP/IP sockets, only internally, or 2893 should a delivery mechanism based on web services be implemented? 2894 If a system wants to receive signals it must create an appropriate 2895 <interfacename>SignalReceiver</interfacename> object. Within BASE the 2896 internal job queue set up it's own signalling system that can be used to 2897 send signals (eg. kill) running jobs. The job agents do the same but uses 2898 a different implementation. See <xref linkend="appendix.base.config.jobqueue" /> 2899 for more information about how to configure the internal job queue's 2900 signal receiver. In both cases, there is only one signal receiver instance 2901 active in the system. 2902 </para> 2903 2904 <para> 2905 Let's take the internal job queue as an example. Here is how it works: 2906 </para> 2907 2908 <itemizedlist> 2909 <listitem> 2910 <para> 2911 When the internal job queue is started, it will also create a signal 2912 receiver instance according to the settings in <filename>base.config</filename>. 2913 The default is to create <classname docapi="net.sf.basedb.core.signal" 2914 >LocalSignalReceiver</classname> 2915 which can only be used inside the same JVM. If needed, this can 2916 be changed to a <classname docapi="net.sf.basedb.core.signal" 2917 >SocketSignalReceiver</classname> or any other 2918 user-provided implementation. 2919 </para> 2920 </listitem> 2921 2922 <listitem> 2923 <para> 2924 When the job queue has found a plug-in to execute it will check if 2925 it also implements the <interfacename docapi="net.sf.basedb.core.signal" 2926 >SignalTarget</interfacename> 2927 interface. If it does, a signal handler is created and registered 2928 with the signal receiver. This is actually done by the BASE core 2929 by calling <methodname>PluginExecutionRequest.registerSignalReceiver()</methodname> 2930 which also makes sure that the the ID returned from the registration is 2931 stored in the database together with the job item representing the 2932 plug-in to execute. 2933 </para> 2934 </listitem> 2935 2936 <listitem> 2937 <para> 2938 Now, when the web client see's a running job which has a non-empty 2939 signal transporter property, the <guilabel>Abort</guilabel> 2940 button is activated. If the user clicks this button the BASE core 2941 uses the information in the database to create 2942 <interfacename docapi="net.sf.basedb.core.signal" 2943 >SignalTransporter</interfacename> object. This 2944 is simply done by calling <code>Job.getSignalTransporter()</code>. 2945 The created signal transporter knows how to send a signal 2946 to the signal receiver it was first registered with. When the 2947 signal arrives at the receiver it will find the handler for it 2948 and call <code>SignalHandler.handleSignal()</code>. This will in it's turn 2949 trigger some action in the signal target which soon will abort what 2950 it is doing and exit. 2951 </para> 2952 </listitem> 2953 </itemizedlist> 2954 2955 2956 </sect3> 2957 2958 </sect2> 2959 2808 2960 </sect1> 2809 2961 -
trunk/doc/src/docbook/developerdoc/plugin_developer.xml
r3958 r4078 4365 4365 </sect1> 4366 4366 4367 <sect1 id="plugin_developer.signals"> 4368 <title>Enable support for aborting a running a plug-in</title> 4369 4370 <para> 4371 BASE includes a simple signalling system that can be used to send 4372 signals to plug-ins. The system was primarly developed to allow a user 4373 to kill a plug-in when it is executing. Therfore, the focus of this chapter 4374 will be how to implement a plug-in to make it possible to kill it 4375 during it's execution. 4376 </para> 4377 4378 <para> 4379 Since we don't want to do this by brute force such as destroying the 4380 process or stopping thread the plug-in executes in, cooperation is needed 4381 by the plug-in. First, the plug-in must implement the 4382 <interfacename docapi="net.sf.basedb.core.signal">SignalTarget</interfacename> 4383 interface. From this, a <interfacename 4384 docapi="net.sf.basedb.core.signal">SignalHandler</interfacename> can be created. 4385 A plug-in may choose to implement it's own signal handler or use an existing 4386 implementation. BASE, for example, provides the <classname docapi="net.sf.basedb.core.signal" 4387 >ThreadSignalHandler</classname> implementation that supports the <constant>ABORT</constant> signal. 4388 This is a simple implementation that just calls <code>Thread.interrupt()</code> 4389 on the plug-in worker thread. This may cause two different effects: 4390 </para> 4391 4392 <itemizedlist> 4393 <listitem> 4394 <para> 4395 The <code>Thread.interrupted()</code> flag is set. The plug-in must check this 4396 at regular intervals and if the flag is set it must cleanup, rollback 4397 open transactions and exit as soon as possible. 4398 </para> 4399 </listitem> 4400 4401 <listitem> 4402 <para> 4403 If the plug-in is waiting in a blocking call that is interruptable, for 4404 example <code>Thread.sleep()</code>, an <classname>InterruptedException</classname> 4405 is thrown. This should cause the same actions as if the flag was set to happen. 4406 </para> 4407 4408 <warning> 4409 <title>Not all blocking calls are interruptable</title> 4410 <para> 4411 For example calling <code>InputStream.read()</code> may leave the 4412 plug-in waiting in a non-interruptable state. In this case there 4413 is nothing BASE can do to wake it up again. 4414 </para> 4415 </warning> 4416 </listitem> 4417 </itemizedlist> 4418 4419 <para> 4420 Here is a general outline for a plug-in that uses the 4421 <classname>ThreadSignalHandler</classname>. 4422 </para> 4423 <programlisting language="java"> 4424 <![CDATA[ 4425 private ThreadSignalHandler signalHandler; 4426 public SignalHandler getSignalHandler() 4427 { 4428 signalHandler = new ThreadSignalHandler(); 4429 return signalHandler; 4430 } 4431 4432 public void run(Request request, Response response, ProgressReporter progress) 4433 { 4434 if (signalHandler != null) signalHandler.setWorkerThread(null); 4435 beginTransaction(); 4436 boolean done = false; 4437 boolean interrupted = false; 4438 while (!done && !interrupted) 4439 { 4440 try 4441 { 4442 done = doSomeWork(); // NOTE! This must not take forever! 4443 interrupted = Thread.interrupted(); 4444 } 4445 catch (InterruptedException ex) 4446 { 4447 // NOTE! Try-catch is only needed if thread calls 4448 // a blocking method that is interruptable 4449 interrupted = true; 4450 } 4451 } 4452 if (interrupted) 4453 { 4454 rollbackTransaction(); 4455 response.setError("Aborted by user", null); 4456 } 4457 else 4458 { 4459 commitTransaction(); 4460 response.setDone("Done"); 4461 } 4462 } 4463 ]]> 4464 </programlisting> 4465 4466 <para> 4467 Another signal handler implementation is the 4468 <classname docapi="net.sf.basedb.core.signal">ProgressReporterSignalHandler</classname>. 4469 See that javadoc for information about how to use it. 4470 For more information about the signalling system as a whole, 4471 see <xref linkend="core_api.signals" />. 4472 </para> 4473 4474 </sect1> 4475 4367 4476 <sect1 id="plugin_developer.classload"> 4368 4477 <title>How BASE load plug-in classes</title> -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java
r4070 r4078 251 251 private JobAgentServerConnection server; 252 252 private RequestHandler requestHandler; 253 private AgentSignalReceiver signalReceiver; 253 254 private JobExecutor jobExecutor; 254 255 … … 275 276 276 277 private final Set<JobInfo> activeJobs; 277 278 279 /** 280 Timeout to wait for jobs to act on the ABORT signal when stopping. 281 Default value is 20 seconds. 282 */ 283 private int closeTimeout = 20000; 284 278 285 /** 279 286 The group were all job runners are placed. … … 531 538 if (server == null) 532 539 { 540 this.signalReceiver = new AgentSignalReceiver(this); 541 this.signalReceiver.init(null); 533 542 this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler; 534 543 this.server = new JobAgentServerConnection(port, this.requestHandler, logServer); … … 685 694 } 686 695 return allow; 696 } 697 698 /** 699 Get the signal receiver that is processing signal messages 700 on behalf of this job agent. 701 @since 2.6 702 */ 703 public AgentSignalReceiver getSignalReceiver() 704 { 705 return signalReceiver; 687 706 } 688 707 … … 917 936 918 937 /** 919 Try to stop running jobs by interrupting the threads th aey are executing in.938 Try to stop running jobs by interrupting the threads they are executing in. 920 939 */ 921 940 private void maybeStopRunningJobs() 922 941 { 923 942 log.info("Stopping running jobs. " + activeJobs.size() + " job(s) still active."); 924 // Interrupt all threads. Hopefully they will do as we tell them. 925 runnersGroup.interrupt(); 943 944 // Send ABORT to all jobs, that support signals 945 signalReceiver.close(closeTimeout); 926 946 } 927 947 -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/DummyJobExecutor.java
r3857 r4078 24 24 package net.sf.basedb.clients.jobagent.executors; 25 25 26 import java.util.Collections; 27 26 28 import net.sf.basedb.clients.jobagent.Agent; 27 29 import net.sf.basedb.clients.jobagent.JobExecutor; … … 30 32 import net.sf.basedb.core.JobAgentSettings; 31 33 import net.sf.basedb.core.SessionControl; 34 import net.sf.basedb.core.signal.SignalReceiver; 35 import net.sf.basedb.core.signal.ThreadSignalHandler; 32 36 import net.sf.basedb.util.Values; 33 37 … … 89 93 log.info("Executing job: " + job); 90 94 DbControl dc = null; 95 boolean aborted = false; 96 Throwable error = null; 91 97 try 92 98 { … … 96 102 if (wait > 0) 97 103 { 104 SignalReceiver signalReceiver = agent.getSignalReceiver(); 105 job.setSignalTransporter(signalReceiver.getSignalTransporterClass(), 106 signalReceiver.registerSignalHandler(new ThreadSignalHandler())); 98 107 job.setProgress(50, "Halfway; waiting " + wait + " seconds"); 99 108 dc.commit(); … … 105 114 } 106 115 } 116 catch (InterruptedException ex) 117 { 118 aborted = true; 119 } 107 120 catch (Throwable t) 108 121 { 109 122 log.error(t.getMessage(), t); 123 error = t; 110 124 } 111 125 dc = sc.newDbControl(); 112 126 dc.reattachItem(job); 113 127 } 114 job.doneOk("Not really, but used for testing job agent"); 128 if (aborted) 129 { 130 job.doneError("Aborted by user"); 131 } 132 else if (error != null) 133 { 134 job.doneError(error.getMessage(), Collections.singleton(error)); 135 } 136 else 137 { 138 job.doneOk("Not really, but used for testing job agent"); 139 } 115 140 dc.commit(); 116 141 log.info("Done executing: " + job); -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ProcessJobExecutor.java
r3675 r4078 42 42 import net.sf.basedb.core.PluginDefinition; 43 43 import net.sf.basedb.core.SessionControl; 44 import net.sf.basedb.core.signal.Signal; 45 import net.sf.basedb.core.signal.SignalHandler; 46 import net.sf.basedb.core.signal.SignalReceiver; 47 import net.sf.basedb.core.signal.SignalTransporter; 48 import net.sf.basedb.core.signal.ThreadSignalHandler; 44 49 import net.sf.basedb.util.Values; 45 50 … … 101 106 private String javaBin; 102 107 private String options; 108 private SignalReceiver signalReceiver; 109 private SignalHandler signalHandler; 103 110 104 111 public ProcessJobExecutor() … … 120 127 if (javaBin == null) javaBin = "java"; 121 128 options = Values.getString(agent.getProperty("agent.executor.process.options"), "-server"); 129 signalReceiver = agent.getSignalReceiver(); 122 130 } 123 131 … … 126 134 { 127 135 log.info("Executing job: " + job); 128 136 129 137 // Generate command for new process 130 138 List<String> cmd = new ArrayList<String>(10); … … 164 172 cmd.add("-t"); // Thread priority 165 173 cmd.add(Integer.toString(agent.getThreadPriority(job.getEstimatedExecutionTime()))); 174 cmd.add("-x"); // Port the job agent is listening on for remote control 175 cmd.add(Integer.toString(agent.getPort())); 166 176 167 177 Process process = null; … … 201 211 new InputStreamReader(process.getInputStream()), result)); 202 212 t.start(); 203 213 204 214 try 205 215 { 206 216 log.info("Waiting for process to end"); 217 signalHandler = new ThreadSignalHandler(); 218 signalReceiver.registerSignalHandler(signalHandler); 207 219 int exitCode = process.waitFor(); 208 220 if (exitCode != 0) … … 224 236 { 225 237 log.info("Job was interrupted: " + job, ex); 226 // Kill the process 227 process.destroy(); 238 // First, send ABORT to the job if it supports it 228 239 dc = sc.newDbControl(); 229 240 job = Job.getById(dc, job.getId()); 230 job.doneError(ex.getMessage(), Arrays.asList(ex)); 241 SignalTransporter signalTransporter = job.getSignalTransporter(); 242 if (signalTransporter != null) 243 { 244 signalTransporter.send(Signal.ABORT); 245 } 246 else 247 { 248 job.doneError("Aborted by user", Arrays.asList(ex)); 249 process.destroy(); 250 } 231 251 dc.commit(); 252 } 253 finally 254 { 255 signalReceiver.unregisterSignalHandler(signalHandler); 232 256 } 233 257 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ThreadJobExecutor.java
r3857 r4078 39 39 import net.sf.basedb.core.SessionControl; 40 40 import net.sf.basedb.core.plugin.Response; 41 import net.sf.basedb.core.signal.SignalReceiver; 42 import net.sf.basedb.core.signal.SocketSignalReceiver; 41 43 import net.sf.basedb.util.SocketUtil; 42 44 import net.sf.basedb.util.Values; … … 65 67 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor"); 66 68 69 private SignalReceiver signalReceiver; 67 70 68 71 public ThreadJobExecutor() … … 74 77 */ 75 78 public void init(Agent agent) 76 {} 79 { 80 this.signalReceiver = agent == null ? null : agent.getSignalReceiver(); 81 } 77 82 78 83 public void executeJob(SessionControl sc, Agent agent, Job job, JobAgentSettings settings, … … 99 104 log.error("Error executing job: " + job, t); 100 105 } 106 exec.registerSignalReceiver(signalReceiver); 101 107 dc.commit(); 102 108 … … 130 136 // ------------------------------------------- 131 137 138 private void setSignalReceiver(SignalReceiver signalReceiver) 139 { 140 this.signalReceiver = signalReceiver; 141 } 142 132 143 public static void main(String[] args) 133 144 { … … 138 149 String login = cmdLine.getOption("-u"); 139 150 String password = cmdLine.getOption("-p"); 151 String proxyPort = cmdLine.getOption("-x"); 140 152 int threadPriority = Values.getInt(cmdLine.getOption("-t"), Thread.NORM_PRIORITY); 141 153 … … 152 164 ThreadJobExecutor executor = new ThreadJobExecutor(); 153 165 166 if (proxyPort != null) 167 { 168 SocketSignalReceiver signalReceiver = new SocketSignalReceiver(); 169 signalReceiver.init("proxy=" + Application.getHostName() + ":" + proxyPort); 170 executor.setSignalReceiver(signalReceiver); 171 Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(signalReceiver))); 172 } 173 154 174 sc = Application.newSessionControl("net.sf.basedb.clients.jobagent", 155 175 SocketUtil.getLocalHost().toString(), null); … … 188 208 } 189 209 210 private static class ShutdownHook 211 implements Runnable 212 { 213 private SignalReceiver signalReceiver; 214 215 private ShutdownHook(SignalReceiver signalReceiver) 216 { 217 this.signalReceiver = signalReceiver; 218 } 219 public void run() 220 { 221 signalReceiver.close(5000); 222 } 223 } 190 224 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/DefaultRequestHandler.java
r3675 r4078 69 69 registerHandler(new StopRequestHandler(agent), "stop"); 70 70 registerHandler(new PauseRequestHandler(agent), "pause"); 71 if (agent.getSignalReceiver() != null) 72 { 73 registerHandler(new SignalRequestHandler(agent), "signal"); 74 } 71 75 } 72 76 … … 89 93 String answer = null; 90 94 RequestHandler handler = commandHandlers.get(cmd); 95 if (handler == null && cmd != null && cmd.startsWith("signal://")) 96 { 97 handler = commandHandlers.get("signal"); 98 } 91 99 if (!agent.isAllowedControl(remote, cmd)) 92 100 { -
trunk/src/core/net/sf/basedb/core/Application.java
r3960 r4078 568 568 // Stop timer and clean the cache of SessionControl objects 569 569 if (timer != null) timer.cancel(); 570 if (internalJobQueue != null) internalJobQueue.close(); 571 if (secondaryStorageController != null) secondaryStorageController.close(); 570 572 timer = null; 571 573 scheduler = null; 572 574 cleanSessionControlCache(true); 573 575 sessionCache = null; 574 if (internalJobQueue != null) internalJobQueue.close();575 if (secondaryStorageController != null) secondaryStorageController.close();576 576 577 577 Keyring.unload(); -
trunk/src/core/net/sf/basedb/core/InternalJobQueue.java
r4074 r4078 203 203 private boolean ignoreUseInternalJobQueueFlag = false; 204 204 205 /** 206 Timeout to wait for jobs to act on the ABORT signal when shutting down BASE. 207 Default value is 20 seconds. 208 */ 209 private int closeTimeout = 20000; 210 205 211 /** 206 212 Use the local signal receiver only. … … 381 387 sc.logout(); 382 388 383 // Interrupt all threads. Hopefully they will do as we tell them.384 runnersGroup.interrupt();389 // Send ABORT to all jobs that supports signalling 390 signalReceiver.close(closeTimeout); 385 391 } 386 392 … … 454 460 throw t; 455 461 } 456 exec.registerSignalReceiver(signalReceiver , false);462 exec.registerSignalReceiver(signalReceiver); 457 463 dc.commit(); 458 464 -
trunk/src/core/net/sf/basedb/core/Job.java
r4074 r4078 592 592 } 593 593 594 /** 595 Set information needed to create a signal transporter object that can be used 596 to send signals to the plug-in that is currently executing this job. 597 @param clazz The signal transporter class 598 @param initParams Initialisation parameters that will be passed 599 to {@link SignalTransporter#init(String)} 600 @throws PermissionDeniedException If the user doesn't have write 601 permission for the job 602 @since 2.6 603 */ 594 604 public void setSignalTransporter(Class<? extends SignalTransporter> clazz, String initParams) 595 605 { … … 600 610 } 601 611 612 /** 613 Checks if a signal transporter has been registered for this job or not. 614 @return TRUE if a signal transporter is registered, FALSE otherwise 615 @since 2.6 616 */ 602 617 public boolean hasSignalTransporter() 603 618 { … … 605 620 } 606 621 622 /** 623 Create and initialise a signal transporter object that can be used 624 to send signal to the plug-in running this job. Note! Depending on 625 the transport mechanism used, it is not certain that the created 626 transporter can actually send the signal. For example, some transporters 627 only work if the plug-in is running in the same virtual machine, while others 628 supports sending signals to other JVM:s on the same or a different 629 computer. 630 @return An initialised signal transporter object, or null 631 if no signal transporter has been registered 632 @throws PermissionDeniedException If the user doesn't have write 633 permission for the job 634 @since 2.6 635 */ 607 636 public SignalTransporter getSignalTransporter() 608 637 { 638 checkPermission(Permission.WRITE); 609 639 String tmp = getData().getSignalTransporter(); 610 640 SignalTransporter transporter = null; -
trunk/src/core/net/sf/basedb/core/PluginExecutionRequest.java
r4074 r4078 66 66 private Job.ProgressReporterImpl progress; 67 67 private SignalReceiver signalReceiver = null; 68 private boolean forceSignalReceiver = false;69 68 70 69 PluginExecutionRequest(SessionControl sc, Plugin plugin, String command, … … 100 99 // Register a signal handler, receiver and transporter 101 100 signalHandler = ((SignalTarget)plugin).getSignalHandler(); 102 if (!forceSignalReceiver && signalHandler != null)103 {104 SignalReceiver override = signalHandler.getSignalReceiver();105 if (override != null) signalReceiver = override;106 }107 101 if (signalReceiver != null && signalHandler != null) 108 102 { … … 158 152 159 153 @param signalReceiver The signal recevier to use 160 @param force If true, the specified signal recevier will always be used,161 even if the plug-in wants to use another receiver implementation162 154 */ 163 public void registerSignalReceiver(SignalReceiver signalReceiver , boolean force)155 public void registerSignalReceiver(SignalReceiver signalReceiver) 164 156 { 165 157 this.signalReceiver = signalReceiver; 166 this.forceSignalReceiver = force;167 158 } 168 159 -
trunk/src/core/net/sf/basedb/core/signal/AbstractSignalHandler.java
r4074 r4078 54 54 protected AbstractSignalHandler() 55 55 { 56 this .supported = new HashSet<Signal>();56 this(null); 57 57 } 58 58 59 59 /** 60 60 Create a new signal handler that supports the given signals. 61 @param supported A set with the signals that are supported 61 62 @param supported A collection with the signals that are initially supported. 63 More signals can be added with {@link #addSignal(Signal)} 62 64 */ 63 protected AbstractSignalHandler( Set<Signal> supported)65 protected AbstractSignalHandler(Collection<Signal> supported) 64 66 { 65 this.supported = new HashSet<Signal>(supported); 67 this.supported = new HashSet<Signal>(); 68 if (supported != null) this.supported.addAll(supported); 66 69 } 67 70 … … 78 81 return supported != null && supported.contains(signal); 79 82 } 80 public SignalReceiver getSignalReceiver()81 {82 return null;83 }84 83 // ------------------------------------------- 85 84 … … 88 87 @param signal The signal to add 89 88 */ 90 p ublicvoid addSignal(Signal signal)89 protected void addSignal(Signal signal) 91 90 { 92 91 if (signal != null) supported.add(signal); … … 96 95 @param signal The signal to remove 97 96 */ 98 p ublicvoid removeSignal(Signal signal)97 protected void removeSignal(Signal signal) 99 98 { 100 99 supported.remove(signal); -
trunk/src/core/net/sf/basedb/core/signal/AbstractSignalReceiver.java
r4074 r4078 25 25 26 26 import java.net.URI; 27 import java.util.ArrayList; 27 28 import java.util.Collection; 28 29 import java.util.Collections; … … 42 43 subclass. 43 44 44 <li>The <i>handlerId</i> part is given by calling {@link System#identityHashCode(Object)} 45 for the signal handler. 45 <li>The <i>handlerId</i> part is given by calling {@link #getLocalSignalHandlerId(SignalHandler)}. 46 This method may be overridden by subclasses that needs a different method of 47 local ID generation. The important thing is that the ID is unique among all 48 registered signal handlers. 46 49 47 50 <li>The <i>supportedSignals</i> part is created by joining all signal ID:s … … 76 79 private Map<String, SignalHandler> handlers; 77 80 private String receiverId; 81 private Thread notifyThread; 78 82 79 83 /** … … 99 103 } 100 104 /** 101 Close this signal receiver. 102 */ 103 public void close() 105 Close this signal receiver. If a subclass overrides this method it 106 should call <code>super.close()</code> if it wants to 107 use the default shutdown notification to signal handlers. This 108 implementation will start a separate notification thread that sends 109 the {@link Signal#ABORT} signal to all registered handlers. The current 110 thread will wait at most the given time. If the number of registered handlers 111 goes down to 0 before the time has ended the current thread will be awakened 112 so it can continue. Note that the notification thread will usually finish in 113 a short time, but it may take longer for all worker thread of each job to 114 react to the signal. 115 @param wait Number of milliseconds to wait for registered jobs to abort, 116 use a negative value to disable notification, and 0 to disable waiting 117 */ 118 public void close(int wait) 104 119 { 105 120 logger.info("Closing signal receiver: receiver id=" + receiverId); 106 if (handlers != null) handlers.clear(); 121 if (handlers == null) return; 122 if (wait >= 0 && handlers.size() > 0) 123 { 124 Thread abortAll = new Thread( 125 new Runnable() 126 { 127 public void run() 128 { 129 logger.info("Sending ABORT to all handlers"); 130 sendToAll(Signal.ABORT); 131 } 132 } 133 ); 134 abortAll.setDaemon(true); 135 abortAll.start(); 136 if (wait > 0) 137 { 138 try 139 { 140 logger.info("Sleeping for " + wait + " milliseconds"); 141 notifyThread = Thread.currentThread(); 142 Thread.sleep(wait); 143 notifyThread = null; 144 } 145 catch (InterruptedException ex) 146 { 147 // Good, all handlers got the signal before timeout 148 } 149 } 150 logger.info("Remaining active handlers: " + handlers.size()); 151 } 152 handlers.clear(); 107 153 handlers = null; 108 154 } … … 115 161 public String registerSignalHandler(SignalHandler handler) 116 162 { 163 if (notifyThread != null) throw new SignalException("Signal receiver is shutting down."); 117 164 String globalId = getGlobalSignalId(handler); 118 165 String localId = getLocalSignalHandlerId(handler); 119 if (handlers != null) handlers.put(localId, handler); 166 if (handlers != null) 167 { 168 if (handlers.containsKey(localId)) 169 { 170 throw new SignalException("A handler with ID '" + localId + "' is already registered: " 171 + handlers.get(localId)); 172 } 173 handlers.put(localId, handler); 174 } 175 120 176 logger.info("Register signal handler: recevier id = " + receiverId + 121 177 "; global id=" + globalId + "; local id=" + localId); 122 logger.debug("Current number of registered signal handlers: " + handlers.size()); 178 logger.debug("Current number of registered signal handlers: " + 179 (handlers == null ? 0 : handlers.size())); 123 180 return globalId; 124 181 } … … 133 190 logger.info("Unregister signal handler: recevier id = " + receiverId + 134 191 "; local id=" + localId); 135 if (handlers != null) handlers.remove(localId); 136 logger.debug("Current number of registered signal handlers: " + handlers.size()); 192 if (handlers != null) 193 { 194 handlers.remove(localId); 195 // If called from the close() method() this is waiting for 196 if (notifyThread != null && handlers.size() == 0) 197 { 198 logger.info("Interrupting closedown thread"); 199 notifyThread.interrupt(); 200 } 201 } 202 logger.debug("Current number of registered signal handlers: " + 203 (handlers == null ? 0 : handlers.size())); 204 } 205 206 public void sendToAll(Signal signal) 207 { 208 if (handlers == null) return; 209 logger.info("Sending " + signal + " to " + (handlers.size()) + " handlers"); 210 // Copy the signal handlers to a temporary collection, 211 // since sending a signal may cause a handler to get 212 // unregistered, which may fail or block due to synchronization 213 Collection<SignalHandler> temp = null; 214 synchronized (handlers) 215 { 216 temp = new ArrayList<SignalHandler>(handlers.values()); 217 } 218 for (SignalHandler handler : temp) 219 { 220 if (handler.supports(signal)) handler.handleSignal(signal); 221 } 137 222 } 138 223 // ------------------------------------------- 139 224 225 /** 226 Get the receiver ID that was passed to the {@link #init(String)} 227 method. 228 */ 229 protected String getReceiverId() 230 { 231 return receiverId; 232 } 233 140 234 /** 141 235 Generate a signal ID string. This string is returned by … … 144 238 a transporter object so that it can send signals to the specified handler. 145 239 See the class documentation for a description of the format of the 146 generated string. 240 generated string. The string is of the format: 241 <p> 242 <code>signal://handlerId@receiverId/?supportedSignals</code> 243 <p> 244 See class description for detailed information. 147 245 148 246 @param handler The signal handler to generate the ID for … … 154 252 sb.append("signal://"); 155 253 sb.append(getLocalSignalHandlerId(handler)).append("@"); 156 sb.append( receiverId).append("/?");254 sb.append(getReceiverId()).append("/?"); 157 255 Collection<Signal> signals = handler.getSupportedSignals(); 158 256 if (signals != null) … … 172 270 Get the local signal handler id of the given signal handler. 173 271 This implementation simply return the system hashcode for the 174 handler. 272 handler. The returned ID must be unique among the registered 273 signal handlers. 175 274 @param handler The handler to get the id for 176 275 @return The local handler id -
trunk/src/core/net/sf/basedb/core/signal/DelegatingSignalHandler.java
r4074 r4078 26 26 import java.util.Collection; 27 27 import java.util.Collections; 28 import java.util.HashMap;29 28 import java.util.HashSet; 30 29 import java.util.Iterator; … … 43 42 */ 44 43 public class DelegatingSignalHandler 45 implementsSignalHandler44 extends AbstractSignalHandler 46 45 { 47 46 /** … … 57 56 58 57 /** 59 If a specific signal receiver must be used.60 */61 private SignalReceiver signalReceiver;62 63 /**64 58 Create a new delegating signal handler. Signal handlers to 65 59 delegate to should be registered with … … 68 62 public DelegatingSignalHandler() 69 63 { 70 this(null); 71 } 72 73 /** 74 Create a new delegating signal handler using a specified signal 75 receiver. Signal handlers to delegate to should be registered with 76 {@link #registerSignalHandler(SignalHandler)}. 77 @param signalReceiver The signal receiver that should receive the signals, 78 or null to use the system default signal receiver 79 */ 80 public DelegatingSignalHandler(SignalReceiver signalReceiver) 81 { 82 this.handlers = new HashMap<Signal, Set<SignalHandler>>(); 83 this.signalReceiver = signalReceiver; 64 super(); 84 65 } 85 66 … … 92 73 registered handler 93 74 */ 75 @Override 94 76 public Collection<Signal> getSupportedSignals() 95 77 { … … 100 82 Check if at least one handler has been registered for the given signal. 101 83 */ 84 @Override 102 85 public boolean supports(Signal signal) 103 86 { … … 126 109 handler.handleSignal(signal); 127 110 } 128 }129 130 public SignalReceiver getSignalReceiver()131 {132 return signalReceiver;133 111 } 134 112 // ------------------------------------------- … … 174 152 } 175 153 } 176 177 154 178 155 } -
trunk/src/core/net/sf/basedb/core/signal/LocalSignalReceiver.java
r4074 r4078 90 90 91 91 @Override 92 public void close( )92 public void close(int wait) 93 93 { 94 super.close(wait); 94 95 receivers.remove(receiverId); 95 super.close();96 96 } 97 97 -
trunk/src/core/net/sf/basedb/core/signal/SignalHandler.java
r4074 r4078 34 34 implementation. 35 35 <p> 36 Signal handler implementations need to be partly thread safe. Once they have37 been registered with a {@link SignalReceiver} they may receive multiple signals38 in different threads at the same time.36 Signal handler implementations need to be implemented in a thread safe. Once 37 they have been registered with a {@link SignalReceiver} they may receive 38 multiple signals in different threads at the same time. 39 39 40 40 @author nicklas … … 63 63 */ 64 64 public boolean supports(Signal signal); 65 66 /** 67 Return a specific signal receiever that must be used with this 68 signal handler. Null should be returned to let the system select 69 an appropriate signal receiver. If a non-null value is returned, the 70 system should use this signal receiver instead of the system default. 71 @return A signal receiver or null 72 */ 73 public SignalReceiver getSignalReceiver(); 74 65 75 66 } -
trunk/src/core/net/sf/basedb/core/signal/SignalReceiver.java
r4074 r4078 57 57 Close the receiver. The receiver should close and cleanup any opened 58 58 resources, unregister itself if neccesary and stop listening for signals. 59 60 @param wait If the value is zero or positive, the {@link Signal#ABORT} 61 should be sent to all registered signal handlers. The signal receiver 62 should then wait at most the specified number of milliseconds for 63 all signal handlers to get unregistered. If the timeout expires before 64 all signal handlers has processed the signal, the signal receiver should 65 continue it's shutdown. 59 66 */ 60 public void close( );67 public void close(int wait); 61 68 62 69 /** … … 91 98 */ 92 99 public void unregisterSignalHandler(SignalHandler handler); 100 101 /** 102 Send a signal to all registered signal handlers that supports 103 it. The main purpose of this method is to be able to send 104 the {@link Signal#ABORT} when the system is shutting down. 105 @param signal The signal to send 106 */ 107 public void sendToAll(Signal signal); 108 93 109 } -
trunk/src/core/net/sf/basedb/core/signal/SocketSignalReceiver.java
r4074 r4078 29 29 import java.net.ServerSocket; 30 30 import java.net.Socket; 31 import java.net.URI; 32 import java.net.URISyntaxException; 31 33 import java.net.UnknownHostException; 32 34 import java.nio.channels.ClosedByInterruptException; … … 49 51 50 52 <p> 51 <code>port=xx& allow=ip-address&allow=ip-address...</code>53 <code>port=xx&proxy=host:port&forward=xx&allow=ip-address&allow=ip-address...</code> 52 54 53 55 <p> 54 where <code>port</code> is the port number the signal receiver will listen on and 55 the <code>allow</code> parts are the ip name or numbers of hosts that are allowed 56 to send signals to the receiver. Except for the special case <code>allow=*</code>, 57 which allows any remote host to send signals an exact match is required. The 58 local host is always allowed to send signals. 59 <p> 60 If no port is given, the signal receiver will randomly choose a free port 61 <p> 62 If no <code>allow</code> tags are given only allow connections from the 63 local host are allowed. 56 where: 57 58 <ul> 59 <li> 60 The <code>port</code> is the port number the signal receiver will listen on. 61 This is optional and if not given a random non-used port will be used. 62 63 <li> 64 The <code>proxy</code> is the address of a proxy host that will route 65 incoming messages to this signal receiver. The proxy will automatically 66 be added to the list of allowed hosts. This value is optional. 67 68 <li> 69 The <code>forward</code> attribute is a boolean option (1 or 0). 70 If 1, this signal receiver will act as a proxy and forward signal 71 messages which are of the form: 72 <p> 73 <code><code>signal://handlerID@host:port/?SIGNAL#forward.to.host:port</code> 74 <p> 75 Before the message is forwarded it is re-built like this: 76 <p> 77 <code>signal://handlerId@forward.to.host:port/?SIGNAL</code> 78 79 <li> 80 The <code>allow</code> parts are the ip name or numbers of hosts that are 81 allowed to send signals to the receiver. Except for the special case 82 <code>allow=*</code>, which allows any remote host to send signals an 83 exact match is required. The local host is always allowed to send signals. 84 <p> 85 If no <code>allow</code> tags are given only allow connections from the 86 local host (and the proxy if given) are allowed. 87 </ul> 64 88 65 89 @author nicklas … … 77 101 78 102 private InetAddress ip; 103 private Thread listener; 79 104 private int port; 80 private Thread listener; 105 private String proxy; 106 private boolean allowForward; 81 107 private Set<InetAddress> allow; 82 108 private boolean allowAll; … … 101 127 public void init(String params) 102 128 { 129 logger.info("Initialising: params=" + params); 130 parseInitParameters(params); 131 listenOnSocket(); 132 super.init(Application.getHostName() + ":" + port); 133 } 134 135 /** 136 @return {@link SocketSignalTransporter} 137 */ 138 public Class<? extends SignalTransporter> getSignalTransporterClass() 139 { 140 return SocketSignalTransporter.class; 141 } 142 143 /** 144 Close this receiver and the socket it is listening on. 145 */ 146 public void close(int wait) 147 { 148 if (listener != null) 149 { 150 logger.info("Interrupting socket signal receiver on port: " + port); 151 listener.interrupt(); 152 } 153 super.close(wait); 154 } 155 // ------------------------------------------- 156 157 /* 158 From the AbstractSignalReceiver interface 159 ------------------------------------------- 160 */ 161 /** 162 Override ID generation so we can add proxy information if needed. 163 */ 164 @Override 165 protected String getGlobalSignalId(SignalHandler handler) 166 { 167 String globalId = super.getGlobalSignalId(handler); 168 if (proxy != null) 169 { 170 try 171 { 172 URI uri = new URI(globalId); 173 globalId = "signal://" + uri.getUserInfo() + "@" + proxy + 174 "/?" + uri.getQuery() + "#" + uri.getHost(); 175 if (uri.getPort() >= 0) globalId += ":" + uri.getPort(); 176 } 177 catch (URISyntaxException ex) 178 { 179 // Should not happen 180 } 181 } 182 return globalId; 183 } 184 /** 185 Process the incoming message. This class will check if the 186 message contains any forwarding information. If so, the 187 message will be forwarde to the new destination. Otherwise, 188 the message will be processed as normal. 189 */ 190 @Override 191 protected void processSignalMessage(String message) 192 { 193 logger.info("Processing signal message: " + message); 194 boolean forwarded = false; 195 if (allowForward) forwarded = forwardSignalMessageIfNeeded(message); 196 if (!forwarded) super.processSignalMessage(message); 197 } 198 // ------------------------------------------- 199 200 /** 201 Parse the initialisation parameters. Called from the {@link #init(String)} 202 method. 203 @param params Parameter string passed to {@link #init(String)} method 204 */ 205 protected void parseInitParameters(String params) 206 { 103 207 QueryParameters qp = QueryParameters.parseQueryString(params); 104 208 // Get the port number we listen on 105 209 port = Values.getInt(qp.getValue("port"), 0); 210 logger.info("port=" + port); 211 212 // Are we allowed to forward the messages 213 allowForward = Values.getBoolean(qp.getValue("forward")); 214 logger.info("forward=" + allowForward); 106 215 107 216 // Get allowed hosts … … 113 222 for (String ip : allowIp) 114 223 { 224 logger.info("allow=" + ip); 115 225 if ("*".equals(ip)) 116 226 { … … 133 243 } 134 244 245 // Get proxy 246 proxy = qp.getValue("proxy"); 247 logger.info("proxy=" + proxy); 248 if (proxy != null) 249 { 250 try 251 { 252 URI proxyURI = new URI("singal://" + proxy); 253 allow.add(InetAddress.getByName(proxyURI.getHost())); 254 } 255 catch (Exception ex) 256 { 257 throw new SignalException("Invalid proxy: " + proxy, ex); 258 } 259 } 260 } 261 262 /** 263 Start listening for incoming signals. 264 */ 265 protected void listenOnSocket() 266 { 135 267 logger.info("Starting socket signal receiver on port: " + port); 136 268 try … … 142 274 ip = socket.getInetAddress(); 143 275 listener = new Thread(new ListenerThread(channel), "ListenerThread."+port); 276 listener.setDaemon(true); 144 277 listener.start(); 145 278 logger.info("Socket signal receiver is listening on port: " + port); … … 150 283 throw new SignalException(ex); 151 284 } 152 super.init(Application.getHostName() + ":" + port); 153 } 154 155 /** 156 @return {@link SocketSignalTransporter} 157 */ 158 public Class<? extends SignalTransporter> getSignalTransporterClass() 159 { 160 return SocketSignalTransporter.class; 161 } 162 163 /** 164 Close this receiver and the socket it is listening on. 165 */ 166 public void close() 167 { 168 logger.info("Interrupting socket signal receiver on port: " + port); 169 listener.interrupt(); 170 } 171 // ------------------------------------------- 172 285 } 286 287 /** 288 If the message has forwarding information, forward the message to 289 the other host. 290 @return TRUE if the message was forwarded, FALSE otherwise 291 */ 292 protected boolean forwardSignalMessageIfNeeded(String message) 293 { 294 if (message == null) return false; 295 try 296 { 297 URI uri = new URI(message); 298 String fragment = uri.getFragment(); 299 if (fragment != null) 300 { 301 Signal signal = Signal.getSignal(uri.getQuery()); 302 String init = "signal://" + uri.getUserInfo() + "@" + fragment; 303 logger.info("Forwarding signal message: message=" + message + "; to=" + init); 304 SignalTransporter forwardTo = new SocketSignalTransporter(); 305 forwardTo.init(init); 306 forwardTo.send(signal); 307 return true; 308 } 309 } 310 catch (Exception ex) 311 { 312 // Ignore invalid messages 313 logger.warn("Could not process signal message: " + message, ex); 314 } 315 return false; 316 } 317 318 /** 319 Checks if the specified remote host is allowed to send 320 signals to this receiver. 321 @param remoteHost The remote host that is sending the signal 322 @return TRUE if the host is allowed, FALSE otherwise 323 */ 324 protected boolean isAllowedHost(InetAddress remoteHost) 325 { 326 return allowAll || SocketUtil.isLocalHost(remoteHost) || allow.contains(remoteHost); 327 } 328 173 329 174 330 /** … … 206 362 207 363 // Check if we are allowed to accept connections from the remote host 208 if ( allowAll || SocketUtil.isLocalHost(remoteHost) || allow.contains(remoteHost))364 if (isAllowedHost(remoteHost)) 209 365 { 210 366 // Read the incoming message … … 235 391 // ------------------------------------------- 236 392 } 393 237 394 238 395 } -
trunk/src/core/net/sf/basedb/core/signal/SocketSignalTransporter.java
r4074 r4078 66 66 int port = uri.getPort(); 67 67 String message = generateSignalMessage(signal); 68 if (uri.getFragment() != null) message += "#" + uri.getFragment(); // For proxy support 68 69 logger.debug("The message is: " + message); 69 70 Socket s = null; -
trunk/src/core/net/sf/basedb/core/signal/ThreadSignalHandler.java
r4074 r4078 44 44 <pre class="code"> 45 45 // ... code in worker thread 46 threadSignalHandler.setWorkerThread(null); 46 47 beginTransaction(); 47 48 boolean done = false; … … 91 92 received. 92 93 */ 93 private final Thread workerThread; 94 private Thread workerThread; 95 96 /** 97 If TRUE, call Thread.stop() instead of Thread.interrupt() 98 */ 99 private boolean forceStop; 94 100 95 101 /** … … 104 110 /** 105 111 Create a new thread signal handler. 106 @param thread The worker thread to interrupt when it receieves the112 @param workerThread The worker thread to interrupt when it receieves the 107 113 {@link Signal#ABORT} signal, or null to interrupt the current thread 108 114 */ 109 public ThreadSignalHandler(Thread thread)115 public ThreadSignalHandler(Thread workerThread) 110 116 { 111 117 super(supported); 112 this.workerThread = thread == null ? Thread.currentThread() : thread;118 setWorkerThread(workerThread); 113 119 } 114 120 … … 127 133 {@link Signal#ABORT} 128 134 */ 135 @SuppressWarnings("deprecation") 129 136 public void handleSignal(Signal signal) 130 137 { … … 134 141 { 135 142 logger.debug("Sending signal " + signal + " to thread: " + workerThread); 136 workerThread.interrupt(); 143 if (forceStop) 144 { 145 workerThread.stop(new InterruptedException("Received signal: ABORT")); 146 } 147 else 148 { 149 workerThread.interrupt(); 150 } 137 151 } 138 152 else … … 144 158 // ------------------------------------------- 145 159 160 /** 161 Set the worker thread that should be interrupted when a signal is 162 receiver. 163 @param workerThread The worker thread, or null to use the current thread 164 */ 165 public void setWorkerThread(Thread workerThread) 166 { 167 this.workerThread = workerThread == null ? Thread.currentThread() : workerThread; 168 } 169 170 /** 171 Call this method with a true value to make the signal handler 172 use {@link Thread#stop(Throwable)} instead of {@link Thread#interrupt()}. 173 Note that the <code>Thread.stop</code> method is deprecated and it is not 174 recommended that it is used. For some plug-ins though this may be the 175 only option, if they are mainly executing code outside of their control 176 which doesn't check the {@link Thread#interrupted()} status. The exception 177 passed to the thread will be a {@link InterruptedException}. 178 @param forceStop TRUE to stop the thread by force, FALSE to signal to 179 it with a flag 180 */ 181 public void setForceStop(boolean forceStop) 182 { 183 this.forceStop = forceStop; 184 } 185 146 186 } -
trunk/src/core/net/sf/basedb/util/QueryParameters.java
r4074 r4078 63 63 if (query != null) 64 64 { 65 Pattern p = Pattern.compile("( .*)=(.*)&?");65 Pattern p = Pattern.compile("([^=]+)=([^&]+)&?"); 66 66 Matcher m = p.matcher(query); 67 67 while (m.find()) -
trunk/src/plugins/core/net/sf/basedb/plugins/AbstractFlatFileImporter.java
r4074 r4078 413 413 private int skippedLines; 414 414 private ClassMapErrorHandler errorHandler; 415 private ThreadSignalHandler signalHandler; 415 416 416 417 public AbstractFlatFileImporter() … … 449 450 public void run(Request request, Response response, ProgressReporter progress) 450 451 { 452 if (signalHandler != null) signalHandler.setWorkerThread(null); 451 453 String command = request.getCommand(); 452 454 if (command.equals(Request.COMMAND_EXECUTE)) … … 678 680 public SignalHandler getSignalHandler() 679 681 { 680 return new ThreadSignalHandler(); 682 signalHandler = new ThreadSignalHandler(); 683 return signalHandler; 681 684 } 682 685 // ------------------------------------------- -
trunk/src/plugins/core/net/sf/basedb/plugins/IntensityCalculatorPlugin.java
r3820 r4078 57 57 import net.sf.basedb.core.query.Hql; 58 58 import net.sf.basedb.core.query.Orders; 59 import net.sf.basedb.core.signal.ProgressReporterSignalHandler; 60 import net.sf.basedb.core.signal.Signal; 61 import net.sf.basedb.core.signal.SignalHandler; 62 import net.sf.basedb.core.signal.SignalReceivedException; 63 import net.sf.basedb.core.signal.SignalTarget; 59 64 import net.sf.basedb.util.IntensityCalculator; 60 65 import net.sf.basedb.util.IntensityCalculatorUtil; … … 87 92 public class IntensityCalculatorPlugin 88 93 extends AbstractPlugin 89 implements InteractivePlugin 94 implements InteractivePlugin, SignalTarget 90 95 { 91 96 … … 127 132 128 133 private RequestInformation configureJob; 134 private ProgressReporterSignalHandler signalHandler; 129 135 130 136 /** … … 180 186 public void run(Request request, Response response, ProgressReporter progress) 181 187 { 188 if (signalHandler != null) 189 { 190 signalHandler.forwardTo(progress); 191 progress = signalHandler; 192 } 182 193 DbControl dc = sc.newDbControl(); 183 194 try … … 263 274 if (progress != null) progress.display(100, "Done"); 264 275 response.setDone("Done"); 276 } 277 catch (SignalReceivedException ex) 278 { 279 response.setError("Aborted by user", Arrays.asList(ex)); 265 280 } 266 281 catch (Throwable t) … … 412 427 } 413 428 } 429 // ------------------------------------------- 430 /* 431 From the SignalTarget interface 432 ------------------------------------------- 433 */ 434 public SignalHandler getSignalHandler() 435 { 436 signalHandler = new ProgressReporterSignalHandler(Collections.singleton(Signal.ABORT)); 437 return signalHandler; 438 } 439 // ------------------------------------------- 414 440 415 441 -
trunk/src/plugins/core/net/sf/basedb/plugins/LowessNormalization.java
r4069 r4078 67 67 import net.sf.basedb.core.query.SqlResult; 68 68 import net.sf.basedb.core.query.WhenStatement; 69 import net.sf.basedb.core.signal.ProgressReporterSignalHandler; 70 import net.sf.basedb.core.signal.Signal; 71 import net.sf.basedb.core.signal.SignalHandler; 72 import net.sf.basedb.core.signal.SignalReceivedException; 73 import net.sf.basedb.core.signal.SignalTarget; 69 74 import net.sf.basedb.util.Values; 70 75 … … 87 92 public class LowessNormalization 88 93 extends AbstractAnalysisPlugin 89 implements InteractivePlugin 94 implements InteractivePlugin, SignalTarget 90 95 { 91 96 … … 165 170 private RequestInformation configureJob; 166 171 172 private ProgressReporterSignalHandler signalHandler; 173 167 174 /* 168 175 From the Plugin interface … … 201 208 if (command.equals(Request.COMMAND_EXECUTE)) 202 209 { 210 if (signalHandler != null) 211 { 212 signalHandler.forwardTo(progress); 213 progress = signalHandler; 214 } 203 215 DbControl dc = null; 204 216 try … … 224 236 response.setDone(normalizedSpots + " spots normalized, " + (source.getNumSpots() - normalizedSpots) + " spots removed"); 225 237 } 238 catch (SignalReceivedException ex) 239 { 240 response.setError("Aborted by user", Arrays.asList(ex)); 241 } 226 242 catch (Throwable ex) 227 243 { … … 305 321 } 306 322 } 323 catch (SignalReceivedException ex) 324 { 325 response.setError("Aborted by user", Arrays.asList(ex)); 326 } 307 327 catch (Throwable ex) 308 328 { 309 329 response.setError(ex.getMessage(), Arrays.asList(ex)); 310 330 } 331 } 332 // ------------------------------------------- 333 334 /* 335 From the SignalTarget interface 336 ------------------------------------------- 337 */ 338 public SignalHandler getSignalHandler() 339 { 340 signalHandler = new ProgressReporterSignalHandler(Collections.singleton(Signal.ABORT)); 341 return signalHandler; 311 342 } 312 343 // ------------------------------------------- -
trunk/www/views/jobs/view_job.jsp
r4074 r4078 108 108 109 109 // Check if the plug-in supports the "Abort" signal 110 boolean supportsAbort = status == Job.Status.WAITING ;111 if (status == Job.Status.EXECUTING )110 boolean supportsAbort = status == Job.Status.WAITING && writePermission; 111 if (status == Job.Status.EXECUTING && writePermission) 112 112 { 113 113 try … … 116 116 Collection<Signal> supportedSignals = signalTransporter != null ? 117 117 signalTransporter.getSupportedSignals() : null; 118 supportsAbort = supportedSignals == null || supportedSignals.contains(Signal.ABORT); 118 supportsAbort = signalTransporter != null && 119 (supportedSignals == null || supportedSignals.contains(Signal.ABORT)); 119 120 } 120 121 catch (Exception ex) … … 172 173 </base:head> 173 174 <base:body onload="autoUpdate()"> 174 175 175 <h3 class="docked"><%=title%> <base:help tabcontrol="main" /></h3> 176 176 <t:tabcontrol id="main" active="<%=tab%>" position="bottom" contentstyle="<%="height: "+(int)(scale*320)+"px;"%>">
Note: See TracChangeset
for help on using the changeset viewer.