Changeset 5446
- Timestamp:
- Oct 15, 2010, 1:59:00 PM (12 years ago)
- Location:
- trunk
- Files:
-
- 3 added
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/config/dist/jobagent.properties
r4508 r5446 54 54 agent.allowremote.start=true 55 55 agent.allowremote.pause=true 56 57 # ============================== 58 # Custom remote control handlers 59 # ============================== 60 61 ## Handlers for custom remote control can be registered with one or more 62 ## settings like the one below. Replace 'foo' with the actual name of the 63 ## protocol. The class must implement the CustumRequestHandler 64 ## interface. Requests are sent to the handler on the agent's remote control 65 ## port: foo://more-custom-data ..... 66 # agent.request-handler.foo = <class-name> 56 67 57 68 -
trunk/doc/src/docbook/appendix/jobagent.properties.xml
r4889 r5446 187 187 </variablelist> 188 188 189 </simplesect> 190 191 <simplesect id="appendix.jobagent.properties.request"> 192 <title>Custom request handlers</title> 193 194 <variablelist> 195 <varlistentry> 196 <term><property>agent.request-handler.*</property></term> 197 <listitem> 198 <para> 199 Optional. One or more entries for custom remote control handlers. 200 The * should be replaced with the name of the protocol and the 201 value should be the name of a class implementing the 202 <interfacename docapi="net.sf.basedb.clients.jobagent.handlers">CustomRequestHandler</interfacename> 203 interface. Requests can then be sent to the agent's remote control port on 204 the form: <code>foo://custom-data....</code>. 205 </para> 206 </listitem> 207 </varlistentry> 208 </variablelist> 209 189 210 </simplesect> 190 211 -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java
r5399 r5446 34 34 35 35 import net.sf.basedb.clients.jobagent.executors.ProcessJobExecutor; 36 import net.sf.basedb.clients.jobagent.handlers.CustomRequestHandler; 36 37 import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler; 38 import net.sf.basedb.clients.jobagent.handlers.MultiProtocolRequestHandler; 39 import net.sf.basedb.clients.jobagent.handlers.SignalRequestHandler; 37 40 import net.sf.basedb.core.Application; 38 41 import net.sf.basedb.core.DbControl; … … 133 136 </tr> 134 137 <tr> 138 <td>agent.request-handler.*</td> 139 <td>-</td> 140 <td> 141 One or more entries that can be used to register custom remote control request handlers. 142 The * should be replaced with the name of the protocol which means that requests 143 should take the form: protocol://custom-data.... 144 <p> 145 The value is the name of a class that implements the {@link CustomRequestHandler} 146 interface. The implementation must provide a public no-argument constructor. The 147 implementation must be thread-safe and be able to handle multiple requests at the 148 time. 149 </td> 150 </tr> 151 <tr> 135 152 <td>agent.executor.class</td> 136 153 <td>{@link ProcessJobExecutor}</td> … … 236 253 private final boolean allowRemotePause; 237 254 private final boolean allowRemoteStart; 255 private final Map<String, Class<? extends CustomRequestHandler>> customRequestHandlerClasses; 238 256 239 257 // Job execution settings … … 243 261 private InetAddress serverAddress; 244 262 private JobAgentServerConnection server; 245 private RequestHandler requestHandler;263 private MultiProtocolRequestHandler requestHandler; 246 264 private AgentSignalReceiver signalReceiver; 247 265 private JobExecutor jobExecutor; … … 306 324 } 307 325 } 308 326 309 327 // BASE settings 310 328 this.login = properties.getProperty("agent.user"); … … 320 338 this.allowRemotePause = Values.getBoolean(properties.getProperty("agent.allowremote.pause"), true); 321 339 this.allowRemoteStart = Values.getBoolean(properties.getProperty("agent.allowremote.start"), true); 340 341 // Additional request handlers 342 this.customRequestHandlerClasses = getCustomRequestHandlerClasses(); 322 343 323 344 // Job execution settings … … 464 485 } 465 486 487 @SuppressWarnings("unchecked") 488 private Map<String, Class<? extends CustomRequestHandler>> getCustomRequestHandlerClasses() 489 { 490 Map<String, Class<? extends CustomRequestHandler>> handlers = new HashMap<String, Class<? extends CustomRequestHandler>>(); 491 for (String property : properties.stringPropertyNames()) 492 { 493 if (property.startsWith("agent.request-handler.")) 494 { 495 String protocol = property.substring("agent.request-handler.".length()); 496 String className = properties.getProperty(property); 497 try 498 { 499 Class<? extends CustomRequestHandler> clazz = (Class<? extends CustomRequestHandler>)Class.forName(className); 500 if (!CustomRequestHandler.class.isAssignableFrom(clazz)) 501 { 502 log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored!"); 503 } 504 else 505 { 506 log.info("Loaded custom request handler for protocol '" + protocol + "': " + clazz.getName()); 507 handlers.put(protocol, clazz); 508 } 509 } 510 catch (Throwable t) 511 { 512 log.warn("Class " + className + " not found, ignored!"); 513 } 514 } 515 } 516 return handlers; 517 } 518 519 private void registerCustomRequestHandlers(MultiProtocolRequestHandler master) 520 { 521 Map<Class<? extends CustomRequestHandler>, CustomRequestHandler> created = 522 new HashMap<Class<? extends CustomRequestHandler>, CustomRequestHandler>(); 523 for (Map.Entry<String, Class<? extends CustomRequestHandler>> entry : customRequestHandlerClasses.entrySet()) 524 { 525 String protocol = entry.getKey(); 526 Class<? extends CustomRequestHandler> clazz = entry.getValue(); 527 CustomRequestHandler handler = created.get(clazz); 528 if (handler == null) 529 { 530 try 531 { 532 handler = clazz.newInstance(); 533 handler.init(this); 534 created.put(clazz, handler); 535 log.info("Created request handler for protocol '" + protocol + ": " + clazz.getName()); 536 } 537 catch (Throwable t) 538 { 539 log.error("Could not create request handler for protocol '" + protocol + ": " + clazz.getName(), t); 540 } 541 } 542 master.registerProtocols(handler, protocol); 543 } 544 } 545 466 546 /** 467 547 Get the class object for the configured job executor. If the … … 520 600 Note! The listener service is started in a separate thread and this method 521 601 returns as soon as the network connections are set up. 522 523 @param requestHandler A {@link RequestHandler} that handles the 602 <p> 603 Note! The default handler supplied as an argument is used as a fallback 604 handler for unregistered protocols. Additional request handlers can be 605 set up by calling {@link #registerRequestHandler(RequestHandler, String...)}. 606 607 @param defaultHandler A {@link RequestHandler} that handles the 524 608 incoming requsts, or null to use the {@link DefaultRequestHandler} 525 609 @throws IOException If there is an error when starting the service 526 610 */ 527 public synchronized void service(RequestHandler requestHandler)611 public synchronized void service(RequestHandler defaultHandler) 528 612 throws IOException 529 613 { … … 533 617 this.signalReceiver = new AgentSignalReceiver(this); 534 618 this.signalReceiver.init(null); 535 this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler; 619 if (defaultHandler == null) defaultHandler = new DefaultRequestHandler(this); 620 this.requestHandler = new MultiProtocolRequestHandler(this, defaultHandler); 621 registerRequestHandler(new SignalRequestHandler(this), "signal"); 622 registerCustomRequestHandlers(requestHandler); 536 623 this.server = new JobAgentServerConnection(port, this.requestHandler, logServer); 537 624 server.open(); … … 687 774 } 688 775 return allow; 776 } 777 778 /** 779 Register a request handler for one or more protocols. See 780 {@link MultiProtocolRequestHandler} for more information. 781 Calls to this method is ignored if the job agent has no listener 782 service. See {@link #service(RequestHandler)}. 783 784 @param handler The request handler 785 @param protocols The name of the protocols the handler should handle 786 @since 2.16 787 */ 788 public void registerRequestHandler(RequestHandler handler, String... protocols) 789 { 790 if (this.requestHandler == null) return; 791 if (handler == null) throw new NullPointerException("handler"); 792 requestHandler.registerProtocols(handler, protocols); 793 } 794 795 /** 796 Unregister one or more protocols. See 797 {@link MultiProtocolRequestHandler} for more information. 798 Calls to this method is ignored if the job agent has no listener 799 service. See {@link #service(RequestHandler)}. 800 801 @param protocols The name of the protocols the handler should handle 802 @since 2.16 803 */ 804 public void unregisterRequestHandler(String... protocols) 805 { 806 if (this.requestHandler == null) return; 807 requestHandler.unregisterProtocols(protocols); 689 808 } 690 809 … … 858 977 server = null; 859 978 } 979 if (requestHandler != null) 980 { 981 requestHandler.close(); 982 requestHandler = null; 983 } 860 984 } 861 985 … … 937 1061 // Send SHUTDOWN/ABORT to all jobs, that support signals 938 1062 signalReceiver.close(closeTimeout); 1063 signalReceiver = null; 1064 unregisterRequestHandler("signal"); 939 1065 } 940 1066 -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/AgentController.java
r4512 r5446 29 29 import java.util.Properties; 30 30 31 import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler; 31 32 import net.sf.basedb.core.DbControl; 32 33 import net.sf.basedb.core.ItemNotFoundException; … … 238 239 log.info("Creating a new job agent on port " + port); 239 240 Agent agent = createAgent(properties); 240 agent.service(n ull); // Start listening for incoming connections241 agent.service(new DefaultRequestHandler(agent)); // Start listening for incoming connections 241 242 } 242 243 else -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/DefaultRequestHandler.java
r4512 r5446 67 67 registerHandler(new StopRequestHandler(agent), "stop"); 68 68 registerHandler(new PauseRequestHandler(agent), "pause"); 69 if (agent.getSignalReceiver() != null)70 {71 registerHandler(new SignalRequestHandler(agent), "signal");72 }73 69 } 74 70 … … 80 76 } 81 77 } 82 78 /* 79 From the RequestHandler interface 80 --------------------------------- 81 */ 82 @Override 83 83 public String handleCmd(Socket incoming, String cmd) 84 84 { … … 91 91 String answer = null; 92 92 RequestHandler handler = commandHandlers.get(cmd); 93 if (handler == null && cmd != null && cmd.startsWith("signal://")) 94 { 95 handler = commandHandlers.get("signal"); 96 } 97 if (!agent.isAllowedControl(remote, cmd)) 98 { 99 answer = "FAILED Permission denied: cmd=" + cmd + "; host=" + remote.toString(); 100 } 101 else if (handler == null) 93 if (handler == null) 102 94 { 103 95 answer = "FAILED Unknown command: " + cmd; … … 113 105 return answer; 114 106 } 107 // -------------------------------------------------- 115 108 } -
trunk/src/core/net/sf/basedb/core/signal/SocketSignalTransporter.java
r4516 r5446 71 71 s = new Socket(host, port); 72 72 SocketUtil.send(s, message, true); 73 SocketUtil.read(s, true); // Consume any response 73 74 } 74 75 catch (Exception ex) -
trunk/src/core/net/sf/basedb/util/jobagent/JobAgentConnection.java
r4515 r5446 133 133 134 134 /** 135 Convenience method for opening a new socket, sending a command and 136 return the answer. 135 Send a remote control command to the job agent. 137 136 138 137 @param cmd The command to send 139 138 @return The answer 140 139 @throws IOException If there is an error 141 */ 142 private String send(String cmd) 140 @since 2.16 (was private before that) 141 */ 142 public String send(String cmd) 143 143 throws IOException 144 144 { -
trunk/src/test/TestJobAgent.java
r5340 r5446 28 28 29 29 import net.sf.basedb.clients.jobagent.Agent; 30 import net.sf.basedb.clients.jobagent.handlers.AbstractCustomRequestHandler; 30 31 import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler; 31 32 … … 84 85 test_list_jobs(id, 1); 85 86 87 // Extra test: send custom control command 88 test_send_control_command(id, "foo://hello"); 89 test_send_control_command(id2, "foo://world"); 90 86 91 if (TestUtil.waitBeforeDelete()) TestUtil.waitForEnter(); 87 92 TestJob.test_delete(jobId); … … 415 420 } 416 421 417 static void create_fake_jobagent(final int port, String externalId) 418 { 419 try 420 { 421 // The request handler for incoming requests 422 static void create_fake_jobagent(final int port, final String externalId) 423 { 424 try 425 { 422 426 Properties p = new Properties(); 423 427 p.setProperty("agent.port", Integer.toString(port)); … … 425 429 p.setProperty("agent.password", TestUtil.getPassword()); 426 430 p.setProperty("agent.id", externalId); 431 p.setProperty("agent.request-handler.foo", FooRequestHandler.class.getName()); 427 432 Agent agent = new Agent(p); 428 433 RequestHandler requestHandler = new DefaultRequestHandler(agent) … … 430 435 public String handleCmd(Socket incoming, String cmd) 431 436 { 432 if (!TestUtil.getSilent()) write("--Job agent received cmd: " + cmd + " ( port="+port+")");437 if (!TestUtil.getSilent()) write("--Job agent received cmd: " + cmd + " (id=" + externalId + "; port="+port+")"); 433 438 String answer = super.handleCmd(incoming, cmd); 434 439 if (!TestUtil.getSilent()) write("--Job agent answers: " + answer); … … 437 442 }; 438 443 agent.service(requestHandler); // NOTE! Creates new thread for the listener 439 write("--Create fake job agent OK ( port " + port + ")");444 write("--Create fake job agent OK (id=" + externalId + "; port " + port + ")"); 440 445 } 441 446 catch (Throwable t) 442 447 { 443 write("--Create fake job agent FAILED ( port " + port + ")");448 write("--Create fake job agent FAILED (id=" + externalId + "; port " + port + ")"); 444 449 t.printStackTrace(); 445 450 ok = false; … … 468 473 } 469 474 } 475 476 static void test_send_control_command(int jobAgentId, String cmd) 477 { 478 if (jobAgentId == 0) return; 479 DbControl dc = null; 480 try 481 { 482 dc = TestUtil.getDbControl(); 483 JobAgent j = JobAgent.getById(dc, jobAgentId); 484 JobAgentConnection conn = j.getConnection(null); 485 String answer = conn.send(cmd); 486 write("--Send custom command OK (" + cmd + " --> " + answer + ")"); 487 } 488 catch (Throwable ex) 489 { 490 write("--Send custom command FAILED (" + cmd + ")"); 491 ex.printStackTrace(); 492 ok = false; 493 } 494 finally 495 { 496 if (dc != null) dc.close(); 497 } 498 } 499 500 501 public static class FooRequestHandler 502 extends AbstractCustomRequestHandler 503 { 504 505 public FooRequestHandler() 506 {} 507 508 @Override 509 public void init(Agent agent) 510 { 511 super.init(agent); 512 if (!TestUtil.getSilent()) 513 { 514 write("--Initializing 'foo' handler for job agent (id=" + agent.getId() + "; port=" + agent.getPort() + ")"); 515 } 516 } 517 @Override 518 public void close() 519 { 520 Agent agent = getAgent(); 521 int port = agent.getPort(); 522 String id = agent.getId(); 523 if (!TestUtil.getSilent()) 524 { 525 write("--Closing 'foo' handler for job agent (id=" + id + "; port=" + port + ")"); 526 } 527 super.close(); 528 } 529 530 @Override 531 public String handleCmd(Socket socket, String cmd) 532 { 533 Agent agent = getAgent(); 534 int port = agent.getPort(); 535 String id = agent.getId(); 536 if (!TestUtil.getSilent()) 537 { 538 write("--Job agent received cmd: " + cmd + " (id=" + id + "; port="+port+")"); 539 } 540 return "OK " + cmd; 541 } 542 543 } 544 470 545 }
Note: See TracChangeset
for help on using the changeset viewer.