Changeset 5447


Ignore:
Timestamp:
Oct 19, 2010, 1:02:17 PM (12 years ago)
Author:
Nicklas Nordborg
Message:

References #1528: Make slot assignment pluggable in job agents

This is now implemented. Select among three different slot managers:

  • InternalSlotManager (the default)
  • MasterSlotManager
  • RemoteSlotManager

Documentation will be added to docbook.

Location:
trunk
Files:
6 added
7 edited

Legend:

Unmodified
Added
Removed
  • trunk/config/dist/jobagent.properties

    r5446 r5447  
    6767
    6868
    69 # =======================
     69# ============================
    7070# Job agent execution settings
    71 # =======================
     71# ============================
    7272
    73 # The name of the executor class that is responsible for starting the job
     73## The name of the executor class that is responsible for starting the job
    7474## The default is ProcessJobExecutor which starts job in a separate process
    7575## The class must implement the net.sf.basedb.clients.JobExecutor interface
     
    103103agent.checkinterval=30
    104104
     105
     106# ============================
     107# Slot manager settings
     108# ============================
     109
     110## The name of the slot manager class that is responsible for assigning a slot
     111## to the job. The default is the InternalSlotManager which assigns slots based
     112## on the estimated execution time.
     113agent.slotmanager.class=net.sf.basedb.clients.jobagent.slotmanager.InternalSlotManager
     114
     115# -------------------------------------------------------------------
     116# The master slot manager is like the internal slot manager but also
     117# accepts slot assignment on behalf of other job agents. The other
     118# job agents should use the RemoteSlotManager and connect to the
     119# remote control port of this job agent.
     120# -------------------------------------------------------------------
     121# agent.slotmanager.class=net.sf.basedb.clients.jobagent.slotmanager.MasterSlotManager
     122
     123# -------------------------------------------------------------------
     124# The remote slot manager uses another jobagent to assign slots.
     125# The other job agent should use the MasterSlotManager
     126# Options
     127#  server=The name/ip of the other job agent
     128#  port=The remote control port of the job agent
     129# -------------------------------------------------------------------
     130# agent.slotmanager.class=net.sf.basedb.clients.jobagent.slotmanager.RemoteSlotManager
     131# agent.slotmanager.remote.server=
     132# agent.slotmanager.remote.port=
     133
    105134## Note! A quick job may use a slot from any of the pools reserved for
    106135## slower jobs if there are unused slots. Priority values should be between
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java

    r5446 r5447  
    2525import java.net.InetAddress;
    2626import java.net.UnknownHostException;
     27import java.util.Arrays;
    2728import java.util.Collections;
    2829import java.util.HashMap;
     
    3839import net.sf.basedb.clients.jobagent.handlers.MultiProtocolRequestHandler;
    3940import net.sf.basedb.clients.jobagent.handlers.SignalRequestHandler;
     41import net.sf.basedb.clients.jobagent.slotmanager.InternalSlotManager;
     42import net.sf.basedb.clients.jobagent.slotmanager.Slot;
     43import net.sf.basedb.clients.jobagent.slotmanager.SlotManager;
    4044import net.sf.basedb.core.Application;
    4145import net.sf.basedb.core.DbControl;
     
    222226 
    223227  /**
     228    The default slot manager to use if none has been specified
     229    in the configuration file.
     230  */
     231  public static final Class<? extends SlotManager> DEFAULT_SLOT_MANAGER =
     232    InternalSlotManager.class;
     233 
     234  /**
    224235    The default check interval in seconds.
    225236  */
     
    259270  private final Class<? extends JobExecutor> executorClass;
    260271 
     272  // Slot manager
     273  private final Class<? extends SlotManager> slotManagerClass;
     274 
    261275  private InetAddress serverAddress;
    262276  private JobAgentServerConnection server;
     
    264278  private AgentSignalReceiver signalReceiver;
    265279  private JobExecutor jobExecutor;
     280  private SlotManager slotManager;
    266281
    267282  private TimerTask jobQueueChecker;
     
    270285  private boolean isRunning;
    271286  private SessionControl sc;
    272  
    273   /**
    274     The current number of threads executing in each slot.
    275   */
    276   private final Map<Job.ExecutionTime, Integer> usedSlots;
    277  
    278   /**
    279     The maximum number of threads that are allowed in each slot.
    280   */
    281   private final Map<Job.ExecutionTime, Integer> maxSlots;
    282287 
    283288  /**
     
    346351    this.executorClass = getJobExecutorClass(properties.getProperty("agent.executor.class"));
    347352   
     353    // Slot manager
     354    this.slotManagerClass = getSlotManagerClass(properties.getProperty("agent.slotmanager.class"));
     355   
    348356    // Slots and priorities
    349     this.usedSlots = new HashMap<Job.ExecutionTime, Integer>();
    350     this.maxSlots = new HashMap<Job.ExecutionTime, Integer>();
    351357    this.priorities = new HashMap<Job.ExecutionTime, Integer>();
    352358    for (Job.ExecutionTime et : Job.ExecutionTime.values())
    353359    {
    354       usedSlots.put(et, 0);
    355360      String configName = "agent."+et.name().toLowerCase();
    356       int configuredSlots = Values.getInt(properties.getProperty(configName+".slots"), et.getDefaultSlots());
    357       maxSlots.put(et, configuredSlots);
    358361      int priority = Values.getInt(properties.getProperty(configName+".priority"), et.getDefaultPriority());
    359362      priorities.put(et, priority);
     
    488491  private Map<String, Class<? extends CustomRequestHandler>> getCustomRequestHandlerClasses()
    489492  {
    490     Map<String, Class<? extends CustomRequestHandler>> handlers = new HashMap<String, Class<? extends CustomRequestHandler>>();
     493    log.debug("Loading custom request handler classes");
     494    Map<String, Class<? extends CustomRequestHandler>> handlers =
     495      new HashMap<String, Class<? extends CustomRequestHandler>>();
     496   
    491497    for (String property : properties.stringPropertyNames())
    492498    {
     
    500506          if (!CustomRequestHandler.class.isAssignableFrom(clazz))
    501507          {
    502             log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored!");
     508            log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored.");
    503509          }
    504510          else
     
    510516        catch (Throwable t)
    511517        {
    512           log.warn("Class " + className + " not found, ignored!");
     518          log.warn("Custom request handler class '" + className + "' not found, ignored.");
    513519        }
    514520      }
    515521    }
     522    log.debug("Custom request handler classes lodaed: " + handlers.size());
    516523    return handlers;
    517524  }
     
    559566    try
    560567    {
    561       executor = className == null ?
    562         DEFAULT_JOB_EXECUTOR : (Class<JobExecutor>)Class.forName(className);
    563       if (!JobExecutor.class.isAssignableFrom(executor))
     568      if (className != null)
    564569      {
    565         log.warn("Class " + className + " doesn't implement the JobExecutor interface, using " +
    566           DEFAULT_JOB_EXECUTOR.getName() + " instead");
    567         executor =  DEFAULT_JOB_EXECUTOR;
     570        executor = (Class<JobExecutor>)Class.forName(className);
     571        if (!JobExecutor.class.isAssignableFrom(executor))
     572        {
     573          log.warn("Class '" + className + "' doesn't implement the JobExecutor interface, using '" +
     574            DEFAULT_JOB_EXECUTOR.getName() + "' instead");
     575          executor =  DEFAULT_JOB_EXECUTOR;
     576        }
    568577      }
    569578    }
    570579    catch (Throwable t)
    571580    {
    572       log.warn("Class " + className + " not found, using " + DEFAULT_JOB_EXECUTOR.getName() +
    573         " instead", t);
    574     }
     581      log.warn("Job executor class '" + className + "' not found, using '" + DEFAULT_JOB_EXECUTOR.getName() +
     582        "' instead", t);
     583    }
     584    log.info("Job executor class '" + executor.getName() + "' loaded successfully");
    575585    return executor;
    576586  }
     587 
     588  /**
     589    Get the class object for the configured slot manager. If the
     590    specified class can't be found or doesn't implement the
     591    {@link SlotManager} interface a warning message is logged and
     592    the {@link #DEFAULT_SLOT_MANAGER} is used instead.
     593   
     594    @param className The name of the slot manager class
     595    @return The class object for that class or the default slot manager
     596  */
     597  @SuppressWarnings("unchecked")
     598  private Class<? extends SlotManager> getSlotManagerClass(String className)
     599  {
     600    Class<? extends SlotManager> manager = DEFAULT_SLOT_MANAGER;
     601    try
     602    {
     603      if (className != null)
     604      {
     605        manager = (Class<SlotManager>)Class.forName(className);
     606      }
     607      if (!SlotManager.class.isAssignableFrom(manager))
     608      {
     609        log.warn("Class '" + className + "' doesn't implement the SlotManager interface, using '" +
     610          DEFAULT_SLOT_MANAGER.getName() + "' instead");
     611        manager =  DEFAULT_SLOT_MANAGER;
     612      }
     613    }
     614    catch (Throwable t)
     615    {
     616      log.warn("Slot manager class '" + className + "' not found, using '" + DEFAULT_SLOT_MANAGER.getName() +
     617        "' instead", t);
     618    }
     619    log.info("Slot manager class '" + manager.getName() + "' loaded successfully");
     620    return manager;
     621  }
     622
    577623 
    578624  /**
     
    648694      getSessionControl();;
    649695      // Create executor and job queue checker
     696      slotManager = createSlotManager();
    650697      jobExecutor = createJobExecutor();
    651698      jobQueueChecker = createJobQueueChecker();
     
    680727    maybeStopRunningJobs();
    681728    closeJobExecutor();
     729    closeSlotManager();
    682730    closeServer();
    683731    if (sc != null) sc.logout();
     
    790838    if (this.requestHandler == null) return;
    791839    if (handler == null) throw new NullPointerException("handler");
     840    if (log.isDebugEnabled())
     841    {
     842      log.debug("Registering protocol handler " + handler + " for protocols: " + Arrays.asList(protocols));
     843    }
    792844    requestHandler.registerProtocols(handler, protocols);
    793845  }
     
    805857  {
    806858    if (this.requestHandler == null) return;
     859    if (log.isDebugEnabled())
     860    {
     861      log.debug("Unregistering protocol handler for protocols: " + Arrays.asList(protocols));
     862    }
    807863    requestHandler.unregisterProtocols(protocols);
    808864  }
     
    903959    <p>
    904960    Note! This method reserves the slot for the job. It is important that
    905     the {@link #jobDone(Job, Job.ExecutionTime)} method is called once
     961    the {@link #jobDone(Job, Slot)} method is called once
    906962    the job has completed to return the slot to the pool. Failure to do
    907963    so may result in that the agent thinks that all slots are
     
    911967    @return The assigned slot or null if no slot is available
    912968  */
    913   synchronized Job.ExecutionTime getSlot(Job job)
     969  synchronized Slot getSlot(Job job)
    914970  {
    915971    log.debug("Requesting slot for job: " + job);
    916     Job.ExecutionTime requested = job.getEstimatedExecutionTime();
    917     Job.ExecutionTime slotToUse = null;
    918     Job.ExecutionTime[] slots =  Job.ExecutionTime.values();
    919 
    920     // Check all slots from the requested execution time and longer execution times
    921     for (int i = requested.ordinal(); i < slots.length; ++i)
    922     {
    923       if (usedSlots.get(slots[i]) < maxSlots.get(slots[i]))
    924       {
    925         // This slot has free jobs
    926         slotToUse = slots[i];
    927         usedSlots.put(slotToUse, usedSlots.get(slotToUse) + 1);
    928         activeJobs.add(new JobInfo(job, slotToUse));
    929         log.debug("Slot: " + slotToUse + "; used: " + usedSlots.get(slotToUse) + "; max: " + maxSlots.get(slotToUse));
    930         break;
    931       }
    932     }
     972    Slot slotToUse = slotManager.getSlot(job);
     973
     974    if (slotToUse != null)
     975    {
     976      log.debug("Got slot: " + slotToUse);
     977      activeJobs.add(new JobInfo(job, slotToUse.getEstimatedExecutionTime()));
     978    }
     979    else
     980    {
     981      log.debug("No available slot");
     982    }
     983
    933984    // If null we couldn't find a free slot
    934985    return slotToUse;
     
    9601011    @param usedSlot The slot that was used
    9611012  */
    962   synchronized void jobDone(Job job, Job.ExecutionTime usedSlot)
    963   {
    964     usedSlots.put(usedSlot, usedSlots.get(usedSlot) - 1);
    965     activeJobs.remove(new JobInfo(job, usedSlot));
     1013  synchronized void jobDone(Job job, Slot usedSlot)
     1014  {
     1015    if (slotManager != null)
     1016    {
     1017      slotManager.releaseSlot(usedSlot);
     1018      activeJobs.remove(new JobInfo(job, usedSlot.getEstimatedExecutionTime()));
     1019    }
    9661020  }
    9671021 
     
    9691023    Close the service listener.
    9701024  */
    971   private void closeServer()
     1025  private synchronized void closeServer()
    9721026  {
    9731027    log.info("Closing service listener: " + server);
     
    10011055    Close the job queue checker.
    10021056  */
    1003   private void closeJobQueueChecker()
     1057  private synchronized void closeJobQueueChecker()
    10041058  {
    10051059    log.info("Closing job queue checker: " + jobQueueChecker);
     
    10261080    catch (RuntimeException t)
    10271081    {
    1028       log.error("Could not create job executor instance: " + executorClass.getName(), t);
     1082      log.error("Could not create job executor: " + executorClass.getName(), t);
    10291083      executor = null;
    10301084      throw t;
     
    10321086    catch (Exception e)
    10331087    {
    1034       log.error("Could not create job executor instance: " + executorClass.getName(), e);
     1088      log.error("Could not create job executor: " + executorClass.getName(), e);
    10351089      executor = null;
    10361090      throw new RuntimeException(e);
     
    10391093  }
    10401094 
     1095
    10411096  /**
    10421097    Close the job executor.
    10431098  */
    1044   private void closeJobExecutor()
     1099  private synchronized void closeJobExecutor()
    10451100  {
    10461101    log.info("Closing job executor: " + jobExecutor);
     
    10521107  }
    10531108 
     1109 
     1110  /**
     1111    Create a slot manager and initialize it.
     1112    @return A SlotManager instance
     1113    @since 2.16
     1114  */
     1115  private SlotManager createSlotManager()
     1116  {
     1117    log.info("Creating slot manager: " + slotManagerClass.getName());
     1118    SlotManager manager = null;
     1119    try
     1120    {
     1121      manager = slotManagerClass.newInstance();
     1122      manager.init(this);
     1123    }
     1124    catch (RuntimeException t)
     1125    {
     1126      log.error("Could not create slot manager: " + slotManagerClass.getName(), t);
     1127      manager = null;
     1128      throw t;
     1129    }
     1130    catch (Exception e)
     1131    {
     1132      log.error("Could not create slot manager: " + slotManagerClass.getName(), e);
     1133      manager = null;
     1134      throw new RuntimeException(e);
     1135    }
     1136    return manager;
     1137  }
     1138
     1139  /**
     1140    Close the slot manager.
     1141  */
     1142  private synchronized void closeSlotManager()
     1143  {
     1144    log.info("Closing slot manager: " + slotManager);
     1145    if (slotManager != null)
     1146    {
     1147      slotManager.close();
     1148      slotManager = null;
     1149    }
     1150  }
     1151
    10541152  /**
    10551153    Try to stop running jobs by interrupting the threads they are executing in.
    10561154  */
    1057   private void maybeStopRunningJobs()
     1155  private synchronized void maybeStopRunningJobs()
    10581156  {
    10591157    log.info("Stopping running jobs. " + activeJobs.size() + " job(s) still active.");
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobRunner.java

    r4512 r5447  
    2424import java.util.Arrays;
    2525
     26import net.sf.basedb.clients.jobagent.slotmanager.Slot;
    2627import net.sf.basedb.core.Application;
    2728import net.sf.basedb.core.DbControl;
     
    8384  public void run()
    8485  {
    85     Job.ExecutionTime slotToUse = agent.getSlot(job);
     86    Slot slotToUse = agent.getSlot(job);
    8687    if (slotToUse == null)
    8788    {
     
    122123      try
    123124      {
    124         jobExecutor.executeJob(sc, agent, j, settings, slotToUse);
     125        jobExecutor.executeJob(sc, agent, j, settings, slotToUse.getEstimatedExecutionTime());
    125126      }
    126127      catch (Throwable t)
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/DummyJobExecutor.java

    r5060 r5447  
    3030import net.sf.basedb.core.JobAgentSettings;
    3131import net.sf.basedb.core.SessionControl;
     32import net.sf.basedb.core.signal.SignalHandler;
    3233import net.sf.basedb.core.signal.SignalReceiver;
    3334import net.sf.basedb.core.signal.ThreadSignalHandler;
     
    9394    boolean aborted = false;
    9495    Throwable error = null;
     96    SignalReceiver signalReceiver = agent.getSignalReceiver();
     97    SignalHandler signalHandler = null;
    9598    try
    9699    {
     
    101104      if (wait > 0)
    102105      {
    103         SignalReceiver signalReceiver = agent.getSignalReceiver();
     106        signalHandler = new ThreadSignalHandler();
    104107        job.setSignalTransporter(signalReceiver.getSignalTransporterClass(),
    105           signalReceiver.registerSignalHandler(new ThreadSignalHandler()));
     108          signalReceiver.registerSignalHandler(signalHandler));
    106109        job.setProgress(50, "Halfway; waiting " + wait + " seconds");
    107110        dc.commit();
     
    142145    finally
    143146    {
     147      if (signalHandler != null) signalReceiver.unregisterSignalHandler(signalHandler);
    144148      if (dc != null) dc.close();
    145149    }
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/MultiProtocolRequestHandler.java

    r5446 r5447  
    2929
    3030import net.sf.basedb.clients.jobagent.Agent;
     31import net.sf.basedb.util.Values;
    3132import net.sf.basedb.util.jobagent.RequestHandler;
    3233
     
    127128    if (log.isInfoEnabled())
    128129    {
    129       log.info("Returning '" + answer.replaceAll("\n", "\\\\n") + "' for command '" + cmd +"' to " + remote);
     130      log.info("Returning '" + Values.getString(answer).replaceAll("\n", "\\\\n") + "' for command '" + cmd +"' to " + remote);
    130131    }
    131132    return answer;
  • trunk/src/core/net/sf/basedb/util/jobagent/JobAgentConnection.java

    r5446 r5447  
    2626import java.net.InetSocketAddress;
    2727import java.net.Socket;
     28import java.util.ArrayList;
     29import java.util.HashMap;
     30import java.util.List;
     31import java.util.Map;
     32import java.util.regex.Matcher;
     33import java.util.regex.Pattern;
    2834
    2935import net.sf.basedb.util.SocketUtil;
     
    4248{
    4349
     50  /**
     51    Regexp used to parse a typical answer.
     52  */
     53  private static final Pattern HEADER_REGEXP = Pattern.compile("(.*):(.*)");
     54 
     55  /**
     56    Utility method for parsing a 'typical' answer from a job agent.
     57    The typical answer is a string that contains a key-value pair on each
     58    line, separated by a colon. This method allows multiple entries for
     59    the same key.
     60   
     61    @param answer The answer
     62    @return A map with the key as index and a list with all values
     63    @since 2.16
     64  */
     65  public static Map<String, List<String>> parseAnswer(String answer)
     66  {
     67    Map<String, List<String>> headers = new HashMap<String, List<String>>();
     68    Matcher m = HEADER_REGEXP.matcher(answer);
     69    while (m.find())
     70    {
     71      String header = m.group(1);
     72      String value = m.group(2);
     73      List<String> values = headers.get(header);
     74      if (values == null)
     75      {
     76        values = new ArrayList<String>();
     77        headers.put(header, values);
     78      }
     79      values.add(value);
     80    }
     81    return headers;
     82  }
     83
     84  /**
     85    Utility method for parsing a 'typical' answer from a job agent.
     86    The typical answer is a string that contains a key-value pair on each
     87    line, separated by a colon. This method allows only a single entry for
     88    the same key.
     89   
     90    @param answer The answer
     91    @return A map with the key as index to the value
     92    @since 2.16
     93  */
     94  public static Map<String, String> parseSimpleAnswer(String answer)
     95  {
     96    Map<String, String> headers = new HashMap<String, String>();
     97    Matcher m = HEADER_REGEXP.matcher(answer);
     98    while (m.find())
     99    {
     100      String header = m.group(1);
     101      String value = m.group(2);
     102      headers.put(header, value);
     103    }
     104    return headers;
     105  }
     106 
     107 
    44108  private final String server;
    45109  private final int port;
  • trunk/src/core/net/sf/basedb/util/jobagent/JobAgentInfo.java

    r4515 r5447  
    2222package net.sf.basedb.util.jobagent;
    2323
    24 import java.util.ArrayList;
    25 import java.util.HashMap;
    2624import java.util.HashSet;
    2725import java.util.List;
    2826import java.util.Map;
    2927import java.util.Set;
    30 import java.util.regex.Matcher;
    31 import java.util.regex.Pattern;
    3228
    3329import net.sf.basedb.util.Values;
     
    4440{
    4541
    46   private static final Pattern HEADER_REGEXP = Pattern.compile("(.*):(.*)");
    47  
    4842  private final long created;
    4943  private final Boolean paused;
     
    9185  public JobAgentInfo(String answer)
    9286  {
    93     Map<String, List<String>> headers = getHeaders(answer);
     87    Map<String, List<String>> headers = JobAgentConnection.parseAnswer(answer);
    9488    this.created = System.currentTimeMillis();
    9589    String tempPaused = getValue(headers, "Status");
     
    201195    return sb.toString();
    202196  }
    203 
    204   private Map<String, List<String>> getHeaders(String answer)
    205   {
    206     Map<String, List<String>> headers = new HashMap<String, List<String>>();
    207     Matcher m = HEADER_REGEXP.matcher(answer);
    208     while (m.find())
    209     {
    210       String header = m.group(1);
    211       String value = m.group(2);
    212       List<String> values = headers.get(header);
    213       if (values == null)
    214       {
    215         values = new ArrayList<String>();
    216         headers.put(header, values);
    217       }
    218       values.add(value);
    219     }
    220     return headers;
    221   }
    222197 
    223198  private String getValue(Map<String, List<String>> headers, String header)
Note: See TracChangeset for help on using the changeset viewer.