Ignore:
Timestamp:
Sep 12, 2006, 2:24:28 PM (16 years ago)
Author:
Nicklas Nordborg
Message:

References #351: External job server usage

Communincation with the job agent is now working fairly well. The job agent can check for new
jobs but not start them. Need to add more error handling and logging as well.

Added more utility classes to the net.sf.basedb.core.util package and subpackages that may
be useful for other client applications in the future.

Location:
trunk/src/clients/jobagent
Files:
10 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java

    r2632 r2634  
    2222  Boston, MA  02111-1307, USA.
    2323*/
    24 package sf.basedb.clients.jobagent;
    25 
     24package net.sf.basedb.clients.jobagent;
     25
     26import java.io.IOException;
     27import java.net.InetAddress;
    2628import java.util.Properties;
    27 
    28 import net.sf.basedb.util.jobagent.JobAgentInfo;
     29import java.util.TimerTask;
     30
     31import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler;
     32import net.sf.basedb.core.Application;
     33import net.sf.basedb.core.DbControl;
     34import net.sf.basedb.core.Job;
     35import net.sf.basedb.core.JobAgent;
     36import net.sf.basedb.core.SessionControl;
     37
     38import net.sf.basedb.util.SocketUtil;
     39import net.sf.basedb.util.Values;
    2940import net.sf.basedb.util.jobagent.JobAgentServerConnection;
    3041import net.sf.basedb.util.jobagent.RequestHandler;
     42import net.sf.basedb.util.timer.Scheduler;
    3143
    3244/**
     
    3951public class Agent
    4052{
     53
     54  /**
     55    Log job agent events.
     56  */
     57  private static final org.apache.log4j.Logger log =
     58    org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.Agent");
     59 
    4160  private final String login;
    4261  private final String password;
     
    4564  private final String description;
    4665  private final Integer port;
    47   private final JobAgentServerConnection server;
    48  
    49   Agent(Properties properties)
     66 
     67 
     68  private JobAgentServerConnection server;
     69  private RequestHandler requestHandler;
     70
     71  private TimerTask jobQueueChecker;
     72  private long checkInterval = 10000;
     73 
     74 
     75  private boolean isRunning;
     76 
     77 
     78  private SessionControl sc;
     79 
     80 
     81  public Agent(int port)
     82  {
     83    this.port = port;
     84    this.login = null;
     85    this.password = null;
     86    this.externalId = null;
     87    this.name = null;
     88    this.description = null;
     89  }
     90 
     91  public Agent(Properties properties)
    5092  {
    5193    this.login = properties.getProperty("agent.user");
     
    5496    this.name = properties.getProperty("agent.name");
    5597    this.description = properties.getProperty("agent.description");
    56     this.port = Integer.parseInt(properties.getProperty("agent.port"));
     98    this.port = Values.getInt(properties.getProperty("agent.port"), JobAgent.DEFAULT_PORT);
    5799    validate();
    58     this.server = new JobAgentServerConnection(port, new RequestHandlerImpl());
     100    isRunning = false;
    59101  }
    60102 
     
    62104  {}
    63105
    64   public void start()
    65   {
    66    
    67   }
    68  
    69   public void stop()
    70   {
    71    
    72   }
    73  
    74   public void pause()
    75   {
    76    
    77   }
    78  
    79   private class RequestHandlerImpl
    80     implements RequestHandler
    81   {
    82     public String handleCmd(String cmd)
    83     {
    84       String answer = null;
    85       if ("info".equals(cmd))
    86       {
    87         Runtime runtime = Runtime.getRuntime();
    88         int cpu = (int)(Math.random()*100);
    89         long totalMemory = runtime.maxMemory();
    90         long usedMemory = runtime.totalMemory() - runtime.freeMemory();
    91         JobAgentInfo info = new JobAgentInfo(cpu, totalMemory, usedMemory, null);
    92         answer = info.toString();
    93       }
    94       return answer;
     106  public synchronized void service(RequestHandler requestHandler)
     107    throws IOException
     108  {
     109    log.info("Starting listener service for job agent on port " + port);
     110    if (server == null)
     111    {
     112      this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler;
     113      this.server = new JobAgentServerConnection(port, this.requestHandler);
     114      server.open();
     115    }
     116  }
     117 
     118  public synchronized void start()
     119  {
     120    log.info("Starting job agent on port " + port);
     121    if (!isRunning)
     122    {
     123      isRunning = true;
     124      Scheduler scheduler = Application.getScheduler();
     125      jobQueueChecker = scheduler.schedule(new JobQueueChecker(this), checkInterval, checkInterval, false);
     126    }
     127  }
     128 
     129  public synchronized void stop()
     130  {
     131    log.info("Stopping job agent on port " + port);
     132    isRunning = false;
     133    closeJobQueueChecker();
     134    closeServer();
     135    Application.stop();
     136  }
     137 
     138  public synchronized void pause()
     139  {
     140    log.info("Pausing job agent on port " + port);
     141    isRunning = false;
     142    closeJobQueueChecker();
     143  }
     144 
     145  public boolean isRunning()
     146  {
     147    return isRunning;
     148  }
     149 
     150  /**
     151    Check if the computer specified by the given address is allowed to
     152    control this job agent.
     153    @param remote The address to the remote computer
     154    @return TRUE if the computer is allowed, FALSE otherwise
     155  */
     156  public boolean isAllowedControl(InetAddress remote)
     157  {
     158    return true; // TODO - implement
     159  }
     160 
     161  public SessionControl getSessionControl()
     162  {
     163    if (sc == null)
     164    {
     165      Application.start(false);
     166      sc = Application.newSessionControl("net.sf.basedb.clients.jobagent",
     167        SocketUtil.getLocalHost().toString(), null);
     168      sc.login(login, password, null, false);
     169    }
     170    return sc;
     171  }
     172 
     173  public JobAgent getJobAgent(DbControl dc)
     174  {
     175    return JobAgent.getByExternalId(dc, externalId);
     176  }
     177 
     178  public Job.ExecutionTime getSlot(Job.ExecutionTime estimated)
     179  {
     180    return Job.ExecutionTime.SHORT; // TODO - implement
     181  }
     182 
     183  private void closeServer()
     184  {
     185    if (server != null)
     186    {
     187      server.close();
     188      server = null;
     189    }
     190  }
     191 
     192  private void closeJobQueueChecker()
     193  {
     194    if (jobQueueChecker != null)
     195    {
     196      jobQueueChecker.cancel();
     197      jobQueueChecker = null;
    95198    }
    96199  }
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/AgentController.java

    r2632 r2634  
    2222  Boston, MA  02111-1307, USA.
    2323*/
    24 package sf.basedb.clients.jobagent;
     24package net.sf.basedb.clients.jobagent;
    2525
    2626import java.io.File;
    2727import java.io.FileInputStream;
    2828import java.io.IOException;
     29import java.net.ConnectException;
     30import java.net.SocketTimeoutException;
    2931import java.util.Properties;
    3032
     33import net.sf.basedb.core.JobAgent;
     34import net.sf.basedb.util.Values;
    3135import net.sf.basedb.util.jobagent.JobAgentConnection;
     36import net.sf.basedb.util.jobagent.JobAgentInfo;
    3237
    3338/**
    34   This is the main class for the job agent application. It is responsible for
    35   starting up and managing the running job agent.
     39  This is the controller class for the job agent application. It is responsible for
     40  starting up and managing a running job agent. The agent itself will be started in
     41  separate thread.
    3642
    3743  @author nicklas
     
    4349
    4450  /**
    45    @param args
    46    */
     51    Log job agent events.
     52  */
     53  private static final org.apache.log4j.Logger log =
     54    org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.AgentController");
     55
    4756  public static void main(String[] args)
    4857  {
    49     Properties p = new Properties();
    50     try
    51     {
    52       p.load(new FileInputStream(new File("jobagent.properties")));
    53     }
    54     catch (IOException ex)
    55     {}
    56     String cmd = "start";
    57    
    58     AgentController ac = new AgentController(p);
    59     if ("start".equals(cmd))
    60     {
    61       ac.startAgent();
    62     }
    63     else if ("stop".equals(cmd))
    64     {
    65       ac.stopAgent();
    66     }
    67     else if ("register".equals(cmd))
    68     {
    69       ac.registerAgent();
    70     }
    71     else if ("unregister".equals(cmd))
    72     {
    73       ac.unregisterAgent();
    74     }
    75     else if ("pause".equals(cmd))
    76     {
    77       // ac.pasueAgent();
     58    CmdLine cmdLine = new CmdLine(args);
     59    String cmd = cmdLine.getCmd();
     60    String propertiesFile = cmdLine.getOption("-c", "jobagent.properties");
     61   
     62    if ("help".equals(cmd))
     63    {
     64      printUsage();
     65      printHelp();
    7866    }
    7967    else
    8068    {
    81       // Unknown command
    82     }
    83    
    84   }
    85 
     69      Properties p = new Properties();
     70      try
     71      {
     72        p.load(new FileInputStream(new File(propertiesFile)));
     73      }
     74      catch (Throwable t)
     75      {
     76        String message = "Could not load configuration file " + propertiesFile;
     77        System.out.println(message + ": " + t.getMessage());
     78        printUsage();
     79        log.debug(message, t);
     80        return;
     81      }
     82   
     83      try
     84      {
     85        AgentController ac = new AgentController(p);
     86        if ("start".equals(cmd))
     87        {
     88          System.out.println("Starting job agent...");
     89          ac.startAgent();
     90          System.out.println("Job agent is up and running.");
     91        }
     92        else if ("stop".equals(cmd))
     93        {
     94          System.out.println("Stopping job agent...");
     95          ac.stopAgent();
     96          System.out.println("The job agent has been stopped");
     97        }
     98        else if ("info".equals(cmd))
     99        {
     100          System.out.println("Getting info from job agent...");
     101          JobAgentInfo info = ac.getInfo();
     102          System.out.println(info.toString());
     103        }
     104        else if ("register".equals(cmd))
     105        {
     106          System.out.println("Registering job agent with BASE...");
     107          ac.registerAgent();
     108          System.out.println("Done");
     109        }
     110        else if ("unregister".equals(cmd))
     111        {
     112          System.out.println("Unregistering job agent from BASE...");
     113          ac.unregisterAgent();
     114          System.out.println("Done");
     115        }
     116        else if ("pause".equals(cmd))
     117        {
     118          System.out.println("Pausing job agent...");
     119          ac.pauseAgent();
     120          System.out.println("The job agent is now paused");
     121        }
     122        else
     123        {
     124          System.out.println("Unknown command: " + cmd);
     125          printUsage();
     126          printHelp();
     127        }
     128      }
     129      catch (Throwable t)
     130      {
     131        String message = "Exception when executing command " + cmd;
     132        System.out.println(message + ": " + t.getMessage());
     133        log.error(message, t);
     134      }
     135    }
     136  }
     137
     138  private static void printUsage()
     139  {
     140    System.out.println("Usage    : jobagent.sh [options] cmd");
     141    System.out.println(" options : -c <configuration file>");
     142    System.out.println(" cmd     : start | stop | pause | info | register | unregister | help");
     143  }
     144
     145  private static void printHelp()
     146  {
     147    System.out.println("Commands");
     148    System.out.println("===========");
     149    System.out.println("start     : Starts the job agent");
     150    System.out.println("stop      : Stops the job agent");
     151    System.out.println("pause     : Pauses the job agent");
     152    System.out.println("info      : Get info about the job agent");
     153    System.out.println("register  : Registers this job agent with the BASE server");
     154    System.out.println("unregister: Unregisters this job agent from the BASE server");
     155    System.out.println("help      : Display this help message");
     156  }
     157 
    86158  private final Properties p;
    87159  private final int port;
     160  private final int timeout;
     161 
     162  /**
     163    Create a new controller for controlling the job agent specified by the
     164    given properties.
     165    @param p The properties for the job agent to control
     166  */
    88167  public AgentController(Properties p)
    89168  {
    90169    this.p = p;
    91     this.port = Integer.parseInt(p.getProperty("agent.port"));
    92   }
    93  
     170    this.port = Values.getInt(p.getProperty("agent.port"), JobAgent.DEFAULT_PORT);
     171    this.timeout = Values.getInt(p.getProperty("agent.timeout"), 1000);
     172  }
     173 
     174  /**
     175    Start the agent this controller is controlling. If no agent is running
     176    a new one is created in another thread in this virtual machine. The thread
     177    must be stopped before the virtual machine exits.
     178   
     179    @throws IOException If there is an error
     180  */
    94181  public void startAgent()
    95   {
    96     JobAgentConnection conn = new JobAgentConnection(port, 1000);
     182    throws IOException
     183  {
     184    log.info("Starting job agent on port " + port);
     185    JobAgentConnection conn = new JobAgentConnection(port, timeout);
    97186    try
    98187    {
    99       if (conn.isRunning())
    100       {
    101         // Error: Already running
     188      JobAgentInfo info = null;
     189      try
     190      {
     191        log.info("Sending info command to job agent on port " + port);
     192        info = conn.getInfo(false);
     193      }
     194      catch (ConnectException t)
     195      {
     196        // Connection was refused, probably due to nobody listening on the port
     197        log.debug(t.getMessage(), t);
     198      }
     199      catch (SocketTimeoutException t)
     200      {
     201        // Connection timed out, maybe because nobody listens
     202        log.debug(t.getMessage(), t);
     203      }
     204     
     205      if (info == null)
     206      {
     207        // No job agent is running, create a new agent
     208        log.info("Creating a new job agent on port " + port);
     209        Agent agent = createAgent(p);
     210        agent.service(null); // Start listening for incoming connections
    102211      }
    103212      else
    104213      {
    105         createAgent(p, true);
     214        log.info("An existing job agent is already running on port " + port +
     215          " (" + (info.isPaused() ? "paused" : "running") + ")");
     216      }
     217
     218      // If we reach this part, we should at least have a job agent listening for connections
     219      if (info == null || info.isPaused())
     220      {
     221        // It is a new agent or an existing that is paused --> Start it
     222        log.info("Sending start command to job agent on port " + port);
    106223        conn.sendStart();
    107224      }
    108225    }
    109     catch (Throwable t)
    110     {
    111       // TODO
    112     }
    113226    finally
    114227    {
     
    118231
    119232  public void stopAgent()
    120   {
    121     JobAgentConnection conn = new JobAgentConnection(port, 1000);
     233    throws IOException
     234  {
     235    log.info("Stopping job agent on port " + port);
     236    JobAgentConnection conn = new JobAgentConnection(port, timeout);
    122237    try
    123238    {
    124       if (!conn.isRunning())
    125       {
    126         // Error: Not running
    127       }
    128       else
    129       {
    130         conn.sendStop();
    131       }
    132     }
    133     catch (Throwable t)
    134     {
    135       // TODO
     239      log.info("Sending stop command to job agent on port " + port);
     240      conn.sendStop();
     241    }
     242    catch (ConnectException t)
     243    {
     244      // Connection was refused, probably due to nobody listening on the port
     245      // Do nothing
     246      log.debug(t.getMessage(), t);
     247      log.info("No job agent is running on port " + port);
    136248    }
    137249    finally
     
    141253  }
    142254 
     255  public void pauseAgent()
     256    throws IOException
     257  {
     258    log.info("Pausing job agent on port " + port);
     259    JobAgentConnection conn = new JobAgentConnection(port, timeout);
     260    try
     261    {
     262      log.info("Sending stop command to job agent on port " + port);
     263      conn.sendPause();
     264    }
     265    catch (ConnectException t)
     266    {
     267      // Connection was refused, probably due to nobody listening on the port
     268      // Do nothing
     269      log.debug(t.getMessage(), t);
     270      log.info("No job agent is running on port " + port);
     271    }
     272    finally
     273    {
     274      conn.close();
     275    }
     276  }
     277
     278  public JobAgentInfo getInfo()
     279    throws IOException
     280  {
     281    log.info("Getting info from agent on port " + port);
     282    JobAgentConnection conn = new JobAgentConnection(port, timeout);
     283    JobAgentInfo info = null;
     284    try
     285    {
     286      log.info("Sending info command to job agent on port " + port);
     287      info = conn.getInfo(true);
     288    }
     289    finally
     290    {
     291      conn.close();
     292    }
     293    return info;
     294  }
     295 
    143296  public void registerAgent()
    144297  {
     
    151304  }
    152305 
    153   private Agent createAgent(Properties p, boolean thread)
     306  private Agent createAgent(Properties p)
     307    throws IOException
    154308  {
    155309    Agent a = new Agent(p);
Note: See TracChangeset for help on using the changeset viewer.