Changeset 2641
- Timestamp:
- Sep 14, 2006, 12:17:01 PM (17 years ago)
- Location:
- trunk
- Files:
-
- 7 added
- 15 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/build.xml
r2639 r2641 646 646 </copy> 647 647 <chmod dir="${dist.bin}" includes="*.sh" perm="a+x"/> 648 <copy tofile="${dist.bin}/jobagent.properties" file="${jobagent.src}/jobagent.properties.in" overwrite="true"/> 649 <chmod file="${dist.bin}/jobagent.properties" perm="600"/> 648 650 </target> 649 651 -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java
r2634 r2641 26 26 import java.io.IOException; 27 27 import java.net.InetAddress; 28 import java.net.UnknownHostException; 29 import java.util.Collections; 30 import java.util.HashMap; 31 import java.util.HashSet; 32 import java.util.Map; 28 33 import java.util.Properties; 34 import java.util.Set; 29 35 import java.util.TimerTask; 30 36 37 import net.sf.basedb.clients.jobagent.executors.ProcessJobExecutor; 31 38 import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler; 32 39 import net.sf.basedb.core.Application; 33 40 import net.sf.basedb.core.DbControl; 41 import net.sf.basedb.core.InvalidDataException; 42 import net.sf.basedb.core.InvalidUseOfNullException; 34 43 import net.sf.basedb.core.Job; 35 44 import net.sf.basedb.core.JobAgent; 45 import net.sf.basedb.core.Project; 36 46 import net.sf.basedb.core.SessionControl; 37 47 … … 43 53 44 54 /** 45 This is the actual agent that is checking the database for jobs. 55 This is the actual job agent application. It delegates the actual 56 checking of the job queue to the {@link JobQueueChecker} class. 57 It includes a listener service that can be used for remote control 58 of the agent. It is for example possible to send <code>start</code>, 59 <code>stop</code> and <code>pause</code> requests. 60 <p> 61 This class is responsible for creating a {@link JobExecutor} 62 object and to delegate the actual execution of a job to it. The agent keep 63 track of the running jobs and makes sure that only the configured 64 number of jobs are running at the same time. 65 <p> 66 The agent is configured at construction time with parameters from 67 a {@link Properties} object. 68 69 <table> 70 <tr> 71 <th>Parameter</th> 72 <th>Default value</th> 73 <th>Description</th> 74 </tr> 75 <tr> 76 <td>agent.user</td> 77 <td>-</td> 78 <td>The username to use for logging in to BASE (required)</td> 79 </tr> 80 <tr> 81 <td>agent.password</td> 82 <td>-</td> 83 <td>The password to use for logging in to BASE (required)</td> 84 </tr> 85 <tr> 86 <td>agent.id</td> 87 <td>-</td> 88 <td rowspan="3"> 89 The external ID (required), name and description properties of the 90 corresponding {@link JobAgent} item in the BASE database 91 </td> 92 </tr> 93 <tr> 94 <td>agent.name</td> 95 <td>-</td> 96 </tr> 97 <tr> 98 <td>agent.description</td> 99 <td>-</td> 100 </tr> 101 <tr> 102 <td>agent.port</td> 103 <td>47822</td> 104 <td>The port the remote control service listener is listening to</td> 105 </tr> 106 <tr> 107 <td>agent.remotecontrol</td> 108 <td>-</td> 109 <td> 110 A comma-separated list of computer that are allowed remote control. It is recommended 111 that the web server is put in this list. The local host is always allowed control 112 and doesn't have to be in this list. 113 </td> 114 </tr> 115 <tr> 116 <td>agent.allowremote.stop</td> 117 <td>false</td> 118 <td> 119 If <code>stop</code> requests should be allowed from remote hosts or not. 120 Note! A stop request shuts down the agent making it impossible to start it 121 again using remote control. 122 </td> 123 </tr> 124 <tr> 125 <td>agent.allowremote.start</td> 126 <td>true</td> 127 <td>If <code>start</code> requests should be allowed from remote hosts or not</td> 128 </tr> 129 <tr> 130 <td>agent.allowremote.pause</td> 131 <td>true</td> 132 <td>If <code>pause</code> requests should be allowed from remote hosts or not</td> 133 </tr> 134 <tr> 135 <td>agent.executor.class</td> 136 <td>{@link ProcessJobExecutor}</td> 137 <td> 138 The name of a class that is responsible for starting the a job once 139 the agent has determined that is allowed to be exected. The class must 140 implement the {@link JobExecutor} interface and provide a public noargument 141 constructor. Note that only one instance of this class exists for an agent. 142 It must be thread-safe since the jobs are executed in parallel threads. 143 </td> 144 </tr> 145 <tr> 146 <td>agent.executor.init</td> 147 <td>-</td> 148 <td> 149 Initialisation parameters for the executor class. They are sent to the 150 {@link JobExecutor#init(String)} method. For a meaning and syntax description 151 of this string see the excutor implementation you are using. 152 </td> 153 </tr> 154 <tr> 155 <td>agent.checkinterval</td> 156 <td>30</td> 157 <td>Number of seconds between checks to the database for new jobs.</td> 158 </tr> 159 <tr> 160 <td>agent.shortest.slots</td> 161 <td>1</td> 162 <td>Number of slots to reserve for jobs that take < 1 minute to execute</td> 163 </tr> 164 <tr> 165 <td>agent.shortest.priority</td> 166 <td>4</td> 167 <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td> 168 </tr> 169 <tr> 170 <td>agent.short.slots</td> 171 <td>1</td> 172 <td>Number of slots to reserve for jobs that take < 10 minute to execute</td> 173 </tr> 174 <tr> 175 <td>agent.short.priority</td> 176 <td>4</td> 177 <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td> 178 </tr> 179 <tr> 180 <td>agent.medium.slots</td> 181 <td>2</td> 182 <td>Number of slots to reserve for jobs that take < 1 hour to execute</td> 183 </tr> 184 <tr> 185 <td>agent.medium.priority</td> 186 <td>3</td> 187 <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td> 188 </tr> 189 <tr> 190 <td>agent.long.slots</td> 191 <td>2</td> 192 <td>Number of slots to reserve for jobs that take > 1 hour to execute</td> 193 </tr> 194 <tr> 195 <td>agent.long.priority</td> 196 <td>3</td> 197 <td>The thread priority of jobs in this slot. See {@link Thread#setPriority(int)}.</td> 198 </tr> 199 </table> 46 200 47 201 @author nicklas … … 53 207 54 208 /** 209 The default job executor to use if none has been specified 210 in the configuration file. 211 */ 212 public static final Class<? extends JobExecutor> DEFAULT_JOB_EXECUTOR = 213 ProcessJobExecutor.class; 214 215 /** 216 The default check interval in seconds. 217 */ 218 public static final int DEFAULT_CHECK_INTERVAL = 30; 219 220 /** 55 221 Log job agent events. 56 222 */ 57 223 private static final org.apache.log4j.Logger log = 58 224 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.Agent"); 59 225 226 /** 227 Log job agent server events. 228 */ 229 private static final org.apache.log4j.Logger logServer = 230 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.JobAgentServerConnection"); 231 232 // Base settings 60 233 private final String login; 61 234 private final String password; … … 63 236 private final String name; 64 237 private final String description; 238 239 // Service listener settings 65 240 private final Integer port; 66 67 241 private final Set<InetAddress> remote; 242 private final boolean allowRemoteStop; 243 private final boolean allowRemotePause; 244 private final boolean allowRemoteStart; 245 246 // Job execution settings 247 private final long checkInterval; 248 private final Class<? extends JobExecutor> executorClass; 249 private final String executorInitParameters; 250 251 private InetAddress serverAddress; 68 252 private JobAgentServerConnection server; 69 253 private RequestHandler requestHandler; 254 private JobExecutor jobExecutor; 70 255 71 256 private TimerTask jobQueueChecker; 72 private long checkInterval = 10000;73 257 74 258 75 259 private boolean isRunning; 76 77 78 260 private SessionControl sc; 79 261 80 81 public Agent(int port) 82 { 83 this.port = port; 84 this.login = null; 85 this.password = null; 86 this.externalId = null; 87 this.name = null; 88 this.description = null; 89 } 90 262 /** 263 The current number of threads executing in each slot. 264 */ 265 private final Map<Job.ExecutionTime, Integer> usedSlots; 266 267 /** 268 The maximum number of threads that are allowed in each slot. 269 */ 270 private final Map<Job.ExecutionTime, Integer> maxSlots; 271 272 /** 273 The thread priority to use when executing jobs in each slot. 274 */ 275 private final Map<Job.ExecutionTime, Integer> priorities; 276 277 private final Set<Integer> activeJobs; 278 279 /** 280 The group were all job runners are placed. 281 */ 282 private final ThreadGroup runnersGroup; 283 284 /** 285 Create a new job agent. See class documentation for information about 286 configuration parameters. 287 288 @param properties A properties object containing the configuration 289 parameters for the agent 290 */ 91 291 public Agent(Properties properties) 92 292 { 293 // BASE settings 93 294 this.login = properties.getProperty("agent.user"); 94 295 this.password = properties.getProperty("agent.password"); … … 96 297 this.name = properties.getProperty("agent.name"); 97 298 this.description = properties.getProperty("agent.description"); 299 300 // Listener service settings 98 301 this.port = Values.getInt(properties.getProperty("agent.port"), JobAgent.DEFAULT_PORT); 99 validate(); 302 this.remote = getRemoteAddresses(properties.getProperty("agent.remotecontrol")); 303 this.allowRemoteStop = Values.getBoolean(properties.getProperty("agent.allowremote.stop"), false); 304 this.allowRemotePause = Values.getBoolean(properties.getProperty("agent.allowremote.pause"), true); 305 this.allowRemoteStart = Values.getBoolean(properties.getProperty("agent.allowremote.start"), true); 306 307 // Job execution settings 308 this.checkInterval = Values.getInt(properties.getProperty("agent.checkinterval"), DEFAULT_CHECK_INTERVAL); 309 this.executorClass = getJobExecutorClass(properties.getProperty("agent.executor.class")); 310 this.executorInitParameters = properties.getProperty("agent.executor.init"); 311 312 // Slots and priorities 313 this.usedSlots = new HashMap<Job.ExecutionTime, Integer>(); 314 this.maxSlots = new HashMap<Job.ExecutionTime, Integer>(); 315 this.priorities = new HashMap<Job.ExecutionTime, Integer>(); 316 for (Job.ExecutionTime et : Job.ExecutionTime.values()) 317 { 318 usedSlots.put(et, 0); 319 String configName = "agent."+et.name().toLowerCase(); 320 int configuredSlots = Values.getInt(properties.getProperty(configName+".slots"), et.getDefaultSlots()); 321 maxSlots.put(et, configuredSlots); 322 int priority = Values.getInt(properties.getProperty(configName+".priority"), et.getDefaultPriority()); 323 priorities.put(et, priority); 324 } 325 326 validateConfiguration(); 327 328 // Configure the job runners thread group 329 runnersGroup = new ThreadGroup("Agent."+externalId); 330 runnersGroup.setDaemon(false); 331 332 this.activeJobs = new HashSet<Integer>(); 100 333 isRunning = false; 101 334 } 102 335 103 private void validate() 104 {} 105 336 /** 337 Get the <code>agent.id</code> configuration value. 338 @return The ID 339 */ 340 public String getId() 341 { 342 return externalId; 343 } 344 345 /** 346 Get the <code>agent.name</code> configuration value. 347 @return The name or the ID if the name is null 348 */ 349 public String getName() 350 { 351 return name == null ? externalId : name; 352 } 353 354 /** 355 Get the <code>agent.description</code> configuration value. 356 @return The description 357 */ 358 public String getDescription() 359 { 360 return description; 361 } 362 363 /** 364 Get the <code>agent.port</code> configuration value. 365 @return The port number 366 */ 367 public int getPort() 368 { 369 return port; 370 } 371 372 /** 373 Get the host name of the server where the job agent is running. 374 @see SocketUtil#getPublicLocalHost() 375 */ 376 public String getServerName() 377 { 378 if (serverAddress == null) 379 { 380 serverAddress = SocketUtil.getPublicLocalHost(); 381 } 382 return serverAddress.getHostName(); 383 } 384 385 /** 386 Split the string at commas and try to create an {@link InetAddress} 387 for each part. Parts that turn out to be invalid are skipped and 388 a warning message is logged. 389 @param config The string to split 390 @return A set of <code>InetAdress</code> object 391 */ 392 private Set<InetAddress> getRemoteAddresses(String config) 393 { 394 Set<InetAddress> remote = new HashSet<InetAddress>(); 395 if (config != null) 396 { 397 for (String host : config.split(",")) 398 { 399 try 400 { 401 remote.add(InetAddress.getByName(host)); 402 } 403 catch (UnknownHostException ex) 404 { 405 log.warn("Unknown host: " + host, ex); 406 } 407 } 408 } 409 return remote; 410 } 411 412 /** 413 Get the class object for the configured job executor. If the 414 specified class can't be found or doesn't implement the 415 {@link JobExecutor} interface a warning message is logged and 416 the {@link #DEFAULT_JOB_EXECUTOR} is used instead. 417 418 @param className The name of the job executor class 419 @return The class object for that class or the default job executor 420 */ 421 @SuppressWarnings("unchecked") 422 private Class<? extends JobExecutor> getJobExecutorClass(String className) 423 { 424 Class<? extends JobExecutor> executor = DEFAULT_JOB_EXECUTOR; 425 try 426 { 427 executor = (Class<JobExecutor>)Class.forName(className); 428 if (!JobExecutor.class.isAssignableFrom(executor)) 429 { 430 log.warn("Class " + className + " doesn't implement the JobExecutor interface, using " + 431 DEFAULT_JOB_EXECUTOR.getName() + " instead"); 432 executor = DEFAULT_JOB_EXECUTOR; 433 } 434 } 435 catch (ClassNotFoundException ex) 436 { 437 log.warn("Class " + className + " not found, using " + DEFAULT_JOB_EXECUTOR.getName() + 438 " instead", ex); 439 } 440 return executor; 441 } 442 443 /** 444 Validate that all required configuration parameters have been specified. 445 @throws InvalidDataException If parameters are missing or have incorrect values 446 */ 447 private void validateConfiguration() 448 throws InvalidDataException 449 { 450 if (login == null) throw new InvalidUseOfNullException("agent.user"); 451 if (password == null) throw new InvalidUseOfNullException("agent.password"); 452 if (externalId == null) throw new InvalidUseOfNullException("agent.id"); 453 } 454 455 /** 456 Start the listener service that listens for control commands such 457 as <code>start</code>, <code>stop</code>, <code>pause</code> and 458 <code>info</code>. The listener service is only required if the job 459 agent needs to respond to remote control commands. 460 <p> 461 Note! The {@link AgentController} which is the default controller for 462 agents always uses the listener service for communicating with the job 463 agent. 464 <p> 465 Note! The listener service is started in a separate thread and this method 466 returns as soon as the network connections are set up. 467 468 @param requestHandler A {@link RequestHandler} that handles the 469 incoming requsts, or null to use the {@link DefaultRequestHandler} 470 @throws IOException If there is an error when starting the service 471 */ 106 472 public synchronized void service(RequestHandler requestHandler) 107 473 throws IOException … … 111 477 { 112 478 this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler; 113 this.server = new JobAgentServerConnection(port, this.requestHandler );479 this.server = new JobAgentServerConnection(port, this.requestHandler, logServer); 114 480 server.open(); 115 481 } 116 482 } 117 483 484 /** 485 Start the job agent. This method will register a {@link JobQueueChecker} 486 object with the BASE core scheduler {@link Application#getScheduler()}. The 487 timer will call the {@link JobQueueChecker#run()} method at intervals 488 specified by the <code>agent.checkinterval</code> configuration settings. 489 <p> 490 Note! The <code>JobQueueChecker</code> will run in a separate thread 491 and this method return immediately after registering the object with the 492 scheduler. 493 <p> 494 Note! This method also creats a single instance of a {@link JobExecutor}. 495 The actual class to use is specified by the <code>agent.jobexecutor.class</code> 496 configuration setting. The default job executor is {@link ProcessJobExecutor}. 497 */ 118 498 public synchronized void start() 119 499 { … … 122 502 { 123 503 isRunning = true; 124 Scheduler scheduler = Application.getScheduler(); 125 jobQueueChecker = scheduler.schedule(new JobQueueChecker(this), checkInterval, checkInterval, false); 126 } 127 } 128 504 jobExecutor = createJobExecutor(); 505 jobQueueChecker = createJobQueueChecker(); 506 } 507 } 508 509 /** 510 Stop the job agent. This method will: 511 512 <ul> 513 <li>Cancel the {@link JobQueueChecker} that was registered with the BASE 514 core scheduler by the {@link #start()} method. 515 <li>Call the {@link JobExecutor#close()} method on the job executor 516 created by the {@link #start()} method. 517 <li>Close the service listener started by the {@link #service(RequestHandler)} 518 method. 519 <li>Try to stop all running jobs by calling {@link Thread#interrupt()} 520 on all job threads. 521 <li>Logout and stop the BASE {@link Application}. 522 </ul> 523 524 Unless no other things are running in the same virtual machine as this 525 job agent the virtual machine should exit as a result from calling this 526 method. 527 */ 129 528 public synchronized void stop() 130 529 { … … 132 531 isRunning = false; 133 532 closeJobQueueChecker(); 533 maybeStopRunningJobs(); 534 closeJobExecutor(); 134 535 closeServer(); 536 if (sc != null) sc.logout(); 135 537 Application.stop(); 136 538 } 137 539 540 /** 541 Pause the job agent. This method will: 542 543 <ul> 544 <li>Cancel the {@link JobQueueChecker} that was registered with the BASE 545 core scheduler by the {@link #start()} method. 546 <li>Call the {@link JobExecutor#close()} method on the job executor 547 created by the {@link #start()} method. 548 <li>Logout and stop the BASE {@link Application}. 549 </ul> 550 551 This method will not try to stop the running jobs or shut down the 552 BASE application. Calling {@link #start()} again will start the job 553 agent again. 554 555 @see #stop() 556 @see #start() 557 */ 138 558 public synchronized void pause() 139 559 { … … 141 561 isRunning = false; 142 562 closeJobQueueChecker(); 143 } 144 563 closeJobExecutor(); 564 if (sc != null) sc.logout(); 565 } 566 567 /** 568 Check if the job agent is running or not. 569 @return TRUE if the job agent is running, FALSE otherwise 570 */ 145 571 public boolean isRunning() 146 572 { … … 149 575 150 576 /** 577 Get a set containing the ID:s of the jobs that are currently 578 beeing executed by this job agent. 579 @return A set of integers 580 */ 581 public Set<Integer> getRunningJobs() 582 { 583 return Collections.unmodifiableSet(activeJobs); 584 } 585 586 /** 151 587 Check if the computer specified by the given address is allowed to 152 control this job agent. 588 control this job agent. A computer is allowed to control this 589 job agent if it's name or ip-address is listed in the 590 <code>agent.remotecontrol</code> property. This method is called from 591 the {@link DefaultRequestHandler#handleCmd(Socket, String)} method 592 to determine if a service request should be accepted or not. 593 <p> 594 Note! The <code>stop</code>, <code>start</code> and <code>pause</code> 595 commands are only allowed from the local host unless the <code>agent.allowremote.stop</code>, 596 <code>agent.allowremote.start</code> and <code>agent.allowremote.pause</code> 597 are set to a true values. 598 <p> 599 Note! The local host doesn't have to be listed in the <code>agent.remotecontrol</code> 600 property. Requests are always allowed from the local host. 601 153 602 @param remote The address to the remote computer 603 @param cmd The command the remote computer wants to execute 154 604 @return TRUE if the computer is allowed, FALSE otherwise 155 605 */ 156 public boolean isAllowedControl(InetAddress remote) 157 { 158 return true; // TODO - implement 159 } 160 606 public boolean isAllowedControl(InetAddress remote, String cmd) 607 { 608 boolean allow = false; 609 boolean isLocal = SocketUtil.isLocalHost(remote); 610 if ("stop".equals(cmd) && !allowRemoteStop) 611 { 612 allow = isLocal; 613 } 614 else if ("start".equals(cmd) && !allowRemoteStart) 615 { 616 allow = isLocal; 617 } 618 else if ("pause".equals(cmd) && !allowRemotePause) 619 { 620 allow = isLocal; 621 } 622 else 623 { 624 allow = isLocal || this.remote.contains(remote); 625 } 626 return allow; 627 } 628 629 /** 630 Get a session control with the configured user logged in. 631 @return A session control 632 */ 161 633 public SessionControl getSessionControl() 162 634 { … … 166 638 sc = Application.newSessionControl("net.sf.basedb.clients.jobagent", 167 639 SocketUtil.getLocalHost().toString(), null); 640 } 641 if (!sc.isLoggedIn()) 642 { 168 643 sc.login(login, password, null, false); 169 644 } … … 171 646 } 172 647 648 /** 649 Get a session control where the owner of the job has been impersonated and 650 the active project has been set if needed. 651 @param job The job to get the impersonated session control for 652 @return A session control object 653 */ 654 public SessionControl getImpersonatedSessionControl(Job job) 655 { 656 SessionControl sc = getSessionControl(); 657 SessionControl impersonated = null; 658 DbControl dc = null; 659 try 660 { 661 // Reload job and impersonate the owner 662 dc = sc.newDbControl(); 663 job = Job.getById(dc, job.getId()); 664 impersonated = sc.impersonateLogin(job, "Running job: " + job.getName()); 665 dc.close(); 666 667 // Set the active project if any 668 int projectId = job.getActiveProjectId(); 669 if (projectId != 0) 670 { 671 try 672 { 673 dc = impersonated.newDbControl(); 674 Project activeProject = Project.getById(dc, projectId); 675 impersonated.setActiveProject(activeProject); 676 dc.close(); 677 } 678 catch (Throwable t) 679 { 680 log.error("Exception while setting active project to " + projectId + 681 ". Continuing with no active project.", t); 682 } 683 } 684 } 685 finally 686 { 687 if (dc != null) dc.close(); 688 } 689 return impersonated; 690 } 691 692 /** 693 Get the {@link JobAgent} item corresponding to this agent. The 694 job agent is looked up by the {@link JobAgent#getByExternalId(DbControl, String)} 695 method where the external ID is given by the <code>agent.id</code> property. 696 @param dc The DbControl to use for database acces 697 @return A JobAgent item 698 */ 173 699 public JobAgent getJobAgent(DbControl dc) 174 700 { … … 176 702 } 177 703 178 public Job.ExecutionTime getSlot(Job.ExecutionTime estimated) 179 { 180 return Job.ExecutionTime.SHORT; // TODO - implement 181 } 182 704 /** 705 Find a free slot for executing a job. If there is a free slot 706 in the requested pool the requested slot is returned, otherwise 707 the method checks the slower-running pools for free slots 708 and returns one of those. If there are no free slot available 709 null is returned. 710 <p> 711 Note! This method reserves the slot for the job. It is important that 712 the {@link #jobDone(Job, Job.ExecutionTime)} method is called once 713 the job has completed to return the slot to the pool. Failure to do 714 so may result in that the agent thinks that all slots are 715 used when they are not. 716 717 @param requested The slot the job requested 718 @return The assigned slot or null if no slot is available 719 */ 720 synchronized Job.ExecutionTime getSlot(Job.ExecutionTime requested) 721 { 722 log.debug("Requesting slot for job: " + requested); 723 Job.ExecutionTime slotToUse = null; 724 Job.ExecutionTime[] slots = Job.ExecutionTime.values(); 725 726 // Check all slots from the requested execution time and longer execution times 727 for (int i = requested.ordinal(); i < slots.length; ++i) 728 { 729 if (usedSlots.get(slots[i]) < maxSlots.get(slots[i])) 730 { 731 // This slot has free jobs 732 slotToUse = slots[i]; 733 usedSlots.put(slotToUse, usedSlots.get(slotToUse) + 1); 734 log.debug("Slot: " + slotToUse + "; used: " + usedSlots.get(slotToUse) + "; max: " + maxSlots.get(slotToUse)); 735 break; 736 } 737 } 738 // If null we couldn't find a free slot 739 return slotToUse; 740 } 741 742 /** 743 Start a job. This method will create a new thread for running the job 744 and return immediately. If the job for some reason can't be started, 745 for example, if there are no available slots, a log messsage will be 746 written but nothing else will happen. No exception will be thrown to 747 the caller of this method. 748 749 @param job The job to start 750 */ 751 public void startJob(Job job) 752 { 753 JobRunner runner = new JobRunner(this, job, jobExecutor); 754 Thread t = new Thread(runnersGroup, runner); 755 t.setDaemon(false); 756 t.setPriority(priorities.get(job.getEstimatedExecutionTime())); 757 t.start(); 758 } 759 760 /** 761 Used by {@link JobRunner} to tell that a job has finished executing and 762 that the used slot should be released. 763 764 @param job The job that has finished 765 @param usedSlot The slot that was used 766 */ 767 synchronized void jobDone(Job job, Job.ExecutionTime usedSlot) 768 { 769 usedSlots.put(usedSlot, usedSlots.get(usedSlot) - 1); 770 activeJobs.remove(job.getId()); 771 } 772 773 /** 774 Close the service listener. 775 */ 183 776 private void closeServer() 184 777 { 778 log.info("Closing service listener: " + server); 185 779 if (server != null) 186 780 { … … 190 784 } 191 785 786 /** 787 Create a job queue checker. and register it with the BASE core 788 scheduler. 789 @see Application#getScheduler() 790 @see JobQueueChecker 791 */ 792 private TimerTask createJobQueueChecker() 793 { 794 log.info("Creating job queue checker; checkInterval=" + checkInterval + " s"); 795 Scheduler scheduler = Application.getScheduler(); 796 return scheduler.schedule( 797 new JobQueueChecker(this), 1000*checkInterval, 1000*checkInterval, false); 798 } 799 800 /** 801 Close the job queue checker. 802 */ 192 803 private void closeJobQueueChecker() 193 804 { 805 log.info("Closing job queue checker: " + jobQueueChecker); 194 806 if (jobQueueChecker != null) 195 807 { … … 198 810 } 199 811 } 812 813 /** 814 Create a job executor and initialise it. 815 @return A JobExecutor instance or null if none could be created 816 */ 817 private JobExecutor createJobExecutor() 818 { 819 log.info("Creating job executor: " + executorClass.getName()); 820 JobExecutor executor = null; 821 try 822 { 823 executor = executorClass.newInstance(); 824 executor.init(executorInitParameters); 825 } 826 catch (Throwable t) 827 { 828 log.error("Could not create job executor instance: " + executorClass.getName(), t); 829 executor = null; 830 } 831 return executor; 832 } 833 834 /** 835 Close the job executor. 836 */ 837 private void closeJobExecutor() 838 { 839 log.info("Closing job executor: " + jobExecutor); 840 if (jobExecutor != null) 841 { 842 jobExecutor.close(); 843 jobExecutor = null; 844 } 845 } 846 847 /** 848 Try to stop running jobs by interrupting the threads thaey are executing in. 849 */ 850 private void maybeStopRunningJobs() 851 { 852 log.info("Stopping running jobs. " + activeJobs.size() + " job(s) still active."); 853 // Interrupt all threads. Hopefully they will do as we tell them. 854 runnersGroup.interrupt(); 855 } 856 200 857 201 858 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/AgentController.java
r2634 r2641 31 31 import java.util.Properties; 32 32 33 import net.sf.basedb.core.DbControl; 34 import net.sf.basedb.core.ItemNotFoundException; 33 35 import net.sf.basedb.core.JobAgent; 36 import net.sf.basedb.core.SessionControl; 37 import net.sf.basedb.util.SocketUtil; 34 38 import net.sf.basedb.util.Values; 35 39 import net.sf.basedb.util.jobagent.JobAgentConnection; … … 37 41 38 42 /** 39 This is the co ntroller class for the job agent application. It is responsible for40 starting up and managing a running job agent. The agent itself will be started in41 separate thread.43 This is the command line controller class for the job agent application. It is 44 responsible for starting up and managing a running job agent. The agent itself will 45 be started in separate thread. 42 46 43 47 @author nicklas … … 156 160 } 157 161 158 private final Properties p ;162 private final Properties properties; 159 163 private final int port; 160 164 private final int timeout; … … 167 171 public AgentController(Properties p) 168 172 { 169 this.p = p;173 this.properties = p; 170 174 this.port = Values.getInt(p.getProperty("agent.port"), JobAgent.DEFAULT_PORT); 171 175 this.timeout = Values.getInt(p.getProperty("agent.timeout"), 1000); … … 178 182 179 183 @throws IOException If there is an error 184 @see JobAgentConnection#sendStart() 180 185 */ 181 186 public void startAgent() … … 207 212 // No job agent is running, create a new agent 208 213 log.info("Creating a new job agent on port " + port); 209 Agent agent = createAgent(p );214 Agent agent = createAgent(properties); 210 215 agent.service(null); // Start listening for incoming connections 211 216 } … … 230 235 } 231 236 237 /** 238 Stop a running job agent by sending a stop request to the agents remote control 239 service. The agent may be running in this or in another virtual machine. 240 @throws IOException If there is an error 241 @see JobAgentConnection#sendStop() 242 */ 232 243 public void stopAgent() 233 244 throws IOException … … 253 264 } 254 265 266 /** 267 Pause a running job agent by sending a stop request to the agents remote control 268 service. The agent may be running in this or in another virtual machine. 269 @throws IOException If there is an error 270 @see JobAgentConnection#sendPause() 271 */ 255 272 public void pauseAgent() 256 273 throws IOException … … 276 293 } 277 294 295 /** 296 Get info about running job agent by sending an info request to the agents remote control 297 service. The agent may be running in this or in another virtual machine. 298 @throws IOException If there is an error 299 @return A <code>JobAgentInfo</code> object 300 @see JobAgentConnection#getInfo(boolean) 301 */ 278 302 public JobAgentInfo getInfo() 279 303 throws IOException … … 296 320 public void registerAgent() 297 321 { 298 322 Agent agent = createAgent(properties); 323 log.info("Registering agent '" + agent.getId() + "' with BASE"); 324 SessionControl sc = agent.getSessionControl(); 325 DbControl dc = null; 326 try 327 { 328 dc = sc.newDbControl(); 329 JobAgent jobAgent = null; 330 try 331 { 332 jobAgent = agent.getJobAgent(dc); 333 log.info("Agent with id '" + agent.getId() + "' is already registered"); 334 } 335 catch (ItemNotFoundException ex) 336 { 337 jobAgent = JobAgent.getNew(dc, agent.getId()); 338 jobAgent.setName(agent.getName()); 339 jobAgent.setDescription(agent.getDescription()); 340 jobAgent.setPort(agent.getPort()); 341 jobAgent.setServer(SocketUtil.getPublicLocalHost().getHostName()); 342 dc.saveItem(jobAgent); 343 dc.commit(); 344 } 345 } 346 finally 347 { 348 if (dc != null) dc.close(); 349 } 299 350 } 300 351 301 352 public void unregisterAgent() 302 353 { 303 354 Agent agent = createAgent(properties); 355 SessionControl sc = agent.getSessionControl(); 356 DbControl dc = null; 357 try 358 { 359 dc = sc.newDbControl(); 360 JobAgent jobAgent = null; 361 try 362 { 363 jobAgent = agent.getJobAgent(dc); 364 dc.deleteItem(jobAgent); 365 dc.commit(); 366 } 367 catch (ItemNotFoundException ex) 368 {} 369 } 370 finally 371 { 372 if (dc != null) dc.close(); 373 } 304 374 } 305 375 306 376 private Agent createAgent(Properties p) 307 throws IOException308 377 { 309 378 Agent a = new Agent(p); -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/CmdLine.java
r2634 r2641 28 28 29 29 /** 30 Utility class for parsing command line arguments. It supports a very limitied 31 syntax: [options] [cmd] 32 <p> 33 Options starts with hyphen (-) and may have a value following it. The last 34 parameter is the command unless it starts with a hyphen. 35 <p> 36 Examples: 37 <pre class="code"> 38 ./jobagent.sh start 39 ./jobagent.sh -c agent.properties stop 40 </pre> 30 41 31 42 @author nicklas … … 38 49 private final String cmd; 39 50 51 /** 52 Create a new object for parsing the command line. 53 @param args The command line arguments sent to the <code>main()</code> method 54 */ 40 55 public CmdLine(String[] args) 41 56 { … … 71 86 } 72 87 73 88 /** 89 Get the command parameter 90 @return The command parameter, or null 91 */ 74 92 public String getCmd() 75 93 { … … 77 95 } 78 96 97 /** 98 Get the value for the specified option 99 @param option The option to get the value for 100 @return The value or null 101 */ 79 102 public String getOption(String option) 80 103 { … … 82 105 } 83 106 107 /** 108 Get the value for an option. 109 @param option The option to get the value for 110 @param defaultValue A default value if the option wasn't specified 111 @return The options value or the default value 112 */ 84 113 public String getOption(String option, String defaultValue) 85 114 { … … 87 116 } 88 117 118 /** 119 Check if an option was specified or not. 120 @param option The option to check 121 @return TRUE if the option was specified, FALSE otherwise 122 */ 123 public boolean hasOption(String option) 124 { 125 return options.containsKey(option); 126 } 127 89 128 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobQueueChecker.java
r2634 r2641 34 34 35 35 /** 36 This class is given the responsibility to check the job queue for 37 jobs that are awaiting execution. Each agent has one instance of this 38 class which is registered with the BASE core scheduler {@link Application#getScheduler()}. 39 <p> 40 This object should be thread-safe since the scheduler creates a new thread each 41 time the {@link #run()} method is called. 36 42 37 43 @author nicklas … … 63 69 { 64 70 log.info("Checking for jobs to execute"); 65 66 SessionControl sc = agent.getSessionControl(); 71 Job job = findJob(); 72 if (job != null) 73 { 74 agent.startJob(job); 75 } 76 } 77 public boolean cancel() 78 { 79 log.info("Cancelling job queue checker"); 80 return super.cancel(); 81 } 82 // ------------------------------------------- 83 84 private Job findJob() 85 { 67 86 DbControl dc = null; 87 Job job = null; 68 88 try 69 89 { 90 SessionControl sc = agent.getSessionControl(); 70 91 dc = sc.newDbControl(); 71 92 JobAgent jobAgent = agent.getJobAgent(dc); … … 81 102 else 82 103 { 83 Job job = jobs.get(0); 84 85 // Find a free slot to execute the job 86 Job.ExecutionTime estimated = job.getEstimatedExecutionTime(); 87 Job.ExecutionTime slotToUse = agent.getSlot(estimated); 88 if (slotToUse == null) 89 { 90 log.info("Couldn't find a free slot for executing job: " + job); 91 } 92 else 93 { 94 log.info("Starting job " + job + " in slot " + slotToUse); 95 // log.debug("Slot: " + slotToUse + "; used: " + usedSlots.get(slotToUse) + "; max: " + maxSlots.get(slotToUse)); 96 97 // TODO - start process 98 } 104 job = jobs.get(0); 99 105 } 106 } 107 catch (Throwable t) 108 { 109 log.error(t.getMessage(), t); 100 110 } 101 111 finally … … 103 113 if (dc != null) dc.close(); 104 114 } 105 115 return job; 106 116 } 107 117 108 public boolean cancel() 109 { 110 log.info("Cancelling job queue checker"); 111 return super.cancel(); 112 } 113 // ------------------------------------------- 114 118 115 119 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/DefaultRequestHandler.java
r2634 r2641 67 67 registerHandler(new InfoRequestHandler(agent), "info", "status"); 68 68 registerHandler(new StartRequestHandler(agent), "start"); 69 registerHandler(new StopRequestHandler(agent , false), "stop");69 registerHandler(new StopRequestHandler(agent), "stop"); 70 70 registerHandler(new PauseRequestHandler(agent), "pause"); 71 71 } … … 89 89 String answer = null; 90 90 RequestHandler handler = commandHandlers.get(cmd); 91 if (!agent.isAllowedControl(remote ))91 if (!agent.isAllowedControl(remote, cmd)) 92 92 { 93 93 answer = "FAILED Permission denied: cmd=" + cmd + "; host=" + remote.toString(); -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/InfoRequestHandler.java
r2634 r2641 62 62 long totalMemory = runtime.maxMemory(); 63 63 long usedMemory = runtime.totalMemory() - runtime.freeMemory(); 64 JobAgentInfo info = new JobAgentInfo(!agent.isRunning(), cpu, totalMemory, usedMemory, null);64 JobAgentInfo info = new JobAgentInfo(!agent.isRunning(), cpu, totalMemory, usedMemory, agent.getRunningJobs()); 65 65 answer = "OK\n"+info.toString(); 66 66 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/PauseRequestHandler.java
r2634 r2641 30 30 31 31 /** 32 This is a request handler for the <code>pause</code> command. It stops the33 job agent .32 This is a request handler for the <code>pause</code> command. It pauses the 33 job agent by calling the {@link Agent#pause()} method. 34 34 35 35 @author nicklas -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/StartRequestHandler.java
r2634 r2641 31 31 /** 32 32 This is a request handler for the <code>start</code> command. It starts the 33 job agent .33 job agent by calling the {@link Agent#start()} method. 34 34 35 35 @author nicklas -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/handlers/StopRequestHandler.java
r2634 r2641 24 24 package net.sf.basedb.clients.jobagent.handlers; 25 25 26 import java.net.InetAddress;27 26 import java.net.Socket; 28 27 29 28 import net.sf.basedb.clients.jobagent.Agent; 30 import net.sf.basedb.util.SocketUtil;31 29 import net.sf.basedb.util.jobagent.RequestHandler; 32 30 33 31 /** 34 This is a request handler for the <code>stop</code> command. It shuts dow the 35 job agent, and closes the {@link JobAgentServerConnection} listening for 36 incoming connections. Thus, it is not possible to start the job agent again 37 except from the command line. 32 This is a request handler for the <code>stop</code> command. It shuts down the 33 job agent by calling the {@link Agent#stop()} method. 38 34 39 35 @author nicklas … … 46 42 47 43 private final Agent agent; 48 private final boolean allowRemote;49 44 50 45 /** 51 46 Create a new stop request handler. 52 47 @param agent The agent 53 @param allowRemote TRUE to allow remote stop request, FALSE54 to only allow stop requests from the local host55 48 */ 56 public StopRequestHandler(Agent agent , boolean allowRemote)49 public StopRequestHandler(Agent agent) 57 50 { 58 51 this.agent = agent; 59 this.allowRemote = allowRemote;60 52 } 61 53 … … 69 61 if ("stop".equals(cmd)) 70 62 { 71 InetAddress remote = incoming.getInetAddress(); 72 if (!allowRemote && !SocketUtil.isLocalHost(remote)) 73 { 74 answer = "FAILED Permission denied: cmd=" + cmd + "; host=" + remote.toString(); 75 } 76 else 77 { 78 agent.stop(); 79 } 63 agent.stop(); 80 64 } 81 65 else -
trunk/src/core/net/sf/basedb/core/JobAgent.java
r2634 r2641 68 68 /** 69 69 The default port (47822) where job agents are listening for 70 connections. If 87422 is converted to a hexadecimal number it70 connections. If 47822 is converted to a hexadecimal number it 71 71 becomes BACE! 72 72 */ … … 342 342 343 343 // Create restrictions for owner of the job 344 query.include(Include.MINE, Include.OTHERS); 344 345 Restriction userRestriction = Restrictions.in( 345 346 Hql.property("owner"), -
trunk/src/core/net/sf/basedb/core/SessionControl.java
r2343 r2641 25 25 26 26 27 import net.sf.basedb.core.data.OwnableData; 27 28 import net.sf.basedb.core.data.UserData; 28 29 import net.sf.basedb.core.data.PasswordData; … … 535 536 if (session != null) HibernateUtil.close(session); 536 537 } 538 } 539 540 /** 541 Log in as the owner of the specified item. 542 @see #impersonateLogin(int, String) 543 */ 544 public SessionControl impersonateLogin(Ownable item, String comment) 545 { 546 UserData owner = ((OwnableData)((BasicItem)item).getData()).getOwner(); 547 return impersonateLogin(owner.getId(), comment); 537 548 } 538 549 -
trunk/src/core/net/sf/basedb/util/SocketUtil.java
r2634 r2641 34 34 import java.net.ServerSocket; 35 35 import java.net.Socket; 36 import java.net.UnknownHostException; 36 37 import java.nio.channels.ServerSocketChannel; 37 38 … … 210 211 211 212 /** 212 Get the address of the local host.213 Get the local address of the local host. 213 214 */ 214 215 public static InetAddress getLocalHost() … … 217 218 } 218 219 220 /** 221 Get the external address of the local host. 222 */ 223 public static InetAddress getPublicLocalHost() 224 { 225 String hostName = LOCAL_HOST.getCanonicalHostName(); 226 InetAddress adress = LOCAL_HOST; 227 try 228 { 229 adress = InetAddress.getByName(hostName); 230 } 231 catch (UnknownHostException ex) 232 { 233 // TODO - how to handle? 234 ex.printStackTrace(); 235 } 236 return adress; 237 } 238 239 219 240 } -
trunk/src/core/net/sf/basedb/util/jobagent/JobAgentServerConnection.java
r2634 r2641 30 30 import java.nio.channels.ServerSocketChannel; 31 31 import java.nio.channels.SocketChannel; 32 33 import org.apache.log4j.Logger; 32 34 33 35 import net.sf.basedb.util.SocketUtil; … … 45 47 private final int port; 46 48 private final RequestHandler requestHandler; 49 private final Logger logger; 47 50 private Thread listener; 48 51 … … 55 58 The handler must be thread-safe and able to handle multiple 56 59 requests at the same time 57 */ 58 public JobAgentServerConnection(int port, RequestHandler requestHandler) 60 @param logger A logger object for logging debug and other information 61 or null if no logging is wanted 62 */ 63 public JobAgentServerConnection(int port, RequestHandler requestHandler, Logger logger) 59 64 { 60 65 this.port = port; 61 66 this.requestHandler = requestHandler; 67 this.logger = logger; 62 68 } 63 69 … … 73 79 { 74 80 if (listener != null) return; 81 if (logger != null) logger.info("Opening listener on port " + port); 75 82 76 83 ServerSocketChannel channel = ServerSocketChannel.open(); 77 84 channel.socket().bind(new InetSocketAddress(port)); 78 85 79 listener = new Thread(new ListenerThread(channel, requestHandler ),80 "ListenerThread :"+this.toString());86 listener = new Thread(new ListenerThread(channel, requestHandler, logger), 87 "ListenerThread."+this.toString()); 81 88 listener.start(); 89 if (logger != null) logger.info("Now listening on port " + port); 82 90 } 83 91 … … 89 97 { 90 98 if (listener == null) return; 99 if (logger != null) logger.info("Stopping listener on port " + port); 91 100 listener.interrupt(); 92 101 listener = null; … … 113 122 private final ServerSocketChannel socket; 114 123 private final RequestHandler requestHandler; 115 116 private ListenerThread(ServerSocketChannel socket, RequestHandler requestHandler) 124 private final Logger logger; 125 126 private ListenerThread(ServerSocketChannel socket, RequestHandler requestHandler, Logger logger) 117 127 { 118 128 this.socket = socket; 119 129 this.requestHandler = requestHandler; 130 this.logger = logger; 120 131 } 121 132 … … 133 144 { 134 145 SocketChannel incoming = socket.accept(); 135 Thread request = new Thread(new RequestHandlerThread(incoming.socket(), requestHandler), "RequestHandlerThread: " + this.toString()); 146 if (logger != null && logger.isDebugEnabled()) 147 { 148 logger.debug("Accepted incoming connection from: " + 149 incoming.socket().getInetAddress().toString()); 150 } 151 Thread request = new Thread(new RequestHandlerThread(incoming.socket(), 152 requestHandler, logger), "RequestHandlerThread." + this.toString()); 136 153 request.start(); 137 154 interrupted = Thread.interrupted(); … … 139 156 catch (ClosedByInterruptException ex) 140 157 { 158 if (logger != null) logger.info("Listener service was interrupted by another thread"); 141 159 interrupted = true; 142 160 } 143 161 catch (Throwable t) 144 162 { 145 // TODO - better logging 146 t.printStackTrace(); 147 return; 163 if (logger != null) logger.error(t.getMessage(), t); 164 interrupted = true; 148 165 } 149 166 } 167 if (logger != null) logger.info("Closing socket on port " + socket.socket().getLocalPort()); 150 168 SocketUtil.close(socket); 151 169 } … … 163 181 private final Socket incoming; 164 182 private final RequestHandler requestHandler; 165 166 private RequestHandlerThread(Socket incoming, RequestHandler requestHandler) 183 private final Logger logger; 184 185 private RequestHandlerThread(Socket incoming, RequestHandler requestHandler, Logger logger) 167 186 { 168 187 this.incoming = incoming; 169 188 this.requestHandler = requestHandler; 189 this.logger = logger; 170 190 } 171 191 … … 187 207 { 188 208 answer = "FAILED " + t.getClass().getName() + ": " + t.getMessage(); 209 if (logger != null) logger.error("Eception in request handler "+ requestHandler + 210 ": " + t.getMessage(), t); 189 211 } 190 212 SocketUtil.send(incoming, answer, true); … … 193 215 catch (Throwable t) 194 216 { 195 // TODO - better logging 196 t.printStackTrace(); 217 if (logger != null) logger.error(t.getMessage(), t); 197 218 } 198 219 } -
trunk/src/test/TestJobAgent.java
r2634 r2641 26 26 import java.net.Socket; 27 27 import java.util.Date; 28 import java.util.Properties; 28 29 29 30 import net.sf.basedb.clients.jobagent.Agent; … … 66 67 write_header(); 67 68 // Standard tests: create, load, list 68 create_fake_jobagent(8888 );69 create_fake_jobagent(8889 );69 create_fake_jobagent(8888, "net.sf.baseb.clients.agent.test.1"); 70 create_fake_jobagent(8889, "net.sf.baseb.clients.agent.test.2"); 70 71 71 72 int id = test_create("net.sf.baseb.clients.agent.test.1", 8888, true); … … 409 410 } 410 411 411 static void create_fake_jobagent(final int port )412 static void create_fake_jobagent(final int port, String externalId) 412 413 { 413 414 try 414 415 { 415 416 // The request handler for incoming requests 416 Agent agent = new Agent(port); 417 Properties p = new Properties(); 418 p.setProperty("agent.port", Integer.toString(port)); 419 p.setProperty("agent.user", TestUtil.getLogin()); 420 p.setProperty("agent.password", TestUtil.getPassword()); 421 p.setProperty("agent.id", externalId); 422 Agent agent = new Agent(p); 417 423 RequestHandler requestHandler = new DefaultRequestHandler(agent) 418 424 {
Note: See TracChangeset
for help on using the changeset viewer.