Changeset 2643 for trunk/src/clients/jobagent
- Timestamp:
- Sep 15, 2006, 4:08:12 PM (16 years ago)
- Location:
- trunk/src/clients/jobagent
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/clients/jobagent/jobagent.properties.in
r2641 r2643 62 62 # ======================= 63 63 64 # #The name of the executor class that is responsible for starting the job64 # The name of the executor class that is responsible for starting the job 65 65 ## The default is ProcessJobExecutor which starts job in a separate process 66 66 ## The class must implement the net.sf.basedb.clients.JobExecutor interface 67 68 ## Executor that executes the jobs in a separate process (recommended) 67 69 agent.executor.class=net.sf.basedb.clients.jobagent.executors.ProcessJobExecutor 70 71 ## Executor that executes jobs in the same process as the job agent (not recommended) 72 # agent.executor.class=net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor 73 74 ## Executor useful for debugging purposes 75 # agent.executor.class=net.sf.basedb.clients.jobagent.executors.DummyJobExecutor 68 76 69 77 ## Initialisation parameters to the job exector. This string will be passed -
trunk/src/clients/jobagent/jobagent.sh
r2634 r2643 30 30 31 31 # Execute JobAgent 32 java -server -Xmx 40M -Xms40M -cp $CP net.sf.basedb.clients.jobagent.AgentController $* &32 java -server -Xmx50M -Xms50M -cp $CP net.sf.basedb.clients.jobagent.AgentController $* & -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java
r2641 r2643 291 291 public Agent(Properties properties) 292 292 { 293 if (log.isDebugEnabled()) 294 { 295 log.debug("Creating new agent"); 296 for (Map.Entry<Object, Object> entry : properties.entrySet()) 297 { 298 Object value = entry.getValue(); 299 Object key = entry.getKey(); 300 if ("agent.password".equals(key)) 301 { 302 value = "******************"; 303 } 304 log.debug("Property [" + key + " = " + value + "]"); 305 } 306 } 307 293 308 // BASE settings 294 309 this.login = properties.getProperty("agent.user"); … … 504 519 jobExecutor = createJobExecutor(); 505 520 jobQueueChecker = createJobQueueChecker(); 521 522 // Start the BASE application in a new thread so we can return quickly 523 Thread t = new Thread( 524 new Runnable() 525 { 526 public void run() 527 { 528 SessionControl sc = null; 529 try 530 { 531 sc = getSessionControl(); 532 } 533 catch (Throwable t) 534 {} 535 if (sc == null || !sc.isLoggedIn()) 536 { 537 stop(); 538 } 539 } 540 } 541 ); 542 t.start(); 506 543 } 507 544 } … … 633 670 public SessionControl getSessionControl() 634 671 { 635 if (sc == null) 636 { 637 Application.start(false); 638 sc = Application.newSessionControl("net.sf.basedb.clients.jobagent", 639 SocketUtil.getLocalHost().toString(), null); 640 } 641 if (!sc.isLoggedIn()) 642 { 643 sc.login(login, password, null, false); 672 try 673 { 674 if (sc == null) 675 { 676 log.info("Starting BASE application"); 677 Application.start(false); 678 sc = Application.newSessionControl("net.sf.basedb.clients.jobagent", 679 SocketUtil.getLocalHost().toString(), null); 680 } 681 if (!sc.isLoggedIn()) 682 { 683 log.info("Logging in as user: " + login); 684 sc.login(login, password, "Job agent running on host " + getServerName(), false); 685 } 686 } 687 catch (Throwable t) 688 { 689 log.error(t.getMessage(), t); 644 690 } 645 691 return sc; -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobExecutor.java
r2641 r2643 62 62 to the job executor to decide how to handle this. It may either decide 63 63 to kill the job or let it continue as if nothing happened. 64 <p> 65 The job sent to this method has it's status set to {@link Job.Status#PREPARED}. 66 The implementation of this method must change the status to either 67 {@link Job.Status#DONE} or {@link Job.Status#ERROR}. If the status hasn't changed 68 the job agent will set the status to signal an unknown error. 64 69 65 70 @param sc A <code>SessionControl</code> where the owner of the job is -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobRunner.java
r2641 r2643 24 24 package net.sf.basedb.clients.jobagent; 25 25 26 import java.util.Arrays; 27 28 import net.sf.basedb.core.DbControl; 29 import net.sf.basedb.core.ItemModifiedException; 26 30 import net.sf.basedb.core.Job; 31 import net.sf.basedb.core.SessionControl; 27 32 28 33 /** … … 45 50 */ 46 51 private static final org.apache.log4j.Logger log = 47 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent. Agent");52 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.JobRunner"); 48 53 49 54 private final Job job; … … 80 85 return; 81 86 } 87 DbControl dc = null; 88 SessionControl sc = null; 82 89 try 83 90 { 84 91 log.info("Executing job " + job + " using executor " + jobExecutor); 85 jobExecutor.executeJob(agent.getImpersonatedSessionControl(job), agent, job, slotToUse); 92 93 // Log in as the owner of the job 94 sc = agent.getImpersonatedSessionControl(job); 95 dc = sc.newDbControl(); 96 97 // Load a fresh copy of the job 98 Job j = Job.getById(dc, job.getId()); 99 100 // Status may have changed since the queue was checked 101 if (j.getStatus() != Job.Status.WAITING) return; 102 103 // Change the status to Job.Status.PREPARED 104 j.setPrepared(agent.getServerName()); 105 try 106 { 107 dc.commit(); 108 } 109 catch (ItemModifiedException ex) 110 { 111 // Another thread or agent has started the job already, return 112 // without starting the job 113 return; 114 } 115 116 try 117 { 118 jobExecutor.executeJob(sc, agent, j, slotToUse); 119 } 120 catch (Throwable t) 121 { 122 // Something went wrong, update status to ERROR 123 dc = sc.newDbControl(); 124 j = Job.getById(dc, j.getId()); 125 j.doneError(t.getMessage(), Arrays.asList(t)); 126 dc.commit(); 127 throw t; 128 } 129 130 // Check that that the status has changed from prepared 131 dc = sc.newDbControl(); 132 j = Job.getById(dc, j.getId()); 133 if (j.getStatus() == Job.Status.PREPARED) 134 { 135 j.doneError("Unknown error, job was never executed."); 136 log.error("Unknown error: Job " + job + " was never executed by executor " + jobExecutor); 137 } 138 else 139 { 140 log.info("Done executing job: " + job + " with executor " + jobExecutor); 141 } 142 dc.commit(); 86 143 } 87 144 catch (Throwable t) … … 92 149 { 93 150 agent.jobDone(job, slotToUse); 151 if (dc != null) dc.close(); 152 if (sc != null && sc.isLoggedIn()) 153 { 154 try 155 { 156 sc.logout(); 157 } 158 catch (Throwable t) 159 { 160 log.error(t.getMessage(), t); 161 } 162 } 94 163 } 95 164 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/DummyJobExecutor.java
r2641 r2643 33 33 This is dummy job executor implementation which just sets the status 34 34 of each job to {@link net.sf.basedb.core.Job.Status#DONE} without actually 35 executin the job. This class is usful for debugginthe job agent application.35 executing the job. This class is usful for debugging the job agent application. 36 36 37 37 @author nicklas … … 71 71 job.doneOk("Not really, but used for testing job agent"); 72 72 dc.commit(); 73 log.info("Done executing: " + job); 73 74 } 74 75 finally -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ProcessJobExecutor.java
r2641 r2643 24 24 package net.sf.basedb.clients.jobagent.executors; 25 25 26 import java.io.BufferedReader; 27 import java.io.IOException; 28 import java.io.InputStream; 29 import java.io.InputStreamReader; 30 import java.io.OutputStream; 31 import java.io.PrintWriter; 32 import java.io.Reader; 33 import java.io.StringWriter; 34 import java.io.Writer; 35 import java.util.ArrayList; 36 import java.util.Arrays; 37 import java.util.List; 38 26 39 import net.sf.basedb.clients.jobagent.Agent; 27 40 import net.sf.basedb.clients.jobagent.JobExecutor; 41 import net.sf.basedb.core.DbControl; 28 42 import net.sf.basedb.core.Job; 29 43 import net.sf.basedb.core.SessionControl; … … 70 84 { 71 85 log.info("Executing job: " + job); 72 // TODO - start new process for executing the job 86 String classPath = System.getProperty("java.class.path"); 87 88 List<String> cmd = new ArrayList<String>(10); 89 cmd.add("java"); 90 cmd.add("-server"); 91 cmd.add("-cp"); 92 cmd.add(classPath); 93 cmd.add("net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor"); 94 cmd.add(Integer.toString(job.getId())); 95 96 log.debug("Using class path: " + classPath); 97 ProcessBuilder builder = new ProcessBuilder(cmd); 98 builder.redirectErrorStream(true); 99 100 Process process = null; 101 DbControl dc = null; 102 try 103 { 104 try 105 { 106 process = builder.start(); 107 } 108 catch (Throwable t) 109 { 110 log.error("Error executing job: " + job, t); 111 dc = sc.newDbControl(); 112 job = Job.getById(dc, job.getId()); 113 job.doneError(t.getMessage(), Arrays.asList(t)); 114 dc.commit(); 115 return; 116 } 117 118 // Set up threads for reading output 119 StringWriter result = new StringWriter(); 120 Thread t = new Thread(new StreamRedirector( 121 new InputStreamReader(process.getInputStream()), result)); 122 t.start(); 123 124 try 125 { 126 log.info("Waiting for process to end"); 127 int exitCode = process.waitFor(); 128 if (exitCode != 0) 129 { 130 log.error("Process ended with exit code: " + exitCode); 131 log.error("Process output is: " + result.toString()); 132 dc = sc.newDbControl(); 133 job = Job.getById(dc, job.getId()); 134 job.doneError("Job ended with exit code: " + exitCode, 135 Arrays.asList(new Exception(result.toString()))); 136 dc.commit(); 137 } 138 else 139 { 140 log.info("Process ended"); 141 } 142 } 143 catch (InterruptedException ex) 144 { 145 log.info("Job was interrupted: " + job, ex); 146 // Kill the process 147 process.destroy(); 148 dc = sc.newDbControl(); 149 job = Job.getById(dc, job.getId()); 150 job.doneError(ex.getMessage(), Arrays.asList(ex)); 151 dc.commit(); 152 } 153 } 154 finally 155 { 156 if (dc != null) dc.close(); 157 } 158 73 159 } 74 160 … … 76 162 {} 77 163 // ------------------------------------------- 164 165 166 public static class StreamRedirector 167 implements Runnable 168 { 169 170 private final Reader in; 171 private final Writer out; 172 public StreamRedirector(Reader in, Writer out) 173 { 174 this.in = in; 175 this.out = out; 176 } 177 178 public void run() 179 { 180 BufferedReader reader = new BufferedReader(in); 181 PrintWriter writer = new PrintWriter(out); 182 String line = null; 183 try 184 { 185 while ((line = reader.readLine()) != null) 186 { 187 writer.println(line); 188 } 189 } 190 catch (IOException ex) 191 {} 192 } 193 } 194 78 195 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ThreadJobExecutor.java
r2641 r2643 24 24 package net.sf.basedb.clients.jobagent.executors; 25 25 26 import java.util.Arrays; 27 import java.util.List; 28 26 29 import net.sf.basedb.clients.jobagent.Agent; 27 30 import net.sf.basedb.clients.jobagent.JobExecutor; 31 import net.sf.basedb.core.DbControl; 28 32 import net.sf.basedb.core.Job; 33 import net.sf.basedb.core.PluginExecutionRequest; 34 import net.sf.basedb.core.PluginResponse; 29 35 import net.sf.basedb.core.SessionControl; 36 import net.sf.basedb.core.plugin.Response; 30 37 31 38 /** … … 50 57 */ 51 58 private static final org.apache.log4j.Logger log = 52 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.executors. ProcessJobExecutor");59 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor"); 53 60 54 61 … … 66 73 { 67 74 log.info("Executing job: " + job); 68 // TODO - execute job 75 DbControl dc = null; 76 try 77 { 78 dc = sc.newDbControl(); 79 80 // Load a fresh copy of the job item 81 job = Job.getById(dc, job.getId()); 82 PluginExecutionRequest exec = null; 83 84 try 85 { 86 exec = job.execute(null, agent.getServerName()); 87 } 88 catch (Throwable t) 89 { 90 // The job could not be started, set error status and send message to owner 91 job.doneError(t.getMessage(), Arrays.asList(t)); 92 log.error("Error executing job: " + job, t); 93 } 94 dc.commit(); 95 96 PluginResponse response = exec.invoke(); 97 if (response.getStatus() == Response.Status.ERROR) 98 { 99 // There was an error, job status and message is already sent to the user 100 log.error("Error executing job: " + job + "; " + response.getMessage()); 101 List<? extends Throwable> errors = response.getErrorList(); 102 if (errors != null) 103 { 104 for (Throwable t : errors) 105 { 106 log.error(t.getMessage(), t); 107 } 108 } 109 } 110 else 111 { 112 log.info("Done executing: " + job); 113 } 114 } 115 finally 116 { 117 if (dc != null) dc.close(); 118 } 69 119 } 70 120 … … 72 122 {} 73 123 // ------------------------------------------- 124 125 public static void main(String[] args) 126 { 127 System.out.println("Running job with ID: " + args[0]); 128 System.exit(11); 129 } 130 74 131 }
Note: See TracChangeset
for help on using the changeset viewer.