Changeset 4074
- Timestamp:
- Jan 8, 2008, 1:06:44 PM (15 years ago)
- Location:
- trunk
- Files:
-
- 2 added
- 24 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/config/dist/base.config
r3871 r4074 77 77 # If the internal job queue should be enabled or not 78 78 jobqueue.internal.enabled = true 79 80 # Signal receiver class for sending signals to running jobs (to abort them) 81 jobqueue.internal.signalreceiver.class = net.sf.basedb.core.signal.LocalSignalReceiver 82 jobqueue.internal.signalreceiver.init = localhost:0 79 83 80 84 # If plugins with useInteralJobQueue = false should be executed or not -
trunk/config/dist/log4j.properties
r3992 r4074 54 54 #log4j.logger.net.sf.basedb.core.storage=debug 55 55 56 ### Log Signal handling events 57 #log4j.logger.net.sf.basedb.core.signal=debug 58 59 56 60 # ----------------- 57 61 # Migration loggers -
trunk/src/core/net/sf/basedb/core/Install.java
r4034 r4074 107 107 method. 108 108 */ 109 public static final int NEW_SCHEMA_VERSION = Integer.valueOf(4 6).intValue();109 public static final int NEW_SCHEMA_VERSION = Integer.valueOf(47).intValue(); 110 110 111 111 public static synchronized void createTables(boolean update, final ProgressReporter progress) -
trunk/src/core/net/sf/basedb/core/InternalJobQueue.java
r4073 r4074 26 26 27 27 import net.sf.basedb.core.plugin.Response; 28 import net.sf.basedb.core.signal.SignalReceiver; 28 29 import net.sf.basedb.core.data.JobData; 29 30 import net.sf.basedb.core.data.PluginDefinitionData; … … 201 202 */ 202 203 private boolean ignoreUseInternalJobQueueFlag = false; 203 204 205 /** 206 Use the local signal receiver only. 207 */ 208 private static SignalReceiver signalReceiver; 209 204 210 /** 205 211 Create the job queue. The one and only instance is created … … 226 232 } 227 233 runners = Collections.synchronizedSet(new HashSet<JobRunner>()); 234 235 // Signal receiver 236 String signalReceiverClass = Config.getString("jobqueue.internal.signalreceiver.class"); 237 String signalReceiverInit = Config.getString("jobqueue.internal.signalreceiver.init"); 238 if (signalReceiverClass == null) 239 { 240 signalReceiverClass = "net.sf.basedb.core.signal.LocalSignalReceiver"; 241 if (signalReceiverInit == null) signalReceiverInit = "localhost:0"; 242 } 243 try 244 { 245 signalReceiver = (SignalReceiver)Class.forName(signalReceiverClass).newInstance(); 246 signalReceiver.init(signalReceiverInit); 247 } 248 catch (Exception ex) 249 { 250 throw new BaseException(ex); 251 } 228 252 229 253 // Configure thread group … … 430 454 throw t; 431 455 } 432 //exec.setSignalReceiver(signalReceiver);456 exec.registerSignalReceiver(signalReceiver, false); 433 457 dc.commit(); 434 458 -
trunk/src/core/net/sf/basedb/core/Job.java
r4034 r4074 39 39 import net.sf.basedb.core.plugin.Response; 40 40 import net.sf.basedb.core.plugin.Plugin.MainType; 41 import net.sf.basedb.core.signal.SignalTransporter; 41 42 42 43 import java.util.Collection; … … 591 592 } 592 593 594 public void setSignalTransporter(Class<? extends SignalTransporter> clazz, String initParams) 595 { 596 checkPermission(Permission.WRITE); 597 if (clazz == null) throw new InvalidUseOfNullException("clazz"); 598 if (initParams == null) initParams = ""; 599 getData().setSignalTransporter(clazz.getName() + ":" + initParams); 600 } 601 602 public boolean hasSignalTransporter() 603 { 604 return getData().getSignalTransporter() != null; 605 } 606 607 public SignalTransporter getSignalTransporter() 608 { 609 String tmp = getData().getSignalTransporter(); 610 SignalTransporter transporter = null; 611 if (tmp != null) 612 { 613 try 614 { 615 String[] tmp2 = tmp.split(":", 2); 616 String transporterClass = tmp2[0]; 617 String initParams = tmp2.length > 1 ? tmp2[1] : null; 618 transporter = (SignalTransporter)Class.forName(transporterClass).newInstance(); 619 transporter.init(initParams); 620 } 621 catch (Exception ex) 622 { 623 throw new BaseException(ex); 624 } 625 } 626 return transporter; 627 } 628 593 629 /** 594 630 Get the date and time the job ended. … … 705 741 data.setEnded(new Date()); 706 742 data.setStackTrace(null); 743 data.setSignalTransporter(null); 707 744 if (getSendMessage()) sendMessage(); 708 745 } … … 726 763 data.setStackTrace(null); 727 764 data.setEnded(new Date()); 765 data.setSignalTransporter(null); 728 766 if (getSendMessage()) sendMessage(); 729 767 } … … 747 785 data.setPercentComplete(100); 748 786 data.setStatus(Status.ERROR.getValue()); 787 data.setSignalTransporter(null); 749 788 if (errors != null) 750 789 { … … 1227 1266 public void display(int percent, String message) 1228 1267 { 1229 // Do not update unless at least some time has passed 1230 if (System.currentTimeMillis() - lastUpdate > UPDATE_INTERVAL )1268 // Do not update unless at least some time has passed, or the plug-in has finished 1269 if (System.currentTimeMillis() - lastUpdate > UPDATE_INTERVAL || percent == 100) 1231 1270 { 1232 1271 lastUpdate = System.currentTimeMillis(); … … 1284 1323 } 1285 1324 1286 void setStarted( )1325 void setStarted(Class<? extends SignalTransporter> signalTransporter, String signalId) 1287 1326 { 1288 1327 DbControl dc = null; … … 1291 1330 dc = sc.newDbControl(); 1292 1331 Job job = Job.getById(dc, jobId); 1293 job.start("Starting...", server); 1332 if (signalTransporter != null && signalId != null) 1333 { 1334 job.setSignalTransporter(signalTransporter, signalId); 1335 } 1336 job.start("Starting...", server); 1294 1337 dc.commit(); 1295 1338 } -
trunk/src/core/net/sf/basedb/core/PluginExecutionRequest.java
r4073 r4074 31 31 import net.sf.basedb.core.signal.SignalReceiver; 32 32 import net.sf.basedb.core.signal.SignalTarget; 33 import net.sf.basedb.core.signal.SignalTransporter; 33 34 34 35 /** … … 64 65 { 65 66 private Job.ProgressReporterImpl progress; 67 private SignalReceiver signalReceiver = null; 68 private boolean forceSignalReceiver = false; 66 69 67 70 PluginExecutionRequest(SessionControl sc, Plugin plugin, String command, … … 87 90 Request request = new RequestImpl(false); 88 91 Response response = pluginResponse.getResponseImpl(); 89 if (progress != null) progress.setStarted();92 SignalHandler signalHandler = null; 90 93 try 91 94 { 92 getPlugin().run(request, response, progress); 95 Plugin plugin = getPlugin(); 96 String signalId = null; 97 Class<? extends SignalTransporter> signalTransporter = null; 98 if (plugin instanceof SignalTarget) 99 { 100 // Register a signal handler, receiver and transporter 101 signalHandler = ((SignalTarget)plugin).getSignalHandler(); 102 if (!forceSignalReceiver && signalHandler != null) 103 { 104 SignalReceiver override = signalHandler.getSignalReceiver(); 105 if (override != null) signalReceiver = override; 106 } 107 if (signalReceiver != null && signalHandler != null) 108 { 109 signalId = signalReceiver.registerSignalHandler(signalHandler); 110 signalTransporter = signalReceiver.getSignalTransporterClass(); 111 } 112 } 113 if (progress != null) progress.setStarted(signalTransporter, signalId); 114 plugin.run(request, response, progress); 93 115 } 94 116 catch (Throwable t) 95 117 { 96 118 pluginResponse.setError("Error invoking plugin: " + t.getMessage() , t); 119 } 120 finally 121 { 122 if (signalReceiver != null && signalHandler != null) 123 { 124 signalReceiver.unregisterSignalHandler(signalHandler); 125 } 97 126 } 98 127 done(); … … 121 150 } 122 151 123 public void setSignalReceiver(SignalReceiver signalReceiver) 152 /** 153 Register a signal receiver that will receive signals for this job. 154 The plug-in must implement the {@link SignalTarget} interface 155 and provide a {@link SignalHandler}. If not, the job will not be 156 registered with the signal receiver and it will not be possible to 157 send signals to the job. 158 159 @param signalReceiver The signal recevier to use 160 @param force If true, the specified signal recevier will always be used, 161 even if the plug-in wants to use another receiver implementation 162 */ 163 public void registerSignalReceiver(SignalReceiver signalReceiver, boolean force) 124 164 { 125 Plugin p = getPlugin(); 126 SignalHandler signalHandler = null; 127 if (p instanceof SignalTarget) 128 { 129 signalHandler = ((SignalTarget)p).getSignalHandler(); 130 } 131 if (signalHandler != null) 132 { 133 String ID = signalReceiver.registerSignalHandler(signalHandler); 134 Job job = getJob(); 135 //job.setSignalReceiverId(ID); 136 } 165 this.signalReceiver = signalReceiver; 166 this.forceSignalReceiver = force; 137 167 } 138 168 -
trunk/src/core/net/sf/basedb/core/Update.java
r3979 r4074 544 544 </td> 545 545 </tr> 546 <tr> 547 <td>47</td> 548 <td> 549 <ul> 550 <li>Added {@link net.sf.basedb.core.data.JobData#getSignalTransporter()}. 551 </ul> 552 No special database update is needed. Only increase the schema version. 553 </td> 554 </tr> 546 555 </table> 547 556 … … 774 783 if (progress != null) progress.display((int)(45*progress_factor), "--Updating schema version: " + schemaVersion + " -> 46..."); 775 784 schemaVersion = setSchemaVersionInTransaction(session, 46); 785 } 786 787 if (schemaVersion < 47) 788 { 789 if (progress != null) progress.display((int)(46*progress_factor), "--Updating schema version: " + schemaVersion + " -> 47..."); 790 schemaVersion = setSchemaVersionInTransaction(session, 47); 776 791 } 777 792 -
trunk/src/core/net/sf/basedb/core/data/JobData.java
r3948 r4074 356 356 } 357 357 358 /** 359 The maximum allowed length of the signal transporter string. 360 */ 361 public static final int MAX_SIGNAL_TRANSPORTER_LENGTH = 655536; 362 private String signalTransporter; 363 /** 364 Identifies the signal transporter to use. This is a string with 365 two parts separated by colon (:). The first part is the class name 366 of the signal transporter class, the second part is the initialisation 367 string for the transporter. 368 @hibernate.property column="`signal_transporter`" type="text" not-null="false" 369 @since 2.6 370 */ 371 public String getSignalTransporter() 372 { 373 return signalTransporter; 374 } 375 public void setSignalTransporter(String signalTransporter) 376 { 377 this.signalTransporter = signalTransporter; 378 } 358 379 359 380 private Map<String, ParameterValueData<?>> parameters; -
trunk/src/core/net/sf/basedb/core/signal/AbstractSignalHandler.java
r4073 r4074 78 78 return supported != null && supported.contains(signal); 79 79 } 80 public SignalReceiver getSignalReceiver() 81 { 82 return null; 83 } 80 84 // ------------------------------------------- 81 85 -
trunk/src/core/net/sf/basedb/core/signal/AbstractSignalReceiver.java
r4073 r4074 24 24 package net.sf.basedb.core.signal; 25 25 26 import java.net.URI; 26 27 import java.util.Collection; 27 28 import java.util.Collections; … … 34 35 return values for the {@link #registerSignalHandler(SignalHandler)} method. 35 36 This class will generate values in the form of {@link java.net.URI}:s. 36 <code>signal:// receiverId/handlerId?supportedSignals</code>.37 <code>signal://handlerId@receiverId/?supportedSignals</code>. 37 38 38 39 <ul> 39 40 <li>The <i>receiverId</i> part is given by the parameter in the 40 {@link #init(String)} method and must be set by the implementing subclass. 41 {@link #init(String)} method and must be set by the implementing 42 subclass. 41 43 42 44 <li>The <i>handlerId</i> part is given by calling {@link System#identityHashCode(Object)} … … 51 53 as higher-level objects. 52 54 <p> 53 Subclasses should override the {@link #get SignalHandlerId(SignalHandler)} if they55 Subclasses should override the {@link #getGlobalSignalId(SignalHandler)} if they 54 56 want to use a different ID generation scheme. 55 57 … … 62 64 { 63 65 66 /** 67 Log signals processing. 68 */ 69 private static final org.apache.log4j.Logger logger = 70 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.AbstractSignalReceiver"); 71 72 73 /** 74 Maps local signal handler ID to -> signal handler itself. 75 */ 64 76 private Map<String, SignalHandler> handlers; 65 77 private String receiverId; … … 77 89 /** 78 90 Initialise the signal receiver. 79 @param paramsThe ID of the signal receiver, needed if the default91 @param receiverId The ID of the signal receiver, needed if the default 80 92 ID generation should be used 81 93 */ 82 public void init(String params)94 public void init(String receiverId) 83 95 { 84 96 handlers = Collections.synchronizedMap(new HashMap<String, SignalHandler>()); 85 receiverId = params; 86 } 87 97 this.receiverId = receiverId; 98 logger.info("Initializing signal receiver: id=" + receiverId); 99 } 100 /** 101 Close this signal receiver. 102 */ 88 103 public void close() 89 104 { 105 logger.info("Closing signal receiver: receiver id=" + receiverId); 90 106 if (handlers != null) handlers.clear(); 91 107 handlers = null; 92 108 } 93 109 110 /** 111 Register a signal handler with this receiver. 112 @param handler The signal handler to register 113 @return The global ID of the signal handler 114 */ 94 115 public String registerSignalHandler(SignalHandler handler) 95 116 { 96 String id = getSignalHandlerId(handler); 97 if (handlers != null) handlers.put(id, handler); 98 return id; 99 } 100 117 String globalId = getGlobalSignalId(handler); 118 String localId = getLocalSignalHandlerId(handler); 119 if (handlers != null) handlers.put(localId, handler); 120 logger.info("Register signal handler: recevier id = " + receiverId + 121 "; global id=" + globalId + "; local id=" + localId); 122 logger.debug("Current number of registered signal handlers: " + handlers.size()); 123 return globalId; 124 } 125 126 /** 127 Unregister a signal handler. 128 @param handler The signal handler to unregister 129 */ 101 130 public void unregisterSignalHandler(SignalHandler handler) 102 131 { 103 if (handlers != null) handlers.remove(getSignalHandlerId(handler)); 132 String localId = getLocalSignalHandlerId(handler); 133 logger.info("Unregister signal handler: recevier id = " + receiverId + 134 "; local id=" + localId); 135 if (handlers != null) handlers.remove(localId); 136 logger.debug("Current number of registered signal handlers: " + handlers.size()); 104 137 } 105 138 // ------------------------------------------- 106 139 107 140 /** 108 Generate a signal handlerID string. This string is returned by141 Generate a signal ID string. This string is returned by 109 142 the {@link #registerSignalHandler(SignalHandler)} method and is used 110 143 in {@link SignalTransporter#init(String)} method to initialise 111 a transporter object. 144 a transporter object so that it can send signals to the specified handler. 145 See the class documentation for a description of the format of the 146 generated string. 112 147 113 148 @param handler The signal handler to generate the ID for 114 149 @return The signal handler ID 115 150 */ 116 protected String get SignalHandlerId(SignalHandler handler)151 protected String getGlobalSignalId(SignalHandler handler) 117 152 { 118 153 StringBuilder sb = new StringBuilder(); 119 sb.append("signal://").append(receiverId).append("/"); 120 sb.append(System.identityHashCode(handler)).append("?"); 154 sb.append("signal://"); 155 sb.append(getLocalSignalHandlerId(handler)).append("@"); 156 sb.append(receiverId).append("/?"); 121 157 Collection<Signal> signals = handler.getSupportedSignals(); 122 158 if (signals != null) … … 134 170 135 171 /** 172 Get the local signal handler id of the given signal handler. 173 This implementation simply return the system hashcode for the 174 handler. 175 @param handler The handler to get the id for 176 @return The local handler id 177 */ 178 protected String getLocalSignalHandlerId(SignalHandler handler) 179 { 180 return String.valueOf(System.identityHashCode(handler)); 181 } 182 183 /** 136 184 Get the signal handler with a given ID. 137 @param handlerId The signal handler ID 185 @param localId The local signal handler ID as 186 returned by the {@link #getLocalSignalHandlerId(SignalHandler)} 187 method 138 188 @return The signal handler, or null if no handler is found 139 189 */ 140 protected SignalHandler getSignalHandler(String handlerId) 141 { 142 return handlers == null ? null : handlers.get(handlerId); 143 } 190 protected SignalHandler getSignalHandler(String localId) 191 { 192 return handlers == null ? null : handlers.get(localId); 193 } 194 195 /** 196 Process a signal message. If the message can't be understood or 197 if no handler can be found this method does nothing. The signal will 198 be delivered to the signal handler in the current thread. 199 @param message The message to process, the format of the message 200 must be compatible with the message that {@link 201 AbstractSignalTransporter#generateSignalMessage(Signal)} generates 202 */ 203 protected void processSignalMessage(String message) 204 { 205 logger.debug("Processing signal message: " + message); 206 if (message == null) return; 207 try 208 { 209 URI uri = new URI(message); 210 String localId = uri.getUserInfo(); 211 Signal signal = Signal.getSignal(uri.getQuery()); 212 SignalHandler signalHandler = getSignalHandler(localId); 213 if (signalHandler != null) 214 { 215 signalHandler.handleSignal(signal); 216 } 217 else 218 { 219 logger.warn("No signal handler found for id: " + localId); 220 } 221 } 222 catch (Exception ex) 223 { 224 // Ignore invalid messages 225 logger.warn("Could not process signal message: " + message, ex); 226 } 227 } 228 144 229 145 230 } -
trunk/src/core/net/sf/basedb/core/signal/AbstractSignalTransporter.java
r4073 r4074 30 30 31 31 /** 32 Abstract base class that is useful when implementing signal transporters. 32 33 33 34 @author nicklas … … 38 39 implements SignalTransporter 39 40 { 41 /** 42 Log signals processing. 43 */ 44 private static final org.apache.log4j.Logger logger = 45 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.AbstractSignalTransporter"); 40 46 41 private String signalHandlerId;47 private String globalSignalId; 42 48 private URI signalHandlerURI; 43 49 private Collection<Signal> signals; … … 52 58 /** 53 59 Initialise the transporter. We expect the parameter to be the one 54 returned from {@link AbstractSignalReceiver#get SignalHandlerId(SignalHandler)}.60 returned from {@link AbstractSignalReceiver#getGlobalSignalId(SignalHandler)}. 55 61 If the receiver subclass overrided that method, the corresponding transporter 56 62 subclass should override this method. 63 @see AbstractSignalReceiver for a description of the format expected 57 64 */ 58 65 public void init(String params) 59 66 { 60 signalHandlerId = params; 67 logger.info("Initializing signal transporter: id=" + params); 68 globalSignalId = params; 61 69 } 62 63 70 64 71 public Collection<Signal> getSupportedSignals() … … 67 74 { 68 75 signals = new HashSet<Signal>(); 69 URI uri = getSignal HandlerURI();76 URI uri = getSignalURI(); 70 77 String query = uri.getQuery(); 71 78 if (query != null) … … 83 90 // ------------------------------------------- 84 91 85 protected String getSignalHandlerId() 92 /** 93 Generate a signal message string for the given signal. The string 94 will have the following format: 95 <code>signal://handlerId@receiverId/?signal</code> 96 @see AbstractSignalReceiver#processSignalMessage(String) 97 */ 98 protected String generateSignalMessage(Signal signal) 86 99 { 87 return signalHandlerId; 100 String message = "signal://" + getHandlerId() + "@" + getReceiverId() + "/?" + signal.getId(); 101 return message; 88 102 } 89 103 90 protected URI getSignalHandlerURI() 104 /** 105 Get the raw ID string that was passed to the {@link #init(String)} 106 method. 107 */ 108 protected String getGlobalSignalId() 109 { 110 return globalSignalId; 111 } 112 113 /** 114 Get the URI representation of the global signal ID. 115 @return An URI 116 @throws SignalException If the signal ID is not a valid URI. 117 */ 118 protected URI getSignalURI() 91 119 { 92 120 if (signalHandlerURI == null) … … 94 122 try 95 123 { 96 signalHandlerURI = new URI( signalHandlerId);124 signalHandlerURI = new URI(globalSignalId); 97 125 } 98 126 catch (URISyntaxException ex) 99 127 { 100 throw new SignalException( signalHandlerId, ex);128 throw new SignalException(globalSignalId, ex); 101 129 } 102 130 } … … 104 132 } 105 133 134 /** 135 Get the receiverId part of the signal URI. 136 @return The ID of the receiver 137 */ 106 138 protected String getReceiverId() 107 139 { 108 return getSignalHandlerURI().getAuthority(); 140 URI uri = getSignalURI(); 141 String receiverId = uri.getHost(); 142 if (uri.getPort() >= 0) receiverId += ":" + uri.getPort(); 143 return receiverId; 109 144 } 110 145 146 /** 147 Get the local handler ID part of the signal URI. 148 @return The local ID of the signal handler 149 */ 111 150 protected String getHandlerId() 112 151 { 113 return getSignal HandlerURI().getPath();152 return getSignalURI().getUserInfo(); 114 153 } 115 154 -
trunk/src/core/net/sf/basedb/core/signal/DelegatingSignalHandler.java
r4073 r4074 46 46 { 47 47 /** 48 Log signals processing. 49 */ 50 private static final org.apache.log4j.Logger logger = 51 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.DelegatingSignalHandler"); 52 53 /** 48 54 Holds the registered signal handlers. 49 55 */ … … 51 57 52 58 /** 59 If a specific signal receiver must be used. 60 */ 61 private SignalReceiver signalReceiver; 62 63 /** 53 64 Create a new delegating signal handler. Signal handlers to 54 65 delegate to should be registered with 55 66 {@link #registerSignalHandler(SignalHandler)}. 56 67 */ 57 68 public DelegatingSignalHandler() 58 69 { 70 this(null); 71 } 72 73 /** 74 Create a new delegating signal handler using a specified signal 75 receiver. Signal handlers to delegate to should be registered with 76 {@link #registerSignalHandler(SignalHandler)}. 77 @param signalReceiver The signal receiver that should receive the signals, 78 or null to use the system default signal receiver 79 */ 80 public DelegatingSignalHandler(SignalReceiver signalReceiver) 81 { 59 82 this.handlers = new HashMap<Signal, Set<SignalHandler>>(); 83 this.signalReceiver = signalReceiver; 60 84 } 61 85 … … 90 114 public void handleSignal(Signal signal) 91 115 { 116 logger.debug("Got signal: " + signal); 92 117 Set<SignalHandler> all = handlers.get(signal); 93 if (all == null) throw new UnsupportedSignalException(signal); 118 if (all == null) 119 { 120 logger.debug("Signal not supported " + signal); 121 throw new UnsupportedSignalException(signal); 122 } 94 123 for (SignalHandler handler : all) 95 124 { 125 logger.debug("Sending signal " + signal + " to: " + handler); 96 126 handler.handleSignal(signal); 97 127 } 128 } 129 130 public SignalReceiver getSignalReceiver() 131 { 132 return signalReceiver; 98 133 } 99 134 // ------------------------------------------- … … 106 141 public void registerSignalHandler(SignalHandler handler) 107 142 { 143 logger.debug("Registering signal handler: " + handler); 108 144 for (Signal signal : handler.getSupportedSignals()) 109 145 { … … 124 160 public void unregisterSignalHandler(SignalHandler handler) 125 161 { 162 logger.debug("Unregistering signal handler: " + handler); 126 163 Iterator<Map.Entry<Signal, Set<SignalHandler>>> it = handlers.entrySet().iterator(); 127 164 while (it.hasNext()) -
trunk/src/core/net/sf/basedb/core/signal/LocalSignalReceiver.java
r4073 r4074 40 40 extends AbstractSignalReceiver 41 41 { 42 /** 43 Log signals processing. 44 */ 45 private static final org.apache.log4j.Logger logger = 46 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.LocalSignalReceiver"); 42 47 48 /** 49 Holds all registered signal receivers. 50 */ 43 51 private static Map<String, LocalSignalReceiver> receivers = 44 52 Collections.synchronizedMap(new HashMap<String, LocalSignalReceiver>()); 45 53 46 54 /** 47 55 Get a signal receiver with a given ID. … … 88 96 } 89 97 98 /** 99 @return {@link LocalSignalTransporter} 100 */ 90 101 public Class<? extends SignalTransporter> getSignalTransporterClass() 91 102 { … … 94 105 // ------------------------------------------- 95 106 107 /** 108 Send the signal to a registered handler. If no handler with the 109 given ID is found this method does nothing. The signal will be 110 processed by the handler in the current thread. 111 @param handlerId The ID of a registered handler 112 @param signal The signal to send 113 */ 96 114 public void send(String handlerId, Signal signal) 97 115 { 116 logger.info("Receiving signal " + signal.getId() + " to " + handlerId); 98 117 SignalHandler handler = super.getSignalHandler(handlerId); 99 118 if (handler != null) handler.handleSignal(signal); -
trunk/src/core/net/sf/basedb/core/signal/LocalSignalTransporter.java
r4073 r4074 25 25 26 26 /** 27 A signal transporter implementation that can transport signals within 28 the local virtual machine only. The corresponding receiver class 29 is {@link LocalSignalReceiver}. 27 30 28 31 @author nicklas … … 33 36 extends AbstractSignalTransporter 34 37 { 38 /** 39 Log signals processing. 40 */ 41 private static final org.apache.log4j.Logger logger = 42 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.LocalSignalTransporter"); 35 43 44 /** 45 Create a new signal transporter instance. 46 */ 36 47 public LocalSignalTransporter() 37 48 {} … … 41 52 ------------------------------------------- 42 53 */ 54 /** 55 Send the signal to a local signal receiver. This method will call 56 {@link LocalSignalReceiver#getSignalReceiver(String)} to find a registered 57 signal receiver and then call {@link LocalSignalReceiver#send(String, Signal)}. 58 The signal is sent and processed by the signal handler in the current thread. 59 */ 43 60 public void send(Signal signal) 44 61 { 62 logger.info("Sending signal " + signal.getId() + " to " + getGlobalSignalId()); 45 63 LocalSignalReceiver receiver = LocalSignalReceiver.getSignalReceiver(getReceiverId()); 46 if (receiver != null) receiver.send(get SignalHandlerId(), signal);64 if (receiver != null) receiver.send(getHandlerId(), signal); 47 65 } 48 66 // ------------------------------------------- -
trunk/src/core/net/sf/basedb/core/signal/Signal.java
r4073 r4074 55 55 Holds all registered signals. 56 56 */ 57 private static Map<String, Signal> signals = new HashMap<String, Signal>();57 private static Map<String, Signal> signals; 58 58 59 59 /** … … 69 69 public static synchronized Signal registerSignal(String id, String name, String description) 70 70 { 71 if (signals.containsKey(id)) 71 if (signals == null) signals = new HashMap<String, Signal>(); 72 Signal s = signals.get(id); 73 if (s == null) 72 74 { 73 return signals.get(id); 75 s = new Signal(id, name, description); 76 signals.put(id, s); 74 77 } 75 Signal s = new Signal(id, name, description);76 signals.put(id, s);77 78 return s; 78 79 } -
trunk/src/core/net/sf/basedb/core/signal/SignalHandler.java
r4073 r4074 33 33 in most cases notify the target about it. How this is done is up to each 34 34 implementation. 35 <p> 36 Signal handler implementations need to be partly thread safe. Once they have 37 been registered with a {@link SignalReceiver} they may receive multiple signals 38 in different threads at the same time. 35 39 36 40 @author nicklas … … 59 63 */ 60 64 public boolean supports(Signal signal); 65 66 /** 67 Return a specific signal receiever that must be used with this 68 signal handler. Null should be returned to let the system select 69 an appropriate signal receiver. If a non-null value is returned, the 70 system should use this signal receiver instead of the system default. 71 @return A signal receiver or null 72 */ 73 public SignalReceiver getSignalReceiver(); 74 61 75 } -
trunk/src/core/net/sf/basedb/core/signal/SignalReceiver.java
r4073 r4074 32 32 the {@link SocketSignalReceiver} and {@link SocketSignalTransporter} implementations 33 33 which uses network sockets to transport signals. 34 <p> 35 Signal receivers must be thread safe since a single receiver may be used to 36 handle signals for multiple jobs at the same time. 34 37 35 38 @author nicklas 36 39 @version 2.6 40 @see SignalTransporter 41 @see SignalHandler 37 42 @base.modified $Date$ 38 43 */ … … 75 80 @return A string that allows a transporter instance locate and send a signal to 76 81 the given signal handler 82 @see AbstractSignalReceiver 77 83 */ 78 84 public String registerSignalHandler(SignalHandler handler); -
trunk/src/core/net/sf/basedb/core/signal/SignalTransporter.java
r4073 r4074 38 38 and {@link SocketSignalTransporter} uses a {@link java.net.URI} that contains 39 39 the IP number and port of the receiver and an ID for identifying the signal handler. 40 <p> 41 Signal transporters need not be thread safe since a new instance is created 42 for each signal that is going to be sent. 40 43 41 44 @author nicklas -
trunk/src/core/net/sf/basedb/core/signal/SocketSignalReceiver.java
r4073 r4074 28 28 import java.net.InetSocketAddress; 29 29 import java.net.ServerSocket; 30 import java.net.Socket; 31 import java.net.UnknownHostException; 30 32 import java.nio.channels.ClosedByInterruptException; 31 33 import java.nio.channels.ServerSocketChannel; 32 34 import java.nio.channels.SocketChannel; 33 35 import java.util.HashSet; 36 import java.util.List; 37 import java.util.Set; 38 39 import net.sf.basedb.core.Application; 40 import net.sf.basedb.util.QueryParameters; 34 41 import net.sf.basedb.util.SocketUtil; 35 42 import net.sf.basedb.util.Values; … … 38 45 A signal receiver implementation that listens on a socket for incoming signals. 39 46 Accordingly, this receiver supports sending signals between different virtual 40 machines running on different servers. The initialisation parameter 41 for this should be the port number the receiver should listen on. If null, the 42 receiver will open a port number at random. 47 machines running on different servers. The initialisation string for this 48 class should be of the format: 49 50 <p> 51 <code>port=xx&allow=ip-address&allow=ip-address...</code> 52 53 <p> 54 where <code>port</code> is the port number the signal receiver will listen on and 55 the <code>allow</code> parts are the ip name or numbers of hosts that are allowed 56 to send signals to the receiver. Except for the special case <code>allow=*</code>, 57 which allows any remote host to send signals an exact match is required. The 58 local host is always allowed to send signals. 59 <p> 60 If no port is given, the signal receiver will randomly choose a free port 61 <p> 62 If no <code>allow</code> tags are given only allow connections from the 63 local host are allowed. 43 64 44 65 @author nicklas … … 49 70 extends AbstractSignalReceiver 50 71 { 72 /** 73 Log signals processing. 74 */ 75 private static final org.apache.log4j.Logger logger = 76 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.SocketSignalReceiver"); 51 77 52 78 private InetAddress ip; 53 79 private int port; 54 80 private Thread listener; 81 private Set<InetAddress> allow; 82 private boolean allowAll; 55 83 56 84 /** 57 85 Create a new socket signal receiver. Before it can be used it must 58 86 be initialised with {@link #init(String)}. 59 87 */ 60 88 public SocketSignalReceiver() 61 89 {} … … 69 97 receiver should listen on. If the parameter can't be parsed as a numeric value, 70 98 a random free port will be used. This method will open the socket and 99 start a separate thread that listens for incoming signals. 71 100 */ 72 101 public void init(String params) 73 102 { 74 port = Values.getInt(params, 0); 75 103 QueryParameters qp = QueryParameters.parseQueryString(params); 104 // Get the port number we listen on 105 port = Values.getInt(qp.getValue("port"), 0); 106 107 // Get allowed hosts 108 allow = new HashSet<InetAddress>(); 109 allowAll = false; 110 List<String> allowIp = qp.getValues("allow"); 111 if (allowIp != null) 112 { 113 for (String ip : allowIp) 114 { 115 if ("*".equals(ip)) 116 { 117 logger.debug("Allow all hosts enabled!"); 118 allowAll = true; 119 } 120 else 121 { 122 try 123 { 124 logger.debug("Adding host " + ip + " to list of allowed hosts."); 125 allow.add(InetAddress.getByName(ip)); 126 } 127 catch (UnknownHostException ex) 128 { 129 logger.warn("Unknown host: " + ip, ex); 130 } 131 } 132 } 133 } 134 135 logger.info("Starting socket signal receiver on port: " + port); 76 136 try 77 137 { … … 81 141 port = socket.getLocalPort(); 82 142 ip = socket.getInetAddress(); 83 listener = new Thread(new ListenerThread(channel), 84 "ListenerThread."+this.toString()); 143 listener = new Thread(new ListenerThread(channel), "ListenerThread."+port); 85 144 listener.start(); 145 logger.info("Socket signal receiver is listening on port: " + port); 86 146 } 87 147 catch (IOException ex) 88 148 { 149 logger.error("Could not initialise socket signal receiver", ex); 89 150 throw new SignalException(ex); 90 151 } 91 super.init(ip.getHostAddress() + ":" + port); 92 } 93 94 152 super.init(Application.getHostName() + ":" + port); 153 } 154 155 /** 156 @return {@link SocketSignalTransporter} 157 */ 95 158 public Class<? extends SignalTransporter> getSignalTransporterClass() 96 159 { … … 98 161 } 99 162 163 /** 164 Close this receiver and the socket it is listening on. 165 */ 100 166 public void close() 101 167 { 168 logger.info("Interrupting socket signal receiver on port: " + port); 102 169 listener.interrupt(); 103 170 } … … 133 200 { 134 201 SocketChannel incoming = socket.accept(); 202 Socket inSocket = incoming.socket(); 203 InetAddress remoteHost = inSocket.getInetAddress(); 135 204 136 String cmd = SocketUtil.read(incoming.socket(), true); 137 String[] tmp = cmd.split("#"); 138 String signalHandlerId = tmp[0]; 139 Signal signal = Signal.getSignal(tmp[1]); 140 SignalHandler signalHandler = getSignalHandler(signalHandlerId); 141 signalHandler.handleSignal(signal); 205 logger.debug("Incoming signal: remote host=" + remoteHost); 206 207 // Check if we are allowed to accept connections from the remote host 208 if (allowAll || SocketUtil.isLocalHost(remoteHost) || allow.contains(remoteHost)) 209 { 210 // Read the incoming message 211 String message = SocketUtil.read(inSocket, true); 212 logger.debug("Incoming signal accepted: remote host=" + remoteHost + 213 "; message=" + message); 214 processSignalMessage(message); 215 } 216 else 217 { 218 logger.debug("Incoming signal rejected: remote host=" + remoteHost); 219 } 220 142 221 interrupted = Thread.interrupted(); 143 222 } 144 223 catch (ClosedByInterruptException ex) 145 224 { 225 logger.info("Shutting down socket signal receiver on port: " + port); 146 226 interrupted = true; 147 227 } 148 catch ( Throwable t)149 { 150 interrupted = true;228 catch (IOException ex) 229 { 230 logger.warn("Error on socket signal receiver on port: " + port, ex); 151 231 } 152 232 } -
trunk/src/core/net/sf/basedb/core/signal/SocketSignalTransporter.java
r4073 r4074 27 27 import java.net.URI; 28 28 29 import net.sf.basedb.core.BaseException;30 29 import net.sf.basedb.util.SocketUtil; 31 30 … … 40 39 { 41 40 41 /** 42 Log signals processing. 43 */ 44 private static final org.apache.log4j.Logger logger = 45 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.SocketSignalTransporter"); 46 47 /** 48 Create a new socket signal transporter. Before it can be used it must 49 be initialised with {@link #init(String)}. 50 */ 51 public SocketSignalTransporter() 52 {} 53 42 54 /* 43 55 From the SignalTransporter interface 44 56 ------------------------------------------- 45 57 */ 58 /** 59 Connect to the remote host and send the signal. 60 */ 46 61 public void send(Signal signal) 47 62 { 48 URI uri = getSignalHandlerURI(); 49 String host = uri.getAuthority(); 63 logger.info("Sending signal " + signal.getId() + " to " + getGlobalSignalId()); 64 URI uri = getSignalURI(); 65 String host = uri.getHost(); 50 66 int port = uri.getPort(); 51 67 String message = generateSignalMessage(signal); 68 logger.debug("The message is: " + message); 69 Socket s = null; 52 70 try 53 71 { 54 Socket s = new Socket(host, port); 55 SocketUtil.send(s, getSignalHandlerId() + "#" + signal.getId(), true); 56 SocketUtil.close(s); 72 s = new Socket(host, port); 73 SocketUtil.send(s, message, true); 57 74 } 58 75 catch (Exception ex) 59 76 { 60 throw new BaseException(ex); 77 logger.error("Error sending signal " + signal.getId() + " to " + getGlobalSignalId(), ex); 78 throw new SignalException(ex); 61 79 } 62 80 finally 81 { 82 if (s != null) SocketUtil.close(s); 83 } 63 84 } 64 85 // ------------------------------------------- -
trunk/src/core/net/sf/basedb/core/signal/ThreadSignalHandler.java
r4073 r4074 78 78 { 79 79 80 /** 81 Log signals processing. 82 */ 83 private static final org.apache.log4j.Logger logger = 84 org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.ThreadSignalHandler"); 85 86 80 87 private static final Set<Signal> supported = Collections.singleton(Signal.ABORT); 81 88 … … 122 129 public void handleSignal(Signal signal) 123 130 { 131 logger.debug("Got signal: " + signal); 124 132 if (!workerThread.isAlive()) return; 125 133 if (Signal.ABORT.equals(signal)) 126 134 { 135 logger.debug("Sending signal " + signal + " to thread: " + workerThread); 127 136 workerThread.interrupt(); 128 137 } 129 138 else 130 139 { 140 logger.debug("Signal not supported " + signal); 131 141 throw new UnsupportedSignalException(signal); 132 142 } -
trunk/src/plugins/core/net/sf/basedb/plugins/AbstractFlatFileImporter.java
r4073 r4074 52 52 import net.sf.basedb.core.plugin.Response; 53 53 import net.sf.basedb.core.plugin.Plugin; 54 import net.sf.basedb.core.signal.SignalException; 54 55 import net.sf.basedb.core.signal.SignalHandler; 55 56 import net.sf.basedb.core.signal.SignalTarget; … … 598 599 } 599 600 // In case the server is shutting down... throw exception, rollback and quit 600 if (Thread.interrupted()) throw new BaseException("Thread was interrupted.");601 if (Thread.interrupted()) throw new SignalException("Aborted by user."); 601 602 } 602 603 } … … 641 642 } 642 643 // In case the server is shutting down... throw exception, rollback and quit 643 if (Thread.interrupted()) throw new BaseException("Thread was interrupted.");644 if (Thread.interrupted()) throw new SignalException("Aborted by user."); 644 645 } 645 646 } -
trunk/www/views/jobs/index.jsp
r3679 r4074 39 39 import="net.sf.basedb.core.PermissionDeniedException" 40 40 import="net.sf.basedb.core.ItemAlreadyExistsException" 41 import="net.sf.basedb.core.signal.SignalTransporter" 42 import="net.sf.basedb.core.signal.Signal" 41 43 import="net.sf.basedb.util.RemovableUtil" 42 44 import="net.sf.basedb.util.ShareableUtil" … … 238 240 redirect = viewPage; 239 241 } 242 else if ("AbortJob".equals(cmd)) 243 { 244 ItemContext cc = Base.getAndSetCurrentContext(sc, itemType, pageContext, defaultContext); 245 dc = sc.newDbControl(); 246 Job job = Job.getById(dc, cc.getId()); 247 if (job.getStatus() == Job.Status.WAITING) 248 { 249 job.doneError("Aborted by user"); 250 } 251 else 252 { 253 SignalTransporter signalTransporter = job.getSignalTransporter(); 254 if (signalTransporter != null) signalTransporter.send(Signal.ABORT); 255 } 256 dc.commit(); 257 Thread.sleep(500); 258 redirect = viewPage; 259 } 240 260 else 241 261 { -
trunk/www/views/jobs/view_job.jsp
r4003 r4074 47 47 import="net.sf.basedb.core.plugin.GuiContext" 48 48 import="net.sf.basedb.core.plugin.Plugin" 49 import="net.sf.basedb.core.signal.SignalTransporter" 50 import="net.sf.basedb.core.signal.Signal" 49 51 import="net.sf.basedb.clients.web.Base" 50 52 import="net.sf.basedb.clients.web.util.HTML" … … 57 59 import="java.util.List" 58 60 import="java.util.Collections" 61 import="java.util.Collection" 59 62 %> 60 63 <%@ taglib prefix="base" uri="/WEB-INF/base.tld" %> … … 103 106 Formatter<Date> dateFormatter = FormatterFactory.getDateFormatter(sc); 104 107 Formatter<Date> dateTimeFormatter = FormatterFactory.getDateTimeFormatter(sc); 108 109 // Check if the plug-in supports the "Abort" signal 110 boolean supportsAbort = status == Job.Status.WAITING; 111 if (status == Job.Status.EXECUTING) 112 { 113 try 114 { 115 SignalTransporter signalTransporter = job.getSignalTransporter(); 116 Collection<Signal> supportedSignals = signalTransporter != null ? 117 signalTransporter.getSupportedSignals() : null; 118 supportsAbort = supportedSignals == null || supportedSignals.contains(Signal.ABORT); 119 } 120 catch (Exception ex) 121 {} 122 } 105 123 %> 106 124 … … 114 132 { 115 133 setTimeout('location.reload()', 10000); 134 } 135 } 136 function abortJob() 137 { 138 if (confirm('Are you sure? This action may not be undone')) 139 { 140 location.href = 'index.jsp?ID=<%=ID%>&cmd=AbortJob&item_id=<%=itemId%>'; 116 141 } 117 142 } … … 289 314 </table> 290 315 </t:tab> 291 292 316 <% 293 317 if (job.getStackTrace() != null) … … 474 498 %> 475 499 <% 500 if (supportsAbort) 501 { 502 %> 503 <base:button onclick="abortJob()" title="Abort…" image="abort.png" /> 504 <% 505 } 506 %> 507 <% 476 508 if (job.getStatus() == Job.Status.ERROR && job.getJobType() == Job.Type.RUN_PLUGIN) 477 509 {
Note: See TracChangeset
for help on using the changeset viewer.