Changeset 5446


Ignore:
Timestamp:
Oct 15, 2010, 1:59:00 PM (12 years ago)
Author:
Nicklas Nordborg
Message:

References #1527: Improve remote control in job agents

This should provide everything we need. The real test is implementing #1528 so I'll keep this open until it is known to work.

Location:
trunk
Files:
3 added
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/config/dist/jobagent.properties

    r4508 r5446  
    5454agent.allowremote.start=true
    5555agent.allowremote.pause=true
     56
     57# ==============================
     58# Custom remote control handlers
     59# ==============================
     60
     61## Handlers for custom remote control can be registered with one or more
     62## settings like the one below. Replace 'foo' with the actual name of the
     63## protocol. The class must implement the CustumRequestHandler
     64## interface. Requests are sent to the handler on the agent's remote control
     65## port: foo://more-custom-data .....
     66# agent.request-handler.foo = <class-name>
    5667
    5768
  • trunk/doc/src/docbook/appendix/jobagent.properties.xml

    r4889 r5446  
    187187    </variablelist>
    188188
     189  </simplesect>
     190
     191  <simplesect id="appendix.jobagent.properties.request">
     192    <title>Custom request handlers</title>
     193
     194    <variablelist>
     195    <varlistentry>
     196      <term><property>agent.request-handler.*</property></term>
     197      <listitem>
     198        <para>
     199          Optional. One or more entries for custom remote control handlers.
     200          The * should be replaced with the name of the protocol and the
     201          value should be the name of a class implementing the
     202          <interfacename docapi="net.sf.basedb.clients.jobagent.handlers">CustomRequestHandler</interfacename>
     203          interface. Requests can then be sent to the agent's remote control port on
     204          the form: <code>foo://custom-data....</code>.
     205        </para>
     206      </listitem>
     207    </varlistentry>
     208    </variablelist>
     209   
    189210  </simplesect>
    190211
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java

    r5399 r5446  
    3434
    3535import net.sf.basedb.clients.jobagent.executors.ProcessJobExecutor;
     36import net.sf.basedb.clients.jobagent.handlers.CustomRequestHandler;
    3637import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler;
     38import net.sf.basedb.clients.jobagent.handlers.MultiProtocolRequestHandler;
     39import net.sf.basedb.clients.jobagent.handlers.SignalRequestHandler;
    3740import net.sf.basedb.core.Application;
    3841import net.sf.basedb.core.DbControl;
     
    133136  </tr>
    134137  <tr>
     138    <td>agent.request-handler.*</td>
     139    <td>-</td>
     140    <td>
     141      One or more entries that can be used to register custom remote control request handlers.
     142      The * should be replaced with the name of the protocol which means that requests
     143      should take the form: protocol://custom-data....
     144      <p>
     145      The value is the name of a class that implements the {@link CustomRequestHandler}
     146      interface. The implementation must provide a public no-argument constructor. The
     147      implementation must be thread-safe and be able to handle multiple requests at the
     148      time.
     149    </td>
     150  </tr>
     151  <tr>
    135152    <td>agent.executor.class</td>
    136153    <td>{@link ProcessJobExecutor}</td>
     
    236253  private final boolean allowRemotePause;
    237254  private final boolean allowRemoteStart;
     255  private final Map<String, Class<? extends CustomRequestHandler>> customRequestHandlerClasses;
    238256 
    239257  // Job execution settings
     
    243261  private InetAddress serverAddress;
    244262  private JobAgentServerConnection server;
    245   private RequestHandler requestHandler;
     263  private MultiProtocolRequestHandler requestHandler;
    246264  private AgentSignalReceiver signalReceiver;
    247265  private JobExecutor jobExecutor;
     
    306324      }
    307325    }
    308    
     326
    309327    // BASE settings
    310328    this.login = properties.getProperty("agent.user");
     
    320338    this.allowRemotePause = Values.getBoolean(properties.getProperty("agent.allowremote.pause"), true);
    321339    this.allowRemoteStart = Values.getBoolean(properties.getProperty("agent.allowremote.start"), true);
     340   
     341    // Additional request handlers
     342    this.customRequestHandlerClasses = getCustomRequestHandlerClasses();
    322343   
    323344    // Job execution settings
     
    464485  }
    465486 
     487  @SuppressWarnings("unchecked")
     488  private Map<String, Class<? extends CustomRequestHandler>> getCustomRequestHandlerClasses()
     489  {
     490    Map<String, Class<? extends CustomRequestHandler>> handlers = new HashMap<String, Class<? extends CustomRequestHandler>>();
     491    for (String property : properties.stringPropertyNames())
     492    {
     493      if (property.startsWith("agent.request-handler."))
     494      {
     495        String protocol = property.substring("agent.request-handler.".length());
     496        String className = properties.getProperty(property);
     497        try
     498        {
     499          Class<? extends CustomRequestHandler> clazz = (Class<? extends CustomRequestHandler>)Class.forName(className);
     500          if (!CustomRequestHandler.class.isAssignableFrom(clazz))
     501          {
     502            log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored!");
     503          }
     504          else
     505          {
     506            log.info("Loaded custom request handler for protocol '" + protocol + "': " + clazz.getName());
     507            handlers.put(protocol, clazz);
     508          }
     509        }
     510        catch (Throwable t)
     511        {
     512          log.warn("Class " + className + " not found, ignored!");
     513        }
     514      }
     515    }
     516    return handlers;
     517  }
     518 
     519  private void registerCustomRequestHandlers(MultiProtocolRequestHandler master)
     520  {
     521    Map<Class<? extends CustomRequestHandler>, CustomRequestHandler> created =
     522      new HashMap<Class<? extends CustomRequestHandler>, CustomRequestHandler>();
     523    for (Map.Entry<String, Class<? extends CustomRequestHandler>> entry : customRequestHandlerClasses.entrySet())
     524    {
     525      String protocol = entry.getKey();
     526      Class<? extends CustomRequestHandler> clazz = entry.getValue();
     527      CustomRequestHandler handler = created.get(clazz);
     528      if (handler == null)
     529      {
     530        try
     531        {
     532          handler = clazz.newInstance();
     533          handler.init(this);
     534          created.put(clazz, handler);
     535          log.info("Created request handler for protocol '" + protocol + ": " + clazz.getName());
     536        }
     537        catch (Throwable t)
     538        {
     539          log.error("Could not create request handler for protocol '" + protocol + ": " + clazz.getName(), t);
     540        }
     541      }
     542      master.registerProtocols(handler, protocol);
     543    }
     544  }
     545 
    466546  /**
    467547    Get the class object for the configured job executor. If the
     
    520600    Note! The listener service is started in a separate thread and this method
    521601    returns as soon as the network connections are set up.
    522    
    523     @param requestHandler A {@link RequestHandler} that handles the
     602    <p>
     603    Note! The default handler supplied as an argument is used as a fallback
     604    handler for unregistered protocols. Additional request handlers can be
     605    set up by calling {@link #registerRequestHandler(RequestHandler, String...)}.
     606   
     607    @param defaultHandler A {@link RequestHandler} that handles the
    524608      incoming requsts, or null to use the {@link DefaultRequestHandler}
    525609    @throws IOException If there is an error when starting the service
    526610  */
    527   public synchronized void service(RequestHandler requestHandler)
     611  public synchronized void service(RequestHandler defaultHandler)
    528612    throws IOException
    529613  {
     
    533617      this.signalReceiver = new AgentSignalReceiver(this);
    534618      this.signalReceiver.init(null);
    535       this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler;
     619      if (defaultHandler == null) defaultHandler = new DefaultRequestHandler(this);
     620      this.requestHandler = new MultiProtocolRequestHandler(this, defaultHandler);
     621      registerRequestHandler(new SignalRequestHandler(this), "signal");
     622      registerCustomRequestHandlers(requestHandler);
    536623      this.server = new JobAgentServerConnection(port, this.requestHandler, logServer);
    537624      server.open();
     
    687774    }
    688775    return allow;
     776  }
     777 
     778  /**
     779    Register a request handler for one or more protocols. See
     780    {@link MultiProtocolRequestHandler} for more information.
     781    Calls to this method is ignored if the job agent has no listener
     782    service. See {@link #service(RequestHandler)}.
     783   
     784    @param handler The request handler
     785    @param protocols The name of the protocols the handler should handle
     786    @since 2.16
     787  */
     788  public void registerRequestHandler(RequestHandler handler, String... protocols)
     789  {
     790    if (this.requestHandler == null) return;
     791    if (handler == null) throw new NullPointerException("handler");
     792    requestHandler.registerProtocols(handler, protocols);
     793  }
     794
     795  /**
     796    Unregister one or more protocols. See
     797    {@link MultiProtocolRequestHandler} for more information.
     798    Calls to this method is ignored if the job agent has no listener
     799    service. See {@link #service(RequestHandler)}.
     800   
     801    @param protocols The name of the protocols the handler should handle
     802    @since 2.16
     803  */
     804  public void unregisterRequestHandler(String... protocols)
     805  {
     806    if (this.requestHandler == null) return;
     807    requestHandler.unregisterProtocols(protocols);
    689808  }
    690809 
     
    858977      server = null;
    859978    }
     979    if (requestHandler != null)
     980    {
     981      requestHandler.close();
     982      requestHandler = null;
     983    }
    860984  }
    861985 
     
    9371061    // Send SHUTDOWN/ABORT to all jobs, that support signals
    9381062    signalReceiver.close(closeTimeout);
     1063    signalReceiver = null;
     1064    unregisterRequestHandler("signal");
    9391065  }
    9401066 
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/AgentController.java

    r4512 r5446  
    2929import java.util.Properties;
    3030
     31import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler;
    3132import net.sf.basedb.core.DbControl;
    3233import net.sf.basedb.core.ItemNotFoundException;
     
    238239        log.info("Creating a new job agent on port " + port);
    239240        Agent agent = createAgent(properties);
    240         agent.service(null); // Start listening for incoming connections
     241        agent.service(new DefaultRequestHandler(agent)); // Start listening for incoming connections
    241242      }
    242243      else
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/DefaultRequestHandler.java

    r4512 r5446  
    6767    registerHandler(new StopRequestHandler(agent), "stop");
    6868    registerHandler(new PauseRequestHandler(agent), "pause");
    69     if (agent.getSignalReceiver() != null)
    70     {
    71       registerHandler(new SignalRequestHandler(agent), "signal");
    72     }
    7369  }
    7470 
     
    8076    }
    8177  }
    82  
     78  /*
     79    From the RequestHandler interface
     80    ---------------------------------
     81  */
     82  @Override
    8383  public String handleCmd(Socket incoming, String cmd)
    8484  {
     
    9191    String answer = null;
    9292    RequestHandler handler = commandHandlers.get(cmd);
    93     if (handler == null && cmd != null && cmd.startsWith("signal://"))
    94     {
    95       handler = commandHandlers.get("signal");
    96     }
    97     if (!agent.isAllowedControl(remote, cmd))
    98     {
    99       answer = "FAILED Permission denied: cmd=" + cmd + "; host=" + remote.toString();
    100     }
    101     else if (handler == null)
     93    if (handler == null)
    10294    {
    10395      answer = "FAILED Unknown command: " + cmd;
     
    113105    return answer;
    114106  }
     107  // --------------------------------------------------
    115108}
  • trunk/src/core/net/sf/basedb/core/signal/SocketSignalTransporter.java

    r4516 r5446  
    7171      s = new Socket(host, port);
    7272      SocketUtil.send(s, message, true);
     73      SocketUtil.read(s, true); // Consume any response
    7374    }
    7475    catch (Exception ex)
  • trunk/src/core/net/sf/basedb/util/jobagent/JobAgentConnection.java

    r4515 r5446  
    133133 
    134134  /**
    135     Convenience method for opening a new socket, sending a command and
    136     return the answer.
     135    Send a remote control command to the job agent.
    137136   
    138137    @param cmd The command to send
    139138    @return The answer
    140139    @throws IOException If there is an error
    141    */
    142   private String send(String cmd)
     140    @since 2.16 (was private before that)
     141  */
     142  public String send(String cmd)
    143143    throws IOException
    144144  {
  • trunk/src/test/TestJobAgent.java

    r5340 r5446  
    2828
    2929import net.sf.basedb.clients.jobagent.Agent;
     30import net.sf.basedb.clients.jobagent.handlers.AbstractCustomRequestHandler;
    3031import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler;
    3132
     
    8485    test_list_jobs(id, 1);
    8586   
     87    // Extra test: send custom control command
     88    test_send_control_command(id, "foo://hello");
     89    test_send_control_command(id2, "foo://world");
     90   
    8691    if (TestUtil.waitBeforeDelete()) TestUtil.waitForEnter();
    8792    TestJob.test_delete(jobId);
     
    415420  }
    416421 
    417   static void create_fake_jobagent(final int port, String externalId)
    418   {
    419     try
    420     {
    421       // The request handler for incoming requests
     422  static void create_fake_jobagent(final int port, final String externalId)
     423  {
     424    try
     425    {
    422426      Properties p = new Properties();
    423427      p.setProperty("agent.port", Integer.toString(port));
     
    425429      p.setProperty("agent.password", TestUtil.getPassword());
    426430      p.setProperty("agent.id", externalId);
     431      p.setProperty("agent.request-handler.foo", FooRequestHandler.class.getName());
    427432      Agent agent = new Agent(p);
    428433      RequestHandler requestHandler = new DefaultRequestHandler(agent)
     
    430435        public String handleCmd(Socket incoming, String cmd)
    431436        {
    432           if (!TestUtil.getSilent()) write("--Job agent received cmd: " + cmd + " (port="+port+")");
     437          if (!TestUtil.getSilent()) write("--Job agent received cmd: " + cmd + " (id=" + externalId + "; port="+port+")");
    433438          String answer = super.handleCmd(incoming, cmd);
    434439          if (!TestUtil.getSilent()) write("--Job agent answers: " + answer);
     
    437442      };
    438443      agent.service(requestHandler); // NOTE! Creates new thread for the listener
    439       write("--Create fake job agent OK (port " + port + ")");
     444      write("--Create fake job agent OK (id=" + externalId + "; port " + port + ")");
    440445    }
    441446    catch (Throwable t)
    442447    {
    443       write("--Create fake job agent FAILED (port " + port + ")");
     448      write("--Create fake job agent FAILED (id=" + externalId + "; port " + port + ")");
    444449      t.printStackTrace();
    445450      ok = false;
     
    468473    }
    469474  }
     475 
     476  static void test_send_control_command(int jobAgentId, String cmd)
     477  {
     478    if (jobAgentId == 0) return;
     479    DbControl dc = null;
     480    try
     481    {
     482      dc = TestUtil.getDbControl();
     483      JobAgent j = JobAgent.getById(dc, jobAgentId);
     484      JobAgentConnection conn = j.getConnection(null);
     485      String answer = conn.send(cmd);
     486      write("--Send custom command OK (" + cmd + " --> " + answer + ")");
     487    }
     488    catch (Throwable ex)
     489    {
     490      write("--Send custom command FAILED (" + cmd + ")");
     491      ex.printStackTrace();
     492      ok = false;
     493    }
     494    finally
     495    {
     496      if (dc != null) dc.close();
     497    }
     498  }
     499
     500 
     501  public static class FooRequestHandler
     502    extends AbstractCustomRequestHandler
     503  {
     504 
     505    public FooRequestHandler()
     506    {}
     507
     508    @Override
     509    public void init(Agent agent)
     510    {
     511      super.init(agent);
     512      if (!TestUtil.getSilent())
     513      {
     514        write("--Initializing 'foo' handler for job agent (id=" + agent.getId() + "; port=" + agent.getPort() + ")");
     515      }
     516    }
     517    @Override
     518    public void close()
     519    {
     520      Agent agent = getAgent();
     521      int port = agent.getPort();
     522      String id = agent.getId();
     523      if (!TestUtil.getSilent())
     524      {
     525        write("--Closing 'foo' handler for job agent (id=" + id + "; port=" + port + ")");
     526      }
     527      super.close();
     528    }
     529   
     530    @Override
     531    public String handleCmd(Socket socket, String cmd)
     532    {
     533      Agent agent = getAgent();
     534      int port = agent.getPort();
     535      String id = agent.getId();
     536      if (!TestUtil.getSilent())
     537      {
     538        write("--Job agent received cmd: " + cmd + " (id=" + id + "; port="+port+")");
     539      }
     540      return "OK " + cmd;
     541    }
     542   
     543  }
     544 
    470545}
Note: See TracChangeset for help on using the changeset viewer.