Ignore:
Timestamp:
Sep 14, 2006, 12:17:01 PM (16 years ago)
Author:
Nicklas Nordborg
Message:

References #351: External job server usage

The core of the job agent is now working. Still missing code for actually running the jobs. A dummy
job executor which marks the job as beeing executed allows testing of the job agent.

Location:
trunk/src/clients/jobagent
Files:
7 added
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java

    r2634 r2641  
    2626import java.io.IOException;
    2727import java.net.InetAddress;
     28import java.net.UnknownHostException;
     29import java.util.Collections;
     30import java.util.HashMap;
     31import java.util.HashSet;
     32import java.util.Map;
    2833import java.util.Properties;
     34import java.util.Set;
    2935import java.util.TimerTask;
    3036
     37import net.sf.basedb.clients.jobagent.executors.ProcessJobExecutor;
    3138import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler;
    3239import net.sf.basedb.core.Application;
    3340import net.sf.basedb.core.DbControl;
     41import net.sf.basedb.core.InvalidDataException;
     42import net.sf.basedb.core.InvalidUseOfNullException;
    3443import net.sf.basedb.core.Job;
    3544import net.sf.basedb.core.JobAgent;
     45import net.sf.basedb.core.Project;
    3646import net.sf.basedb.core.SessionControl;
    3747
     
    4353
    4454/**
    45   This is the actual agent that is checking the database for jobs.
     55  This is the actual job agent application. It delegates the actual
     56  checking of the job queue to the {@link JobQueueChecker} class.
     57  It includes a listener service that can be used for remote control
     58  of the agent. It is for example possible to send <code>start</code>,
     59  <code>stop</code> and <code>pause</code> requests.
     60  <p>
     61  This class is responsible for creating a {@link JobExecutor}
     62  object and to delegate the actual execution of a job to it. The agent keep
     63  track of the running jobs and makes sure that only the configured
     64  number of jobs are running at the same time.
     65  <p>
     66  The agent is configured at construction time with parameters from
     67  a {@link Properties} object.
     68 
     69  <table>
     70  <tr>
     71    <th>Parameter</th>
     72    <th>Default value</th>
     73    <th>Description</th>
     74  </tr>
     75  <tr>
     76    <td>agent.user</td>
     77    <td>-</td>
     78    <td>The username to use for logging in to BASE (required)</td>
     79  </tr>
     80  <tr>
     81    <td>agent.password</td>
     82    <td>-</td>
     83    <td>The password to use for logging in to BASE (required)</td>
     84  </tr>
     85  <tr>
     86    <td>agent.id</td>
     87    <td>-</td>
     88    <td rowspan="3">
     89      The external ID (required), name and description properties of the
     90      corresponding {@link JobAgent} item in the BASE database
     91    </td>
     92  </tr>
     93  <tr>
     94    <td>agent.name</td>
     95    <td>-</td>
     96  </tr>
     97  <tr>
     98    <td>agent.description</td>
     99    <td>-</td>
     100  </tr>
     101  <tr>
     102    <td>agent.port</td>
     103    <td>47822</td>
     104    <td>The port the remote control service listener is listening to</td>
     105  </tr>
     106  <tr>
     107    <td>agent.remotecontrol</td>
     108    <td>-</td>
     109    <td>
     110      A comma-separated list of computer that are allowed remote control. It is recommended
     111      that the web server is put in this list. The local host is always allowed control
     112      and doesn't have to be in this list.
     113    </td>
     114  </tr>
     115  <tr>
     116    <td>agent.allowremote.stop</td>
     117    <td>false</td>
     118    <td>
     119      If <code>stop</code> requests should be allowed from remote hosts or not.
     120      Note! A stop request shuts down the agent making it impossible to start it
     121      again using remote control.
     122    </td>
     123  </tr>
     124  <tr>
     125    <td>agent.allowremote.start</td>
     126    <td>true</td>
     127    <td>If <code>start</code> requests should be allowed from remote hosts or not</td>
     128  </tr>
     129  <tr>
     130    <td>agent.allowremote.pause</td>
     131    <td>true</td>
     132    <td>If <code>pause</code> requests should be allowed from remote hosts or not</td>
     133  </tr>
     134  <tr>
     135    <td>agent.executor.class</td>
     136    <td>{@link ProcessJobExecutor}</td>
     137    <td>
     138      The name of a class that is responsible for starting the a job once
     139      the agent has determined that is allowed to be exected. The class must
     140      implement the {@link JobExecutor} interface and provide a public noargument
     141      constructor. Note that only one instance of this class exists for an agent.
     142      It must be thread-safe since the jobs are executed in parallel threads.
     143    </td>
     144  </tr>
     145  <tr>
     146    <td>agent.executor.init</td>
     147    <td>-</td>
     148    <td>
     149      Initialisation parameters for the executor class. They are sent to the
     150      {@link JobExecutor#init(String)} method. For a meaning and syntax description
     151      of this string see the excutor implementation you are using.
     152    </td>
     153  </tr>
     154  <tr>
     155    <td>agent.checkinterval</td>
     156    <td>30</td>
     157    <td>Number of seconds between checks to the database for new jobs.</td>
     158  </tr>
     159  <tr>
     160    <td>agent.shortest.slots</td>
     161    <td>1</td>
     162    <td>Number of slots to reserve for jobs that take &lt; 1 minute to execute</td>
     163  </tr>
     164  <tr>
     165    <td>agent.shortest.priority</td>
     166    <td>4</td>
     167    <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td>
     168  </tr>
     169  <tr>
     170    <td>agent.short.slots</td>
     171    <td>1</td>
     172    <td>Number of slots to reserve for jobs that take &lt; 10 minute to execute</td>
     173  </tr>
     174  <tr>
     175    <td>agent.short.priority</td>
     176    <td>4</td>
     177    <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td>
     178  </tr>
     179  <tr>
     180    <td>agent.medium.slots</td>
     181    <td>2</td>
     182    <td>Number of slots to reserve for jobs that take &lt; 1 hour to execute</td>
     183  </tr>
     184  <tr>
     185    <td>agent.medium.priority</td>
     186    <td>3</td>
     187    <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td>
     188  </tr>
     189  <tr>
     190    <td>agent.long.slots</td>
     191    <td>2</td>
     192    <td>Number of slots to reserve for jobs that take &gt; 1 hour to execute</td>
     193  </tr>
     194  <tr>
     195    <td>agent.long.priority</td>
     196    <td>3</td>
     197    <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td>
     198  </tr>
     199  </table>
    46200
    47201  @author nicklas
     
    53207
    54208  /**
     209    The default job executor to use if none has been specified
     210    in the configuration file.
     211  */
     212  public static final Class<? extends JobExecutor> DEFAULT_JOB_EXECUTOR =
     213    ProcessJobExecutor.class;
     214 
     215  /**
     216    The default check interval in seconds.
     217  */
     218  public static final int DEFAULT_CHECK_INTERVAL = 30;
     219 
     220  /**
    55221    Log job agent events.
    56222  */
    57223  private static final org.apache.log4j.Logger log =
    58224    org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.Agent");
    59  
     225
     226  /**
     227    Log job agent server events.
     228  */
     229  private static final org.apache.log4j.Logger logServer =
     230    org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.JobAgentServerConnection");
     231 
     232  // Base settings
    60233  private final String login;
    61234  private final String password;
     
    63236  private final String name;
    64237  private final String description;
     238
     239  // Service listener settings
    65240  private final Integer port;
    66  
    67  
     241  private final Set<InetAddress> remote;
     242  private final boolean allowRemoteStop;
     243  private final boolean allowRemotePause;
     244  private final boolean allowRemoteStart;
     245 
     246  // Job execution settings
     247  private final long checkInterval;
     248  private final Class<? extends JobExecutor> executorClass;
     249  private final String executorInitParameters;
     250 
     251  private InetAddress serverAddress;
    68252  private JobAgentServerConnection server;
    69253  private RequestHandler requestHandler;
     254  private JobExecutor jobExecutor;
    70255
    71256  private TimerTask jobQueueChecker;
    72   private long checkInterval = 10000;
    73257 
    74258 
    75259  private boolean isRunning;
    76  
    77  
    78260  private SessionControl sc;
    79261 
    80  
    81   public Agent(int port)
    82   {
    83     this.port = port;
    84     this.login = null;
    85     this.password = null;
    86     this.externalId = null;
    87     this.name = null;
    88     this.description = null;
    89   }
    90  
     262  /**
     263    The current number of threads executing in each slot.
     264  */
     265  private final Map<Job.ExecutionTime, Integer> usedSlots;
     266 
     267  /**
     268    The maximum number of threads that are allowed in each slot.
     269  */
     270  private final Map<Job.ExecutionTime, Integer> maxSlots;
     271 
     272  /**
     273    The thread priority to use when executing jobs in each slot.
     274  */
     275  private final Map<Job.ExecutionTime, Integer> priorities;
     276 
     277  private final Set<Integer> activeJobs;
     278 
     279  /**
     280    The group were all job runners are placed.
     281  */
     282  private final ThreadGroup runnersGroup;
     283
     284  /**
     285    Create a new job agent. See class documentation for information about
     286    configuration parameters.
     287
     288    @param properties A properties object containing the configuration
     289      parameters for the agent
     290  */
    91291  public Agent(Properties properties)
    92292  {
     293    // BASE settings
    93294    this.login = properties.getProperty("agent.user");
    94295    this.password = properties.getProperty("agent.password");
     
    96297    this.name = properties.getProperty("agent.name");
    97298    this.description = properties.getProperty("agent.description");
     299
     300    // Listener service settings
    98301    this.port = Values.getInt(properties.getProperty("agent.port"), JobAgent.DEFAULT_PORT);
    99     validate();
     302    this.remote = getRemoteAddresses(properties.getProperty("agent.remotecontrol"));
     303    this.allowRemoteStop = Values.getBoolean(properties.getProperty("agent.allowremote.stop"), false);
     304    this.allowRemotePause = Values.getBoolean(properties.getProperty("agent.allowremote.pause"), true);
     305    this.allowRemoteStart = Values.getBoolean(properties.getProperty("agent.allowremote.start"), true);
     306   
     307    // Job execution settings
     308    this.checkInterval = Values.getInt(properties.getProperty("agent.checkinterval"), DEFAULT_CHECK_INTERVAL);
     309    this.executorClass = getJobExecutorClass(properties.getProperty("agent.executor.class"));
     310    this.executorInitParameters = properties.getProperty("agent.executor.init");
     311   
     312    // Slots and priorities
     313    this.usedSlots = new HashMap<Job.ExecutionTime, Integer>();
     314    this.maxSlots = new HashMap<Job.ExecutionTime, Integer>();
     315    this.priorities = new HashMap<Job.ExecutionTime, Integer>();
     316    for (Job.ExecutionTime et : Job.ExecutionTime.values())
     317    {
     318      usedSlots.put(et, 0);
     319      String configName = "agent."+et.name().toLowerCase();
     320      int configuredSlots = Values.getInt(properties.getProperty(configName+".slots"), et.getDefaultSlots());
     321      maxSlots.put(et, configuredSlots);
     322      int priority = Values.getInt(properties.getProperty(configName+".priority"), et.getDefaultPriority());
     323      priorities.put(et, priority);
     324    }
     325
     326    validateConfiguration();
     327   
     328    // Configure the job runners thread group
     329    runnersGroup = new ThreadGroup("Agent."+externalId);
     330    runnersGroup.setDaemon(false);
     331
     332    this.activeJobs = new HashSet<Integer>();
    100333    isRunning = false;
    101334  }
    102335 
    103   private void validate()
    104   {}
    105 
     336  /**
     337    Get the <code>agent.id</code> configuration value.
     338    @return The ID
     339  */
     340  public String getId()
     341  {
     342    return externalId;
     343  }
     344 
     345  /**
     346    Get the <code>agent.name</code> configuration value.
     347    @return The name or the ID if the name is null
     348  */
     349  public String getName()
     350  {
     351    return name == null ? externalId : name;
     352  }
     353 
     354  /**
     355    Get the <code>agent.description</code> configuration value.
     356    @return The description
     357  */
     358  public String getDescription()
     359  {
     360    return description;
     361  }
     362 
     363  /**
     364    Get the <code>agent.port</code> configuration value.
     365    @return The port number
     366  */
     367  public int getPort()
     368  {
     369    return port;
     370  }
     371 
     372  /**
     373    Get the host name of the server where the job agent is running.
     374    @see SocketUtil#getPublicLocalHost()
     375  */
     376  public String getServerName()
     377  {
     378    if (serverAddress == null)
     379    {
     380      serverAddress = SocketUtil.getPublicLocalHost();
     381    }
     382    return serverAddress.getHostName();
     383  }
     384 
     385  /**
     386    Split the string at commas and try to create an {@link InetAddress}
     387    for each part. Parts that turn out to be invalid are skipped and
     388    a warning message is logged.
     389    @param config The string to split
     390    @return A set of <code>InetAdress</code> object
     391  */
     392  private Set<InetAddress> getRemoteAddresses(String config)
     393  {
     394    Set<InetAddress> remote = new HashSet<InetAddress>();
     395    if (config != null)
     396    {
     397      for (String host : config.split(","))
     398      {
     399        try
     400        {
     401          remote.add(InetAddress.getByName(host));
     402        }
     403        catch (UnknownHostException ex)
     404        {
     405          log.warn("Unknown host: " + host, ex);
     406        }
     407      }
     408    }
     409    return remote;
     410  }
     411 
     412  /**
     413    Get the class object for the configured job executor. If the
     414    specified class can't be found or doesn't implement the
     415    {@link JobExecutor} interface a warning message is logged and
     416    the {@link #DEFAULT_JOB_EXECUTOR} is used instead.
     417   
     418    @param className The name of the job executor class
     419    @return The class object for that class or the default job executor
     420  */
     421  @SuppressWarnings("unchecked")
     422  private Class<? extends JobExecutor> getJobExecutorClass(String className)
     423  {
     424    Class<? extends JobExecutor> executor = DEFAULT_JOB_EXECUTOR;
     425    try
     426    {
     427      executor = (Class<JobExecutor>)Class.forName(className);
     428      if (!JobExecutor.class.isAssignableFrom(executor))
     429      {
     430        log.warn("Class " + className + " doesn't implement the JobExecutor interface, using " +
     431          DEFAULT_JOB_EXECUTOR.getName() + " instead");
     432        executor =  DEFAULT_JOB_EXECUTOR;
     433      }
     434    }
     435    catch (ClassNotFoundException ex)
     436    {
     437      log.warn("Class " + className + " not found, using " + DEFAULT_JOB_EXECUTOR.getName() +
     438        " instead", ex);
     439    }
     440    return executor;
     441  }
     442 
     443  /**
     444    Validate that all required configuration parameters have been specified.
     445    @throws InvalidDataException If parameters are missing or have incorrect values
     446  */
     447  private void validateConfiguration()
     448    throws InvalidDataException
     449  {
     450    if (login == null) throw new InvalidUseOfNullException("agent.user");
     451    if (password == null) throw new InvalidUseOfNullException("agent.password");
     452    if (externalId == null) throw new InvalidUseOfNullException("agent.id");
     453  }
     454
     455  /**
     456    Start the listener service that listens for control commands such
     457    as <code>start</code>, <code>stop</code>, <code>pause</code> and
     458    <code>info</code>. The listener service is only required if the job
     459    agent needs to respond to remote control commands.
     460    <p>
     461    Note! The {@link AgentController} which is the default controller for
     462    agents always uses the listener service for communicating with the job
     463    agent.
     464    <p>
     465    Note! The listener service is started in a separate thread and this method
     466    returns as soon as the network connections are set up.
     467   
     468    @param requestHandler A {@link RequestHandler} that handles the
     469      incoming requsts, or null to use the {@link DefaultRequestHandler}
     470    @throws IOException If there is an error when starting the service
     471  */
    106472  public synchronized void service(RequestHandler requestHandler)
    107473    throws IOException
     
    111477    {
    112478      this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler;
    113       this.server = new JobAgentServerConnection(port, this.requestHandler);
     479      this.server = new JobAgentServerConnection(port, this.requestHandler, logServer);
    114480      server.open();
    115481    }
    116482  }
    117483 
     484  /**
     485    Start the job agent. This method will register a {@link JobQueueChecker}
     486    object with the BASE core scheduler {@link Application#getScheduler()}. The
     487    timer will call the {@link JobQueueChecker#run()} method at intervals
     488    specified by the <code>agent.checkinterval</code> configuration settings.
     489    <p>
     490    Note! The <code>JobQueueChecker</code> will run in a separate thread
     491    and this method return immediately after registering the object with the
     492    scheduler.
     493    <p>
     494    Note! This method also creats a single instance of a {@link JobExecutor}.
     495    The actual class to use is specified by the <code>agent.jobexecutor.class</code>
     496    configuration setting. The default job executor is {@link ProcessJobExecutor}.
     497  */
    118498  public synchronized void start()
    119499  {
     
    122502    {
    123503      isRunning = true;
    124       Scheduler scheduler = Application.getScheduler();
    125       jobQueueChecker = scheduler.schedule(new JobQueueChecker(this), checkInterval, checkInterval, false);
    126     }
    127   }
    128  
     504      jobExecutor = createJobExecutor();
     505      jobQueueChecker = createJobQueueChecker();
     506    }
     507  }
     508 
     509  /**
     510    Stop the job agent. This method will:
     511
     512    <ul>
     513    <li>Cancel the {@link JobQueueChecker} that was registered with the BASE
     514      core scheduler by the {@link #start()} method.
     515    <li>Call the {@link JobExecutor#close()} method on the job executor
     516      created by the {@link #start()} method.
     517    <li>Close the service listener started by the {@link #service(RequestHandler)}
     518      method.
     519    <li>Try to stop all running jobs by calling {@link Thread#interrupt()}
     520      on all job threads.
     521    <li>Logout and stop the BASE {@link Application}.
     522    </ul>
     523   
     524    Unless no other things are running in the same virtual machine as this
     525    job agent the virtual machine should exit as a result from calling this
     526    method.
     527  */
    129528  public synchronized void stop()
    130529  {
     
    132531    isRunning = false;
    133532    closeJobQueueChecker();
     533    maybeStopRunningJobs();
     534    closeJobExecutor();
    134535    closeServer();
     536    if (sc != null) sc.logout();
    135537    Application.stop();
    136538  }
    137539 
     540  /**
     541    Pause the job agent. This method will:
     542
     543    <ul>
     544    <li>Cancel the {@link JobQueueChecker} that was registered with the BASE
     545      core scheduler by the {@link #start()} method.
     546    <li>Call the {@link JobExecutor#close()} method on the job executor
     547      created by the {@link #start()} method.
     548    <li>Logout and stop the BASE {@link Application}.
     549    </ul>
     550   
     551    This method will not try to stop the running jobs or shut down the
     552    BASE application. Calling {@link #start()} again will start the job
     553    agent again.
     554   
     555    @see #stop()
     556    @see #start()
     557  */
    138558  public synchronized void pause()
    139559  {
     
    141561    isRunning = false;
    142562    closeJobQueueChecker();
    143   }
    144  
     563    closeJobExecutor();
     564    if (sc != null) sc.logout();
     565  }
     566 
     567  /**
     568    Check if the job agent is running or not.
     569    @return TRUE if the job agent is running, FALSE otherwise
     570  */
    145571  public boolean isRunning()
    146572  {
     
    149575 
    150576  /**
     577    Get a set containing the ID:s of the jobs that are currently
     578    beeing executed by this job agent.
     579    @return A set of integers
     580  */
     581  public Set<Integer> getRunningJobs()
     582  {
     583    return Collections.unmodifiableSet(activeJobs);
     584  }
     585 
     586  /**
    151587    Check if the computer specified by the given address is allowed to
    152     control this job agent.
     588    control this job agent. A computer is allowed to control this
     589    job agent if it's name or ip-address is listed in the
     590    <code>agent.remotecontrol</code> property. This method is called from
     591    the {@link DefaultRequestHandler#handleCmd(Socket, String)} method
     592    to determine if a service request should be accepted or not.
     593    <p>
     594    Note! The <code>stop</code>, <code>start</code> and <code>pause</code>
     595    commands are only allowed from the local host unless the <code>agent.allowremote.stop</code>,
     596    <code>agent.allowremote.start</code> and <code>agent.allowremote.pause</code>
     597    are set to a true values.
     598    <p>
     599    Note! The local host doesn't have to be listed in the <code>agent.remotecontrol</code>
     600    property. Requests are always allowed from the local host.
     601   
    153602    @param remote The address to the remote computer
     603    @param cmd The command the remote computer wants to execute
    154604    @return TRUE if the computer is allowed, FALSE otherwise
    155605  */
    156   public boolean isAllowedControl(InetAddress remote)
    157   {
    158     return true; // TODO - implement
    159   }
    160  
     606  public boolean isAllowedControl(InetAddress remote, String cmd)
     607  {
     608    boolean allow = false;
     609    boolean isLocal = SocketUtil.isLocalHost(remote);
     610    if ("stop".equals(cmd) && !allowRemoteStop)
     611    {
     612      allow = isLocal;
     613    }
     614    else if ("start".equals(cmd) && !allowRemoteStart)
     615    {
     616      allow = isLocal;
     617    }
     618    else if ("pause".equals(cmd) && !allowRemotePause)
     619    {
     620      allow = isLocal;
     621    }
     622    else
     623    {
     624      allow = isLocal || this.remote.contains(remote);
     625    }
     626    return allow;
     627  }
     628 
     629  /**
     630    Get a session control with the configured user logged in.
     631    @return A session control
     632  */
    161633  public SessionControl getSessionControl()
    162634  {
     
    166638      sc = Application.newSessionControl("net.sf.basedb.clients.jobagent",
    167639        SocketUtil.getLocalHost().toString(), null);
     640    }
     641    if (!sc.isLoggedIn())
     642    {
    168643      sc.login(login, password, null, false);
    169644    }
     
    171646  }
    172647 
     648  /**
     649    Get a session control where the owner of the job has been impersonated and
     650    the active project has been set if needed.
     651    @param job The job to get the impersonated session control for
     652    @return A session control object
     653  */
     654  public SessionControl getImpersonatedSessionControl(Job job)
     655  {
     656    SessionControl sc = getSessionControl();
     657    SessionControl impersonated = null;
     658    DbControl dc = null;
     659    try
     660    {
     661      // Reload job and impersonate the owner
     662      dc = sc.newDbControl();
     663      job = Job.getById(dc, job.getId());
     664      impersonated = sc.impersonateLogin(job, "Running job: " + job.getName());
     665      dc.close();
     666     
     667      // Set the active project if any
     668      int projectId = job.getActiveProjectId();
     669      if (projectId != 0)
     670      {
     671        try
     672        {
     673          dc = impersonated.newDbControl();
     674          Project activeProject = Project.getById(dc, projectId);
     675          impersonated.setActiveProject(activeProject);
     676          dc.close();
     677        }
     678        catch (Throwable t)
     679        {
     680          log.error("Exception while setting active project to " + projectId +
     681            ". Continuing with no active project.", t);
     682        }
     683      }
     684    }
     685    finally
     686    {
     687      if (dc != null) dc.close();
     688    }
     689    return impersonated;
     690  }
     691 
     692  /**
     693    Get the {@link JobAgent} item corresponding to this agent. The
     694    job agent is looked up by the {@link JobAgent#getByExternalId(DbControl, String)}
     695    method where the external ID is given by the <code>agent.id</code> property.
     696    @param dc The DbControl to use for database acces
     697    @return A JobAgent item
     698  */
    173699  public JobAgent getJobAgent(DbControl dc)
    174700  {
     
    176702  }
    177703 
    178   public Job.ExecutionTime getSlot(Job.ExecutionTime estimated)
    179   {
    180     return Job.ExecutionTime.SHORT; // TODO - implement
    181   }
    182  
     704  /**
     705    Find a free slot for executing a job. If there is a free slot
     706    in the requested pool the requested slot is returned, otherwise
     707    the method checks the slower-running pools for free slots
     708    and returns one of those. If there are no free slot available
     709    null is returned.
     710    <p>
     711    Note! This method reserves the slot for the job. It is important that
     712    the {@link #jobDone(Job, Job.ExecutionTime)} method is called once
     713    the job has completed to return the slot to the pool. Failure to do
     714    so may result in that the agent thinks that all slots are
     715    used when they are not.
     716   
     717    @param requested The slot the job requested
     718    @return The assigned slot or null if no slot is available
     719  */
     720  synchronized Job.ExecutionTime getSlot(Job.ExecutionTime requested)
     721  {
     722    log.debug("Requesting slot for job: " + requested);
     723    Job.ExecutionTime slotToUse = null;
     724    Job.ExecutionTime[] slots =  Job.ExecutionTime.values();
     725
     726    // Check all slots from the requested execution time and longer execution times
     727    for (int i = requested.ordinal(); i < slots.length; ++i)
     728    {
     729      if (usedSlots.get(slots[i]) < maxSlots.get(slots[i]))
     730      {
     731        // This slot has free jobs
     732        slotToUse = slots[i];
     733        usedSlots.put(slotToUse, usedSlots.get(slotToUse) + 1);
     734        log.debug("Slot: " + slotToUse + "; used: " + usedSlots.get(slotToUse) + "; max: " + maxSlots.get(slotToUse));
     735        break;
     736      }
     737    }
     738    // If null we couldn't find a free slot
     739    return slotToUse;
     740  }
     741
     742  /**
     743    Start a job. This method will create a new thread for running the job
     744    and return immediately. If the job for some reason can't be started,
     745    for example, if there are no available slots, a log messsage will be
     746    written but nothing else will happen. No exception will be thrown to
     747    the caller of this method.
     748   
     749    @param job The job to start
     750  */
     751  public void startJob(Job job)
     752  {
     753    JobRunner runner = new JobRunner(this, job, jobExecutor);
     754    Thread t = new Thread(runnersGroup, runner);
     755    t.setDaemon(false);
     756    t.setPriority(priorities.get(job.getEstimatedExecutionTime()));
     757    t.start();
     758  }
     759
     760  /**
     761    Used by {@link JobRunner} to tell that a job has finished executing and
     762    that the used slot should be released.
     763   
     764    @param job The job that has finished
     765    @param usedSlot The slot that was used
     766  */
     767  synchronized void jobDone(Job job, Job.ExecutionTime usedSlot)
     768  {
     769    usedSlots.put(usedSlot, usedSlots.get(usedSlot) - 1);
     770    activeJobs.remove(job.getId());
     771  }
     772 
     773  /**
     774    Close the service listener.
     775  */
    183776  private void closeServer()
    184777  {
     778    log.info("Closing service listener: " + server);
    185779    if (server != null)
    186780    {
     
    190784  }
    191785 
     786  /**
     787    Create a job queue checker. and register it with the BASE core
     788    scheduler.
     789    @see Application#getScheduler()
     790    @see JobQueueChecker
     791  */
     792  private TimerTask createJobQueueChecker()
     793  {
     794    log.info("Creating job queue checker; checkInterval=" + checkInterval + " s");
     795    Scheduler scheduler = Application.getScheduler();
     796    return scheduler.schedule(
     797      new JobQueueChecker(this), 1000*checkInterval, 1000*checkInterval, false);
     798  }
     799 
     800  /**
     801    Close the job queue checker.
     802  */
    192803  private void closeJobQueueChecker()
    193804  {
     805    log.info("Closing job queue checker: " + jobQueueChecker);
    194806    if (jobQueueChecker != null)
    195807    {
     
    198810    }
    199811  }
     812
     813  /**
     814    Create a job executor and initialise it.
     815    @return A JobExecutor instance or null if none could be created
     816  */
     817  private JobExecutor createJobExecutor()
     818  {
     819    log.info("Creating job executor: " + executorClass.getName());
     820    JobExecutor executor = null;
     821    try
     822    {
     823      executor = executorClass.newInstance();
     824      executor.init(executorInitParameters);
     825    }
     826    catch (Throwable t)
     827    {
     828      log.error("Could not create job executor instance: " + executorClass.getName(), t);
     829      executor = null;
     830    }
     831    return executor;
     832  }
     833 
     834  /**
     835    Close the job executor.
     836  */
     837  private void closeJobExecutor()
     838  {
     839    log.info("Closing job executor: " + jobExecutor);
     840    if (jobExecutor != null)
     841    {
     842      jobExecutor.close();
     843      jobExecutor = null;
     844    }
     845  }
     846 
     847  /**
     848    Try to stop running jobs by interrupting the threads thaey are executing in.
     849  */
     850  private void maybeStopRunningJobs()
     851  {
     852    log.info("Stopping running jobs. " + activeJobs.size() + " job(s) still active.");
     853    // Interrupt all threads. Hopefully they will do as we tell them.
     854    runnersGroup.interrupt();
     855  }
     856 
    200857 
    201858}
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/AgentController.java

    r2634 r2641  
    3131import java.util.Properties;
    3232
     33import net.sf.basedb.core.DbControl;
     34import net.sf.basedb.core.ItemNotFoundException;
    3335import net.sf.basedb.core.JobAgent;
     36import net.sf.basedb.core.SessionControl;
     37import net.sf.basedb.util.SocketUtil;
    3438import net.sf.basedb.util.Values;
    3539import net.sf.basedb.util.jobagent.JobAgentConnection;
     
    3741
    3842/**
    39   This is the controller class for the job agent application. It is responsible for
    40   starting up and managing a running job agent. The agent itself will be started in
    41   separate thread.
     43  This is the command line controller class for the job agent application. It is
     44  responsible for starting up and managing a running job agent. The agent itself will
     45  be started in separate thread.
    4246
    4347  @author nicklas
     
    156160  }
    157161 
    158   private final Properties p;
     162  private final Properties properties;
    159163  private final int port;
    160164  private final int timeout;
     
    167171  public AgentController(Properties p)
    168172  {
    169     this.p = p;
     173    this.properties = p;
    170174    this.port = Values.getInt(p.getProperty("agent.port"), JobAgent.DEFAULT_PORT);
    171175    this.timeout = Values.getInt(p.getProperty("agent.timeout"), 1000);
     
    178182   
    179183    @throws IOException If there is an error
     184    @see JobAgentConnection#sendStart()
    180185  */
    181186  public void startAgent()
     
    207212        // No job agent is running, create a new agent
    208213        log.info("Creating a new job agent on port " + port);
    209         Agent agent = createAgent(p);
     214        Agent agent = createAgent(properties);
    210215        agent.service(null); // Start listening for incoming connections
    211216      }
     
    230235  }
    231236
     237  /**
     238    Stop a running job agent by sending a stop request to the agents remote control
     239    service. The agent may be running in this or in another virtual machine.
     240    @throws IOException If there is an error
     241    @see JobAgentConnection#sendStop()
     242  */
    232243  public void stopAgent()
    233244    throws IOException
     
    253264  }
    254265 
     266  /**
     267    Pause a running job agent by sending a stop request to the agents remote control
     268    service. The agent may be running in this or in another virtual machine.
     269    @throws IOException If there is an error
     270    @see JobAgentConnection#sendPause()
     271  */
    255272  public void pauseAgent()
    256273    throws IOException
     
    276293  }
    277294
     295  /**
     296    Get info about running job agent by sending an info request to the agents remote control
     297    service. The agent may be running in this or in another virtual machine.
     298    @throws IOException If there is an error
     299    @return A <code>JobAgentInfo</code> object
     300    @see JobAgentConnection#getInfo(boolean)
     301  */
    278302  public JobAgentInfo getInfo()
    279303    throws IOException
     
    296320  public void registerAgent()
    297321  {
    298    
     322    Agent agent = createAgent(properties);
     323    log.info("Registering agent '" + agent.getId() + "' with BASE");
     324    SessionControl sc = agent.getSessionControl();
     325    DbControl dc = null;
     326    try
     327    {
     328      dc = sc.newDbControl();
     329      JobAgent jobAgent = null;
     330      try
     331      {
     332        jobAgent = agent.getJobAgent(dc);
     333        log.info("Agent with id '" + agent.getId() + "' is already registered");
     334      }
     335      catch (ItemNotFoundException ex)
     336      {
     337        jobAgent = JobAgent.getNew(dc, agent.getId());
     338        jobAgent.setName(agent.getName());
     339        jobAgent.setDescription(agent.getDescription());
     340        jobAgent.setPort(agent.getPort());
     341        jobAgent.setServer(SocketUtil.getPublicLocalHost().getHostName());
     342        dc.saveItem(jobAgent);
     343        dc.commit();
     344      }
     345    }
     346    finally
     347    {
     348      if (dc != null) dc.close();
     349    }
    299350  }
    300351
    301352  public void unregisterAgent()
    302353  {
    303    
     354    Agent agent = createAgent(properties);
     355    SessionControl sc = agent.getSessionControl();
     356    DbControl dc = null;
     357    try
     358    {
     359      dc = sc.newDbControl();
     360      JobAgent jobAgent = null;
     361      try
     362      {
     363        jobAgent = agent.getJobAgent(dc);
     364        dc.deleteItem(jobAgent);
     365        dc.commit();
     366      }
     367      catch (ItemNotFoundException ex)
     368      {}
     369    }
     370    finally
     371    {
     372      if (dc != null) dc.close();
     373    }
    304374  }
    305375 
    306376  private Agent createAgent(Properties p)
    307     throws IOException
    308377  {
    309378    Agent a = new Agent(p);
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/CmdLine.java

    r2634 r2641  
    2828
    2929/**
     30  Utility class for parsing command line arguments. It supports a very limitied
     31  syntax: [options] [cmd]
     32  <p>
     33  Options starts with hyphen (-) and may have a value following it. The last
     34  parameter is the command unless it starts with a hyphen.
     35  <p>
     36  Examples:
     37  <pre class="code">
     38./jobagent.sh start
     39./jobagent.sh -c agent.properties stop
     40</pre>
    3041
    3142  @author nicklas
     
    3849  private final String cmd;
    3950 
     51  /**
     52    Create a new object for parsing the command line.
     53    @param args The command line arguments sent to the <code>main()</code> method
     54  */
    4055  public CmdLine(String[] args)
    4156  {
     
    7186  }
    7287 
    73  
     88  /**
     89    Get the command parameter
     90    @return The command parameter, or null
     91  */
    7492  public String getCmd()
    7593  {
     
    7795  }
    7896 
     97  /**
     98    Get the value for the specified option
     99    @param option The option to get the value for
     100    @return The value or null
     101  */
    79102  public String getOption(String option)
    80103  {
     
    82105  }
    83106 
     107  /**
     108    Get the value for an option.
     109    @param option The option to get the value for
     110    @param defaultValue A default value if the option wasn't specified
     111    @return The options value or the default value
     112  */
    84113  public String getOption(String option, String defaultValue)
    85114  {
     
    87116  }
    88117 
     118  /**
     119    Check if an option was specified or not.
     120    @param option The option to check
     121    @return TRUE if the option was specified, FALSE otherwise
     122  */
     123  public boolean hasOption(String option)
     124  {
     125    return options.containsKey(option);
     126  }
     127 
    89128}
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobQueueChecker.java

    r2634 r2641  
    3434
    3535/**
     36  This class is given the responsibility to check the job queue for
     37  jobs that are awaiting execution. Each agent has one instance of this
     38  class which is registered with the BASE core scheduler {@link Application#getScheduler()}.
     39  <p>
     40  This object should be thread-safe since the scheduler creates a new thread each
     41  time the {@link #run()} method is called.
    3642
    3743  @author nicklas
     
    6369  {
    6470    log.info("Checking for jobs to execute");
    65    
    66     SessionControl sc = agent.getSessionControl();
     71    Job job = findJob();
     72    if (job != null)
     73    {
     74      agent.startJob(job);
     75    }
     76  }
     77  public boolean cancel()
     78  {
     79    log.info("Cancelling job queue checker");
     80    return super.cancel();
     81  }
     82  // -------------------------------------------
     83 
     84  private Job findJob()
     85  {
    6786    DbControl dc = null;
     87    Job job = null;
    6888    try
    6989    {
     90      SessionControl sc = agent.getSessionControl();
    7091      dc = sc.newDbControl();
    7192      JobAgent jobAgent = agent.getJobAgent(dc);
     
    81102      else
    82103      {
    83         Job job = jobs.get(0);
    84        
    85         //  Find a free slot to execute the job
    86         Job.ExecutionTime estimated = job.getEstimatedExecutionTime();
    87         Job.ExecutionTime slotToUse = agent.getSlot(estimated);
    88         if (slotToUse == null)
    89         {
    90           log.info("Couldn't find a free slot for executing job: " + job);
    91         }
    92         else
    93         {
    94           log.info("Starting job " + job + " in slot " + slotToUse);
    95 //          log.debug("Slot: " + slotToUse + "; used: " + usedSlots.get(slotToUse) + "; max: " + maxSlots.get(slotToUse));
    96        
    97           // TODO - start process
    98         }
     104        job = jobs.get(0);
    99105      }
     106    }
     107    catch (Throwable t)
     108    {
     109      log.error(t.getMessage(), t);
    100110    }
    101111    finally
     
    103113      if (dc != null) dc.close();
    104114    }
    105    
     115    return job;
    106116  }
    107117 
    108   public boolean cancel()
    109   {
    110     log.info("Cancelling job queue checker");
    111     return super.cancel();
    112   }
    113   // -------------------------------------------
    114 
     118       
    115119}
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/DefaultRequestHandler.java

    r2634 r2641  
    6767    registerHandler(new InfoRequestHandler(agent), "info", "status");
    6868    registerHandler(new StartRequestHandler(agent), "start");
    69     registerHandler(new StopRequestHandler(agent, false), "stop");
     69    registerHandler(new StopRequestHandler(agent), "stop");
    7070    registerHandler(new PauseRequestHandler(agent), "pause");
    7171  }
     
    8989    String answer = null;
    9090    RequestHandler handler = commandHandlers.get(cmd);
    91     if (!agent.isAllowedControl(remote))
     91    if (!agent.isAllowedControl(remote, cmd))
    9292    {
    9393      answer = "FAILED Permission denied: cmd=" + cmd + "; host=" + remote.toString();
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/InfoRequestHandler.java

    r2634 r2641  
    6262      long totalMemory = runtime.maxMemory();
    6363      long usedMemory = runtime.totalMemory() - runtime.freeMemory();
    64       JobAgentInfo info = new JobAgentInfo(!agent.isRunning(), cpu, totalMemory, usedMemory, null);
     64      JobAgentInfo info = new JobAgentInfo(!agent.isRunning(), cpu, totalMemory, usedMemory, agent.getRunningJobs());
    6565      answer = "OK\n"+info.toString();
    6666    }
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/PauseRequestHandler.java

    r2634 r2641  
    3030
    3131/**
    32   This is a request handler for the <code>pause</code> command. It stops the
    33   job agent.
     32  This is a request handler for the <code>pause</code> command. It pauses the
     33  job agent by calling the {@link Agent#pause()} method.
    3434
    3535  @author nicklas
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/StartRequestHandler.java

    r2634 r2641  
    3131/**
    3232  This is a request handler for the <code>start</code> command. It starts the
    33   job agent.
     33  job agent by calling the {@link Agent#start()} method.
    3434
    3535  @author nicklas
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/StopRequestHandler.java

    r2634 r2641  
    2424package net.sf.basedb.clients.jobagent.handlers;
    2525
    26 import java.net.InetAddress;
    2726import java.net.Socket;
    2827
    2928import net.sf.basedb.clients.jobagent.Agent;
    30 import net.sf.basedb.util.SocketUtil;
    3129import net.sf.basedb.util.jobagent.RequestHandler;
    3230
    3331/**
    34   This is a request handler for the <code>stop</code> command. It shuts dow the
    35   job agent, and closes the {@link JobAgentServerConnection} listening for
    36   incoming connections. Thus, it is not possible to start the job agent again
    37   except from the command line.
     32  This is a request handler for the <code>stop</code> command. It shuts down the
     33  job agent by calling the {@link Agent#stop()} method.
    3834
    3935  @author nicklas
     
    4642
    4743  private final Agent agent;
    48   private final boolean allowRemote;
    4944 
    5045  /**
    5146    Create a new stop request handler.
    5247    @param agent The agent
    53     @param allowRemote TRUE to allow remote stop request, FALSE
    54       to only allow stop requests from the local host
    5548  */
    56   public StopRequestHandler(Agent agent, boolean allowRemote)
     49  public StopRequestHandler(Agent agent)
    5750  {
    5851    this.agent = agent;
    59     this.allowRemote = allowRemote;
    6052  }
    6153 
     
    6961    if ("stop".equals(cmd))
    7062    {
    71       InetAddress remote = incoming.getInetAddress();
    72       if (!allowRemote && !SocketUtil.isLocalHost(remote))
    73       {
    74         answer = "FAILED Permission denied: cmd=" + cmd + "; host=" + remote.toString();
    75       }
    76       else
    77       {
    78         agent.stop();
    79       }
     63      agent.stop();
    8064    }
    8165    else
Note: See TracChangeset for help on using the changeset viewer.