Changeset 5447
- Timestamp:
- Oct 19, 2010, 1:02:17 PM (12 years ago)
- Location:
- trunk
- Files:
-
- 6 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/config/dist/jobagent.properties
r5446 r5447 67 67 68 68 69 # ======================= 69 # ============================ 70 70 # Job agent execution settings 71 # ======================= 71 # ============================ 72 72 73 # The name of the executor class that is responsible for starting the job73 ## The name of the executor class that is responsible for starting the job 74 74 ## The default is ProcessJobExecutor which starts job in a separate process 75 75 ## The class must implement the net.sf.basedb.clients.JobExecutor interface … … 103 103 agent.checkinterval=30 104 104 105 106 # ============================ 107 # Slot manager settings 108 # ============================ 109 110 ## The name of the slot manager class that is responsible for assigning a slot 111 ## to the job. The default is the InternalSlotManager which assign slots based 112 ## on the estimated execution time. 113 agent.slotmanager.class=net.sf.basedb.clients.jobagent.slotmanager.InternalSlotManager 114 115 # ------------------------------------------------------------------- 116 # The master slot manager is like the internal slot manager but also 117 # accepts slot assignment on behalf of other job agents. The other 118 # job agents should use the ExternalSlotManager and connect to the 119 # remote control port of this job agent. 120 # ------------------------------------------------------------------- 121 # agent.slotmanager.class=net.sf.basedb.clients.jobagent.slotmanager.MasterSlotManager 122 123 # ------------------------------------------------------------------- 124 # The remote slot manager uses another jobagent to assign slots. 125 # The other job agent should use the MasterSlotManager 126 # Options 127 # server=The name/ip of the other job agent 128 # port=The remote control port of the job agent 129 # ------------------------------------------------------------------- 130 # agent.slotmanager.class=net.sf.basedb.clients.jobagent.slotmanager.RemoteSlotManager 131 # agent.slotmanager.remote.server= 132 # agent.slotmanager.remote.port= 133 105 134 ## Note! A quick job may use a slot from any of the pools reserved for 106 135 ## slower jobs if there are unused slots. Priority values should be between -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java
r5446 r5447 25 25 import java.net.InetAddress; 26 26 import java.net.UnknownHostException; 27 import java.util.Arrays; 27 28 import java.util.Collections; 28 29 import java.util.HashMap; … … 38 39 import net.sf.basedb.clients.jobagent.handlers.MultiProtocolRequestHandler; 39 40 import net.sf.basedb.clients.jobagent.handlers.SignalRequestHandler; 41 import net.sf.basedb.clients.jobagent.slotmanager.InternalSlotManager; 42 import net.sf.basedb.clients.jobagent.slotmanager.Slot; 43 import net.sf.basedb.clients.jobagent.slotmanager.SlotManager; 40 44 import net.sf.basedb.core.Application; 41 45 import net.sf.basedb.core.DbControl; … … 222 226 223 227 /** 228 The default slot manager to use if none has been specified 229 in the configuration file. 230 */ 231 public static final Class<? extends SlotManager> DEFAULT_SLOT_MANAGER = 232 InternalSlotManager.class; 233 234 /** 224 235 The default check interval in seconds. 225 236 */ … … 259 270 private final Class<? extends JobExecutor> executorClass; 260 271 272 // Slot manager 273 private final Class<? extends SlotManager> slotManagerClass; 274 261 275 private InetAddress serverAddress; 262 276 private JobAgentServerConnection server; … … 264 278 private AgentSignalReceiver signalReceiver; 265 279 private JobExecutor jobExecutor; 280 private SlotManager slotManager; 266 281 267 282 private TimerTask jobQueueChecker; … … 270 285 private boolean isRunning; 271 286 private SessionControl sc; 272 273 /**274 The current number of threads executing in each slot.275 */276 private final Map<Job.ExecutionTime, Integer> usedSlots;277 278 /**279 The maximum number of threads that are allowed in each slot.280 */281 private final Map<Job.ExecutionTime, Integer> maxSlots;282 287 283 288 /** … … 346 351 this.executorClass = getJobExecutorClass(properties.getProperty("agent.executor.class")); 347 352 353 // Slot manager 354 this.slotManagerClass = getSlotManagerClass(properties.getProperty("agent.slotmanager.class")); 355 348 356 // Slots and priorities 349 this.usedSlots = new HashMap<Job.ExecutionTime, Integer>();350 this.maxSlots = new HashMap<Job.ExecutionTime, Integer>();351 357 this.priorities = new HashMap<Job.ExecutionTime, Integer>(); 352 358 for (Job.ExecutionTime et : Job.ExecutionTime.values()) 353 359 { 354 usedSlots.put(et, 0);355 360 String configName = "agent."+et.name().toLowerCase(); 356 int configuredSlots = Values.getInt(properties.getProperty(configName+".slots"), et.getDefaultSlots());357 maxSlots.put(et, configuredSlots);358 361 int priority = Values.getInt(properties.getProperty(configName+".priority"), et.getDefaultPriority()); 359 362 priorities.put(et, priority); … … 488 491 private Map<String, Class<? extends CustomRequestHandler>> getCustomRequestHandlerClasses() 489 492 { 490 Map<String, Class<? extends CustomRequestHandler>> handlers = new HashMap<String, Class<? extends CustomRequestHandler>>(); 493 log.debug("Loading custom request handler classes"); 494 Map<String, Class<? extends CustomRequestHandler>> handlers = 495 new HashMap<String, Class<? extends CustomRequestHandler>>(); 496 491 497 for (String property : properties.stringPropertyNames()) 492 498 { … … 500 506 if (!CustomRequestHandler.class.isAssignableFrom(clazz)) 501 507 { 502 log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored !");508 log.warn("Class '" + className + "' doesn't implement the CustomRequestHandler interface, ignored."); 503 509 } 504 510 else … … 510 516 catch (Throwable t) 511 517 { 512 log.warn("C lass " + className + " not found, ignored!");518 log.warn("Custom request handler class '" + className + "' not found, ignored."); 513 519 } 514 520 } 515 521 } 522 log.debug("Custom request handler classes lodaed: " + handlers.size()); 516 523 return handlers; 517 524 } … … 559 566 try 560 567 { 561 executor = className == null ? 562 DEFAULT_JOB_EXECUTOR : (Class<JobExecutor>)Class.forName(className); 563 if (!JobExecutor.class.isAssignableFrom(executor)) 568 if (className != null) 564 569 { 565 log.warn("Class " + className + " doesn't implement the JobExecutor interface, using " + 566 DEFAULT_JOB_EXECUTOR.getName() + " instead"); 567 executor = DEFAULT_JOB_EXECUTOR; 570 executor = (Class<JobExecutor>)Class.forName(className); 571 if (!JobExecutor.class.isAssignableFrom(executor)) 572 { 573 log.warn("Class '" + className + "' doesn't implement the JobExecutor interface, using '" + 574 DEFAULT_JOB_EXECUTOR.getName() + "' instead"); 575 executor = DEFAULT_JOB_EXECUTOR; 576 } 568 577 } 569 578 } 570 579 catch (Throwable t) 571 580 { 572 log.warn("Class " + className + " not found, using " + DEFAULT_JOB_EXECUTOR.getName() + 573 " instead", t); 574 } 581 log.warn("Job executor class '" + className + "' not found, using '" + DEFAULT_JOB_EXECUTOR.getName() + 582 "' instead", t); 583 } 584 log.info("Job executor class '" + executor.getName() + "' loaded successfully"); 575 585 return executor; 576 586 } 587 588 /** 589 Get the class object for the configured slot manager. If the 590 specified class can't be found or doesn't implement the 591 {@link SlotManager} interface a warning message is logged and 592 the {@link #DEFAULT_SLOT_MANAGER} is used instead. 593 594 @param className The name of the slot manager class 595 @return The class object for that class or the default job executor 596 */ 597 @SuppressWarnings("unchecked") 598 private Class<? extends SlotManager> getSlotManagerClass(String className) 599 { 600 Class<? extends SlotManager> manager = DEFAULT_SLOT_MANAGER; 601 try 602 { 603 if (className != null) 604 { 605 manager = (Class<SlotManager>)Class.forName(className); 606 } 607 if (!SlotManager.class.isAssignableFrom(manager)) 608 { 609 log.warn("Class '" + className + "' doesn't implement the SlotManager interface, using '" + 610 DEFAULT_SLOT_MANAGER.getName() + "' instead"); 611 manager = DEFAULT_SLOT_MANAGER; 612 } 613 } 614 catch (Throwable t) 615 { 616 log.warn("Slot manager class '" + className + "' not found, using '" + DEFAULT_SLOT_MANAGER.getName() + 617 "' instead", t); 618 } 619 log.info("Slot manager class '" + manager.getName() + "' loaded successfully"); 620 return manager; 621 } 622 577 623 578 624 /** … … 648 694 getSessionControl();; 649 695 // Create executor and job queue checker 696 slotManager = createSlotManager(); 650 697 jobExecutor = createJobExecutor(); 651 698 jobQueueChecker = createJobQueueChecker(); … … 680 727 maybeStopRunningJobs(); 681 728 closeJobExecutor(); 729 closeSlotManager(); 682 730 closeServer(); 683 731 if (sc != null) sc.logout(); … … 790 838 if (this.requestHandler == null) return; 791 839 if (handler == null) throw new NullPointerException("handler"); 840 if (log.isDebugEnabled()) 841 { 842 log.debug("Registering protocol handler " + handler + " for protocols: " + Arrays.asList(protocols)); 843 } 792 844 requestHandler.registerProtocols(handler, protocols); 793 845 } … … 805 857 { 806 858 if (this.requestHandler == null) return; 859 if (log.isDebugEnabled()) 860 { 861 log.debug("Unregistering protocol handler for protocols: " + Arrays.asList(protocols)); 862 } 807 863 requestHandler.unregisterProtocols(protocols); 808 864 } … … 903 959 <p> 904 960 Note! This method reserves the slot for the job. It is important that 905 the {@link #jobDone(Job, Job.ExecutionTime)} method is called once961 the {@link #jobDone(Job, Slot)} method is called once 906 962 the job has completed to return the slot to the pool. Failure to do 907 963 so may result in that the agent thinks that all slots are … … 911 967 @return The assigned slot or null if no slot is available 912 968 */ 913 synchronized Job.ExecutionTimegetSlot(Job job)969 synchronized Slot getSlot(Job job) 914 970 { 915 971 log.debug("Requesting slot for job: " + job); 916 Job.ExecutionTime requested = job.getEstimatedExecutionTime(); 917 Job.ExecutionTime slotToUse = null; 918 Job.ExecutionTime[] slots = Job.ExecutionTime.values(); 919 920 // Check all slots from the requested execution time and longer execution times 921 for (int i = requested.ordinal(); i < slots.length; ++i) 922 { 923 if (usedSlots.get(slots[i]) < maxSlots.get(slots[i])) 924 { 925 // This slot has free jobs 926 slotToUse = slots[i]; 927 usedSlots.put(slotToUse, usedSlots.get(slotToUse) + 1); 928 activeJobs.add(new JobInfo(job, slotToUse)); 929 log.debug("Slot: " + slotToUse + "; used: " + usedSlots.get(slotToUse) + "; max: " + maxSlots.get(slotToUse)); 930 break; 931 } 932 } 972 Slot slotToUse = slotManager.getSlot(job); 973 974 if (slotToUse != null) 975 { 976 log.debug("Got slot: " + slotToUse); 977 activeJobs.add(new JobInfo(job, slotToUse.getEstimatedExecutionTime())); 978 } 979 else 980 { 981 log.debug("No available slot"); 982 } 983 933 984 // If null we couldn't find a free slot 934 985 return slotToUse; … … 960 1011 @param usedSlot The slot that was used 961 1012 */ 962 synchronized void jobDone(Job job, Job.ExecutionTime usedSlot) 963 { 964 usedSlots.put(usedSlot, usedSlots.get(usedSlot) - 1); 965 activeJobs.remove(new JobInfo(job, usedSlot)); 1013 synchronized void jobDone(Job job, Slot usedSlot) 1014 { 1015 if (slotManager != null) 1016 { 1017 slotManager.releaseSlot(usedSlot); 1018 activeJobs.remove(new JobInfo(job, usedSlot.getEstimatedExecutionTime())); 1019 } 966 1020 } 967 1021 … … 969 1023 Close the service listener. 970 1024 */ 971 private void closeServer()1025 private synchronized void closeServer() 972 1026 { 973 1027 log.info("Closing service listener: " + server); … … 1001 1055 Close the job queue checker. 1002 1056 */ 1003 private void closeJobQueueChecker()1057 private synchronized void closeJobQueueChecker() 1004 1058 { 1005 1059 log.info("Closing job queue checker: " + jobQueueChecker); … … 1026 1080 catch (RuntimeException t) 1027 1081 { 1028 log.error("Could not create job executor instance: " + executorClass.getName(), t);1082 log.error("Could not create job executor: " + executorClass.getName(), t); 1029 1083 executor = null; 1030 1084 throw t; … … 1032 1086 catch (Exception e) 1033 1087 { 1034 log.error("Could not create job executor instance: " + executorClass.getName(), e);1088 log.error("Could not create job executor: " + executorClass.getName(), e); 1035 1089 executor = null; 1036 1090 throw new RuntimeException(e); … … 1039 1093 } 1040 1094 1095 1041 1096 /** 1042 1097 Close the job executor. 1043 1098 */ 1044 private void closeJobExecutor()1099 private synchronized void closeJobExecutor() 1045 1100 { 1046 1101 log.info("Closing job executor: " + jobExecutor); … … 1052 1107 } 1053 1108 1109 1110 /** 1111 Create a slot manager and initialize it. 1112 @return A SlotManager instance 1113 @since 2.16 1114 */ 1115 private SlotManager createSlotManager() 1116 { 1117 log.info("Creating slot manager: " + slotManagerClass.getName()); 1118 SlotManager manager = null; 1119 try 1120 { 1121 manager = slotManagerClass.newInstance(); 1122 manager.init(this); 1123 } 1124 catch (RuntimeException t) 1125 { 1126 log.error("Could not create slot manager: " + slotManagerClass.getName(), t); 1127 manager = null; 1128 throw t; 1129 } 1130 catch (Exception e) 1131 { 1132 log.error("Could not create slot manager: " + slotManagerClass.getName(), e); 1133 manager = null; 1134 throw new RuntimeException(e); 1135 } 1136 return manager; 1137 } 1138 1139 /** 1140 Close the slot manager. 1141 */ 1142 private synchronized void closeSlotManager() 1143 { 1144 log.info("Closing slot manager: " + slotManager); 1145 if (slotManager != null) 1146 { 1147 slotManager.close(); 1148 slotManager = null; 1149 } 1150 } 1151 1054 1152 /** 1055 1153 Try to stop running jobs by interrupting the threads they are executing in. 1056 1154 */ 1057 private void maybeStopRunningJobs()1155 private synchronized void maybeStopRunningJobs() 1058 1156 { 1059 1157 log.info("Stopping running jobs. " + activeJobs.size() + " job(s) still active."); -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobRunner.java
r4512 r5447 24 24 import java.util.Arrays; 25 25 26 import net.sf.basedb.clients.jobagent.slotmanager.Slot; 26 27 import net.sf.basedb.core.Application; 27 28 import net.sf.basedb.core.DbControl; … … 83 84 public void run() 84 85 { 85 Job.ExecutionTimeslotToUse = agent.getSlot(job);86 Slot slotToUse = agent.getSlot(job); 86 87 if (slotToUse == null) 87 88 { … … 122 123 try 123 124 { 124 jobExecutor.executeJob(sc, agent, j, settings, slotToUse );125 jobExecutor.executeJob(sc, agent, j, settings, slotToUse.getEstimatedExecutionTime()); 125 126 } 126 127 catch (Throwable t) -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/DummyJobExecutor.java
r5060 r5447 30 30 import net.sf.basedb.core.JobAgentSettings; 31 31 import net.sf.basedb.core.SessionControl; 32 import net.sf.basedb.core.signal.SignalHandler; 32 33 import net.sf.basedb.core.signal.SignalReceiver; 33 34 import net.sf.basedb.core.signal.ThreadSignalHandler; … … 93 94 boolean aborted = false; 94 95 Throwable error = null; 96 SignalReceiver signalReceiver = agent.getSignalReceiver(); 97 SignalHandler signalHandler = null; 95 98 try 96 99 { … … 101 104 if (wait > 0) 102 105 { 103 SignalReceiver signalReceiver = agent.getSignalReceiver();106 signalHandler = new ThreadSignalHandler(); 104 107 job.setSignalTransporter(signalReceiver.getSignalTransporterClass(), 105 signalReceiver.registerSignalHandler( new ThreadSignalHandler()));108 signalReceiver.registerSignalHandler(signalHandler)); 106 109 job.setProgress(50, "Halfway; waiting " + wait + " seconds"); 107 110 dc.commit(); … … 142 145 finally 143 146 { 147 if (signalHandler != null) signalReceiver.unregisterSignalHandler(signalHandler); 144 148 if (dc != null) dc.close(); 145 149 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/MultiProtocolRequestHandler.java
r5446 r5447 29 29 30 30 import net.sf.basedb.clients.jobagent.Agent; 31 import net.sf.basedb.util.Values; 31 32 import net.sf.basedb.util.jobagent.RequestHandler; 32 33 … … 127 128 if (log.isInfoEnabled()) 128 129 { 129 log.info("Returning '" + answer.replaceAll("\n", "\\\\n") + "' for command '" + cmd +"' to " + remote);130 log.info("Returning '" + Values.getString(answer).replaceAll("\n", "\\\\n") + "' for command '" + cmd +"' to " + remote); 130 131 } 131 132 return answer; -
trunk/src/core/net/sf/basedb/util/jobagent/JobAgentConnection.java
r5446 r5447 26 26 import java.net.InetSocketAddress; 27 27 import java.net.Socket; 28 import java.util.ArrayList; 29 import java.util.HashMap; 30 import java.util.List; 31 import java.util.Map; 32 import java.util.regex.Matcher; 33 import java.util.regex.Pattern; 28 34 29 35 import net.sf.basedb.util.SocketUtil; … … 42 48 { 43 49 50 /** 51 Regexp used to parse a typical answer. 52 */ 53 private static final Pattern HEADER_REGEXP = Pattern.compile("(.*):(.*)"); 54 55 /** 56 Utility method for parsing a 'typical' answer from a job agent. 57 The typical answer is a string that contains a key-value pair on each 58 line, separated by a colon. This method allows multiple entries for 59 the same key. 60 61 @param answer The answer 62 @return A map with the key as index and a list with all values 63 @since 2.16 64 */ 65 public static Map<String, List<String>> parseAnswer(String answer) 66 { 67 Map<String, List<String>> headers = new HashMap<String, List<String>>(); 68 Matcher m = HEADER_REGEXP.matcher(answer); 69 while (m.find()) 70 { 71 String header = m.group(1); 72 String value = m.group(2); 73 List<String> values = headers.get(header); 74 if (values == null) 75 { 76 values = new ArrayList<String>(); 77 headers.put(header, values); 78 } 79 values.add(value); 80 } 81 return headers; 82 } 83 84 /** 85 Utility method for parsing a 'typical' answer from a job agent. 86 The typical answer is a string that contains a key-value pair on each 87 line, separated by a colon. This method allows only a single entry for 88 the same key. 89 90 @param answer The answer 91 @return A map with the key as index to the value 92 @since 2.16 93 */ 94 public static Map<String, String> parseSimpleAnswer(String answer) 95 { 96 Map<String, String> headers = new HashMap<String, String>(); 97 Matcher m = HEADER_REGEXP.matcher(answer); 98 while (m.find()) 99 { 100 String header = m.group(1); 101 String value = m.group(2); 102 headers.put(header, value); 103 } 104 return headers; 105 } 106 107 44 108 private final String server; 45 109 private final int port; -
trunk/src/core/net/sf/basedb/util/jobagent/JobAgentInfo.java
r4515 r5447 22 22 package net.sf.basedb.util.jobagent; 23 23 24 import java.util.ArrayList;25 import java.util.HashMap;26 24 import java.util.HashSet; 27 25 import java.util.List; 28 26 import java.util.Map; 29 27 import java.util.Set; 30 import java.util.regex.Matcher;31 import java.util.regex.Pattern;32 28 33 29 import net.sf.basedb.util.Values; … … 44 40 { 45 41 46 private static final Pattern HEADER_REGEXP = Pattern.compile("(.*):(.*)");47 48 42 private final long created; 49 43 private final Boolean paused; … … 91 85 public JobAgentInfo(String answer) 92 86 { 93 Map<String, List<String>> headers = getHeaders(answer);87 Map<String, List<String>> headers = JobAgentConnection.parseAnswer(answer); 94 88 this.created = System.currentTimeMillis(); 95 89 String tempPaused = getValue(headers, "Status"); … … 201 195 return sb.toString(); 202 196 } 203 204 private Map<String, List<String>> getHeaders(String answer)205 {206 Map<String, List<String>> headers = new HashMap<String, List<String>>();207 Matcher m = HEADER_REGEXP.matcher(answer);208 while (m.find())209 {210 String header = m.group(1);211 String value = m.group(2);212 List<String> values = headers.get(header);213 if (values == null)214 {215 values = new ArrayList<String>();216 headers.put(header, values);217 }218 values.add(value);219 }220 return headers;221 }222 197 223 198 private String getValue(Map<String, List<String>> headers, String header)
Note: See TracChangeset
for help on using the changeset viewer.