Ignore:
Timestamp:
Oct 19, 2010, 1:02:17 PM (12 years ago)
Author:
Nicklas Nordborg
Message:

References #1528: Make slot assignment pluggable in job agents

This is now implemented. Select among three different slot managers:

  • InternalSlotManager (the default)
  • MasterSlotManager
  • RemoteSlotManager

Documentation will be added to docbook.

Location:
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent
Files:
6 added
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java

    r5446 r5447  
    2525import java.net.InetAddress;
    2626import java.net.UnknownHostException;
     27import java.util.Arrays;
    2728import java.util.Collections;
    2829import java.util.HashMap;
     
    3839import net.sf.basedb.clients.jobagent.handlers.MultiProtocolRequestHandler;
    3940import net.sf.basedb.clients.jobagent.handlers.SignalRequestHandler;
     41import net.sf.basedb.clients.jobagent.slotmanager.InternalSlotManager;
     42import net.sf.basedb.clients.jobagent.slotmanager.Slot;
     43import net.sf.basedb.clients.jobagent.slotmanager.SlotManager;
    4044import net.sf.basedb.core.Application;
    4145import net.sf.basedb.core.DbControl;
     
    222226 
    223227  /**
     228    The default slot manager to use if none has been specified
     229    in the configuration file.
     230  */
     231  public static final Class<? extends SlotManager> DEFAULT_SLOT_MANAGER =
     232    InternalSlotManager.class;
     233 
     234  /**
    224235    The default check interval in seconds.
    225236  */
     
    259270  private final Class<? extends JobExecutor> executorClass;
    260271 
     272  // Slot manager
     273  private final Class<? extends SlotManager> slotManagerClass;
     274 
    261275  private InetAddress serverAddress;
    262276  private JobAgentServerConnection server;
     
    264278  private AgentSignalReceiver signalReceiver;
    265279  private JobExecutor jobExecutor;
     280  private SlotManager slotManager;
    266281
    267282  private TimerTask jobQueueChecker;
     
    270285  private boolean isRunning;
    271286  private SessionControl sc;
    272  
    273   /**
    274     The current number of threads executing in each slot.
    275   */
    276   private final Map<Job.ExecutionTime, Integer> usedSlots;
    277  
    278   /**
    279     The maximum number of threads that are allowed in each slot.
    280   */
    281   private final Map<Job.ExecutionTime, Integer> maxSlots;
    282287 
    283288  /**
     
    346351    this.executorClass = getJobExecutorClass(properties.getProperty("agent.executor.class"));
    347352   
     353    // Slot manager
     354    this.slotManagerClass = getSlotManagerClass(properties.getProperty("agent.slotmanager.class"));
     355   
    348356    // Slots and priorities
    349     this.usedSlots = new HashMap<Job.ExecutionTime, Integer>();
    350     this.maxSlots = new HashMap<Job.ExecutionTime, Integer>();
    351357    this.priorities = new HashMap<Job.ExecutionTime, Integer>();
    352358    for (Job.ExecutionTime et : Job.ExecutionTime.values())
    353359    {
    354       usedSlots.put(et, 0);
    355360      String configName = "agent."+et.name().toLowerCase();
    356       int configuredSlots = Values.getInt(properties.getProperty(configName+".slots"), et.getDefaultSlots());
    357       maxSlots.put(et, configuredSlots);
    358361      int priority = Values.getInt(properties.getProperty(configName+".priority"), et.getDefaultPriority());
    359362      priorities.put(et, priority);
     
    488491  private Map<String, Class<? extends CustomRequestHandler>> getCustomRequestHandlerClasses()
    489492  {
    490     Map<String, Class<? extends CustomRequestHandler>> handlers = new HashMap<String, Class<? extends CustomRequestHandler>>();
     493    log.debug("Loading custom request handler classes");
     494    Map<String, Class<? extends CustomRequestHandler>> handlers =
     495      new HashMap<String, Class<? extends CustomRequestHandler>>();
     496   
    491497    for (String property : properties.stringPropertyNames())
    492498    {
     
    500506          if (!CustomRequestHandler.class.isAssignableFrom(clazz))
    501507          {
    502             log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored!");
     508            log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored.");
    503509          }
    504510          else
     
    510516        catch (Throwable t)
    511517        {
    512           log.warn("Class " + className + " not found, ignored!");
     518          log.warn("Custom request handler class '" + className + "' not found, ignored.");
    513519        }
    514520      }
    515521    }
     522    log.debug("Custom request handler classes lodaed: " + handlers.size());
    516523    return handlers;
    517524  }
     
    559566    try
    560567    {
    561       executor = className == null ?
    562         DEFAULT_JOB_EXECUTOR : (Class<JobExecutor>)Class.forName(className);
    563       if (!JobExecutor.class.isAssignableFrom(executor))
     568      if (className != null)
    564569      {
    565         log.warn("Class " + className + " doesn't implement the JobExecutor interface, using " +
    566           DEFAULT_JOB_EXECUTOR.getName() + " instead");
    567         executor =  DEFAULT_JOB_EXECUTOR;
     570        executor = (Class<JobExecutor>)Class.forName(className);
     571        if (!JobExecutor.class.isAssignableFrom(executor))
     572        {
     573          log.warn("Class '" + className + "' doesn't implement the JobExecutor interface, using '" +
     574            DEFAULT_JOB_EXECUTOR.getName() + "' instead");
     575          executor =  DEFAULT_JOB_EXECUTOR;
     576        }
    568577      }
    569578    }
    570579    catch (Throwable t)
    571580    {
    572       log.warn("Class " + className + " not found, using " + DEFAULT_JOB_EXECUTOR.getName() +
    573         " instead", t);
    574     }
     581      log.warn("Job executor class '" + className + "' not found, using '" + DEFAULT_JOB_EXECUTOR.getName() +
     582        "' instead", t);
     583    }
     584    log.info("Job executor class '" + executor.getName() + "' loaded successfully");
    575585    return executor;
    576586  }
     587 
     588  /**
     589    Get the class object for the configured slot manager. If the
     590    specified class can't be found or doesn't implement the
     591    {@link SlotManager} interface a warning message is logged and
     592    the {@link #DEFAULT_SLOT_MANAGER} is used instead.
     593   
     594    @param className The name of the slot manager class
     595    @return The class object for that class or the default job executor
     596  */
     597  @SuppressWarnings("unchecked")
     598  private Class<? extends SlotManager> getSlotManagerClass(String className)
     599  {
     600    Class<? extends SlotManager> manager = DEFAULT_SLOT_MANAGER;
     601    try
     602    {
     603      if (className != null)
     604      {
     605        manager = (Class<SlotManager>)Class.forName(className);
     606      }
     607      if (!SlotManager.class.isAssignableFrom(manager))
     608      {
     609        log.warn("Class '" + className + "' doesn't implement the SlotManager interface, using '" +
     610          DEFAULT_SLOT_MANAGER.getName() + "' instead");
     611        manager =  DEFAULT_SLOT_MANAGER;
     612      }
     613    }
     614    catch (Throwable t)
     615    {
     616      log.warn("Slot manager class '" + className + "' not found, using '" + DEFAULT_SLOT_MANAGER.getName() +
     617        "' instead", t);
     618    }
     619    log.info("Slot manager class '" + manager.getName() + "' loaded successfully");
     620    return manager;
     621  }
     622
    577623 
    578624  /**
     
    648694      getSessionControl();;
    649695      // Create executor and job queue checker
     696      slotManager = createSlotManager();
    650697      jobExecutor = createJobExecutor();
    651698      jobQueueChecker = createJobQueueChecker();
     
    680727    maybeStopRunningJobs();
    681728    closeJobExecutor();
     729    closeSlotManager();
    682730    closeServer();
    683731    if (sc != null) sc.logout();
     
    790838    if (this.requestHandler == null) return;
    791839    if (handler == null) throw new NullPointerException("handler");
     840    if (log.isDebugEnabled())
     841    {
     842      log.debug("Registering protocol handler " + handler + " for protocols: " + Arrays.asList(protocols));
     843    }
    792844    requestHandler.registerProtocols(handler, protocols);
    793845  }
     
    805857  {
    806858    if (this.requestHandler == null) return;
     859    if (log.isDebugEnabled())
     860    {
     861      log.debug("Unregistering protocol handler for protocols: " + Arrays.asList(protocols));
     862    }
    807863    requestHandler.unregisterProtocols(protocols);
    808864  }
     
    903959    <p>
    904960    Note! This method reserves the slot for the job. It is important that
    905     the {@link #jobDone(Job, Job.ExecutionTime)} method is called once
     961    the {@link #jobDone(Job, Slot)} method is called once
    906962    the job has completed to return the slot to the pool. Failure to do
    907963    so may result in that the agent thinks that all slots are
     
    911967    @return The assigned slot or null if no slot is available
    912968  */
    913   synchronized Job.ExecutionTime getSlot(Job job)
     969  synchronized Slot getSlot(Job job)
    914970  {
    915971    log.debug("Requesting slot for job: " + job);
    916     Job.ExecutionTime requested = job.getEstimatedExecutionTime();
    917     Job.ExecutionTime slotToUse = null;
    918     Job.ExecutionTime[] slots =  Job.ExecutionTime.values();
    919 
    920     // Check all slots from the requested execution time and longer execution times
    921     for (int i = requested.ordinal(); i < slots.length; ++i)
    922     {
    923       if (usedSlots.get(slots[i]) < maxSlots.get(slots[i]))
    924       {
    925         // This slot has free jobs
    926         slotToUse = slots[i];
    927         usedSlots.put(slotToUse, usedSlots.get(slotToUse) + 1);
    928         activeJobs.add(new JobInfo(job, slotToUse));
    929         log.debug("Slot: " + slotToUse + "; used: " + usedSlots.get(slotToUse) + "; max: " + maxSlots.get(slotToUse));
    930         break;
    931       }
    932     }
     972    Slot slotToUse = slotManager.getSlot(job);
     973
     974    if (slotToUse != null)
     975    {
     976      log.debug("Got slot: " + slotToUse);
     977      activeJobs.add(new JobInfo(job, slotToUse.getEstimatedExecutionTime()));
     978    }
     979    else
     980    {
     981      log.debug("No available slot");
     982    }
     983
    933984    // If null we couldn't find a free slot
    934985    return slotToUse;
     
    9601011    @param usedSlot The slot that was used
    9611012  */
    962   synchronized void jobDone(Job job, Job.ExecutionTime usedSlot)
    963   {
    964     usedSlots.put(usedSlot, usedSlots.get(usedSlot) - 1);
    965     activeJobs.remove(new JobInfo(job, usedSlot));
     1013  synchronized void jobDone(Job job, Slot usedSlot)
     1014  {
     1015    if (slotManager != null)
     1016    {
     1017      slotManager.releaseSlot(usedSlot);
     1018      activeJobs.remove(new JobInfo(job, usedSlot.getEstimatedExecutionTime()));
     1019    }
    9661020  }
    9671021 
     
    9691023    Close the service listener.
    9701024  */
    971   private void closeServer()
     1025  private synchronized void closeServer()
    9721026  {
    9731027    log.info("Closing service listener: " + server);
     
    10011055    Close the job queue checker.
    10021056  */
    1003   private void closeJobQueueChecker()
     1057  private synchronized void closeJobQueueChecker()
    10041058  {
    10051059    log.info("Closing job queue checker: " + jobQueueChecker);
     
    10261080    catch (RuntimeException t)
    10271081    {
    1028       log.error("Could not create job executor instance: " + executorClass.getName(), t);
     1082      log.error("Could not create job executor: " + executorClass.getName(), t);
    10291083      executor = null;
    10301084      throw t;
     
    10321086    catch (Exception e)
    10331087    {
    1034       log.error("Could not create job executor instance: " + executorClass.getName(), e);
     1088      log.error("Could not create job executor: " + executorClass.getName(), e);
    10351089      executor = null;
    10361090      throw new RuntimeException(e);
     
    10391093  }
    10401094 
     1095
    10411096  /**
    10421097    Close the job executor.
    10431098  */
    1044   private void closeJobExecutor()
     1099  private synchronized void closeJobExecutor()
    10451100  {
    10461101    log.info("Closing job executor: " + jobExecutor);
     
    10521107  }
    10531108 
     1109 
     1110  /**
     1111    Create a slot manager and initialize it.
     1112    @return A SlotManager instance
     1113    @since 2.16
     1114  */
     1115  private SlotManager createSlotManager()
     1116  {
     1117    log.info("Creating slot manager: " + slotManagerClass.getName());
     1118    SlotManager manager = null;
     1119    try
     1120    {
     1121      manager = slotManagerClass.newInstance();
     1122      manager.init(this);
     1123    }
     1124    catch (RuntimeException t)
     1125    {
     1126      log.error("Could not create slot manager: " + slotManagerClass.getName(), t);
     1127      manager = null;
     1128      throw t;
     1129    }
     1130    catch (Exception e)
     1131    {
     1132      log.error("Could not create slot manager: " + slotManagerClass.getName(), e);
     1133      manager = null;
     1134      throw new RuntimeException(e);
     1135    }
     1136    return manager;
     1137  }
     1138
     1139  /**
     1140    Close the slot manager.
     1141  */
     1142  private synchronized void closeSlotManager()
     1143  {
     1144    log.info("Closing slot manager: " + slotManager);
     1145    if (slotManager != null)
     1146    {
     1147      slotManager.close();
     1148      slotManager = null;
     1149    }
     1150  }
     1151
    10541152  /**
    10551153    Try to stop running jobs by interrupting the threads they are executing in.
    10561154  */
    1057   private void maybeStopRunningJobs()
     1155  private synchronized void maybeStopRunningJobs()
    10581156  {
    10591157    log.info("Stopping running jobs. " + activeJobs.size() + " job(s) still active.");
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobRunner.java

    r4512 r5447  
    2424import java.util.Arrays;
    2525
     26import net.sf.basedb.clients.jobagent.slotmanager.Slot;
    2627import net.sf.basedb.core.Application;
    2728import net.sf.basedb.core.DbControl;
     
    8384  public void run()
    8485  {
    85     Job.ExecutionTime slotToUse = agent.getSlot(job);
     86    Slot slotToUse = agent.getSlot(job);
    8687    if (slotToUse == null)
    8788    {
     
    122123      try
    123124      {
    124         jobExecutor.executeJob(sc, agent, j, settings, slotToUse);
     125        jobExecutor.executeJob(sc, agent, j, settings, slotToUse.getEstimatedExecutionTime());
    125126      }
    126127      catch (Throwable t)
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/DummyJobExecutor.java

    r5060 r5447  
    3030import net.sf.basedb.core.JobAgentSettings;
    3131import net.sf.basedb.core.SessionControl;
     32import net.sf.basedb.core.signal.SignalHandler;
    3233import net.sf.basedb.core.signal.SignalReceiver;
    3334import net.sf.basedb.core.signal.ThreadSignalHandler;
     
    9394    boolean aborted = false;
    9495    Throwable error = null;
     96    SignalReceiver signalReceiver = agent.getSignalReceiver();
     97    SignalHandler signalHandler = null;
    9598    try
    9699    {
     
    101104      if (wait > 0)
    102105      {
    103         SignalReceiver signalReceiver = agent.getSignalReceiver();
     106        signalHandler = new ThreadSignalHandler();
    104107        job.setSignalTransporter(signalReceiver.getSignalTransporterClass(),
    105           signalReceiver.registerSignalHandler(new ThreadSignalHandler()));
     108          signalReceiver.registerSignalHandler(signalHandler));
    106109        job.setProgress(50, "Halfway; waiting " + wait + " seconds");
    107110        dc.commit();
     
    142145    finally
    143146    {
     147      if (signalHandler != null) signalReceiver.unregisterSignalHandler(signalHandler);
    144148      if (dc != null) dc.close();
    145149    }
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/MultiProtocolRequestHandler.java

    r5446 r5447  
    2929
    3030import net.sf.basedb.clients.jobagent.Agent;
     31import net.sf.basedb.util.Values;
    3132import net.sf.basedb.util.jobagent.RequestHandler;
    3233
     
    127128    if (log.isInfoEnabled())
    128129    {
    129       log.info("Returning '" + answer.replaceAll("\n", "\\\\n") + "' for command '" + cmd +"' to " + remote);
     130      log.info("Returning '" + Values.getString(answer).replaceAll("\n", "\\\\n") + "' for command '" + cmd +"' to " + remote);
    130131    }
    131132    return answer;
Note: See TracChangeset for help on using the changeset viewer.