Changeset 2634 for trunk/src/clients/jobagent
- Timestamp:
- Sep 12, 2006, 2:24:28 PM (16 years ago)
- Location:
- trunk/src/clients/jobagent
- Files:
-
- 10 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java
r2632 r2634 22 22 Boston, MA 02111-1307, USA. 23 23 */ 24 package sf.basedb.clients.jobagent; 25 24 package net.sf.basedb.clients.jobagent; 25 26 import java.io.IOException; 27 import java.net.InetAddress; 26 28 import java.util.Properties; 27 28 import net.sf.basedb.util.jobagent.JobAgentInfo; 29 import java.util.TimerTask; 30 31 import net.sf.basedb.clients.jobagent.handlers.DefaultRequestHandler; 32 import net.sf.basedb.core.Application; 33 import net.sf.basedb.core.DbControl; 34 import net.sf.basedb.core.Job; 35 import net.sf.basedb.core.JobAgent; 36 import net.sf.basedb.core.SessionControl; 37 38 import net.sf.basedb.util.SocketUtil; 39 import net.sf.basedb.util.Values; 29 40 import net.sf.basedb.util.jobagent.JobAgentServerConnection; 30 41 import net.sf.basedb.util.jobagent.RequestHandler; 42 import net.sf.basedb.util.timer.Scheduler; 31 43 32 44 /** … … 39 51 public class Agent 40 52 { 53 54 /** 55 Log job agent events. 56 */ 57 private static final org.apache.log4j.Logger log = 58 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.Agent"); 59 41 60 private final String login; 42 61 private final String password; … … 45 64 private final String description; 46 65 private final Integer port; 47 private final JobAgentServerConnection server; 48 49 Agent(Properties properties) 66 67 68 private JobAgentServerConnection server; 69 private RequestHandler requestHandler; 70 71 private TimerTask jobQueueChecker; 72 private long checkInterval = 10000; 73 74 75 private boolean isRunning; 76 77 78 private SessionControl sc; 79 80 81 public Agent(int port) 82 { 83 this.port = port; 84 this.login = null; 85 this.password = null; 86 this.externalId = null; 87 this.name = null; 88 this.description = null; 89 } 90 91 public Agent(Properties properties) 50 92 { 51 93 this.login = properties.getProperty("agent.user"); … … 54 96 this.name = properties.getProperty("agent.name"); 55 97 this.description = properties.getProperty("agent.description"); 56 this.port = Integer.parseInt(properties.getProperty("agent.port"));98 this.port = Values.getInt(properties.getProperty("agent.port"), JobAgent.DEFAULT_PORT); 57 99 validate(); 58 this.server = new JobAgentServerConnection(port, new RequestHandlerImpl());100 isRunning = false; 59 101 } 60 102 … … 62 104 {} 63 105 64 public void start() 65 { 66 67 } 68 69 public void stop() 70 { 71 72 } 73 74 public void pause() 75 { 76 77 } 78 79 private class RequestHandlerImpl 80 implements RequestHandler 81 { 82 public String handleCmd(String cmd) 83 { 84 String answer = null; 85 if ("info".equals(cmd)) 86 { 87 Runtime runtime = Runtime.getRuntime(); 88 int cpu = (int)(Math.random()*100); 89 long totalMemory = runtime.maxMemory(); 90 long usedMemory = runtime.totalMemory() - runtime.freeMemory(); 91 JobAgentInfo info = new JobAgentInfo(cpu, totalMemory, usedMemory, null); 92 answer = info.toString(); 93 } 94 return answer; 106 public synchronized void service(RequestHandler requestHandler) 107 throws IOException 108 { 109 log.info("Starting listener service for job agent on port " + port); 110 if (server == null) 111 { 112 this.requestHandler = requestHandler == null ? new DefaultRequestHandler(this) : requestHandler; 113 this.server = new JobAgentServerConnection(port, this.requestHandler); 114 server.open(); 115 } 116 } 117 118 public synchronized void start() 119 { 120 log.info("Starting job agent on port " + port); 121 if (!isRunning) 122 { 123 isRunning = true; 124 Scheduler scheduler = Application.getScheduler(); 125 jobQueueChecker = scheduler.schedule(new JobQueueChecker(this), checkInterval, checkInterval, false); 126 } 127 } 128 129 public synchronized void stop() 130 { 131 log.info("Stopping job agent on port " + port); 132 isRunning = false; 133 closeJobQueueChecker(); 134 closeServer(); 135 Application.stop(); 136 } 137 138 public synchronized void pause() 139 { 140 log.info("Pausing job agent on port " + port); 141 isRunning = false; 142 closeJobQueueChecker(); 143 } 144 145 public boolean isRunning() 146 { 147 return isRunning; 148 } 149 150 /** 151 Check if the computer specified by the given address is allowed to 152 control this job agent. 153 @param remote The address to the remote computer 154 @return TRUE if the computer is allowed, FALSE otherwise 155 */ 156 public boolean isAllowedControl(InetAddress remote) 157 { 158 return true; // TODO - implement 159 } 160 161 public SessionControl getSessionControl() 162 { 163 if (sc == null) 164 { 165 Application.start(false); 166 sc = Application.newSessionControl("net.sf.basedb.clients.jobagent", 167 SocketUtil.getLocalHost().toString(), null); 168 sc.login(login, password, null, false); 169 } 170 return sc; 171 } 172 173 public JobAgent getJobAgent(DbControl dc) 174 { 175 return JobAgent.getByExternalId(dc, externalId); 176 } 177 178 public Job.ExecutionTime getSlot(Job.ExecutionTime estimated) 179 { 180 return Job.ExecutionTime.SHORT; // TODO - implement 181 } 182 183 private void closeServer() 184 { 185 if (server != null) 186 { 187 server.close(); 188 server = null; 189 } 190 } 191 192 private void closeJobQueueChecker() 193 { 194 if (jobQueueChecker != null) 195 { 196 jobQueueChecker.cancel(); 197 jobQueueChecker = null; 95 198 } 96 199 } -
trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/AgentController.java
r2632 r2634 22 22 Boston, MA 02111-1307, USA. 23 23 */ 24 package sf.basedb.clients.jobagent;24 package net.sf.basedb.clients.jobagent; 25 25 26 26 import java.io.File; 27 27 import java.io.FileInputStream; 28 28 import java.io.IOException; 29 import java.net.ConnectException; 30 import java.net.SocketTimeoutException; 29 31 import java.util.Properties; 30 32 33 import net.sf.basedb.core.JobAgent; 34 import net.sf.basedb.util.Values; 31 35 import net.sf.basedb.util.jobagent.JobAgentConnection; 36 import net.sf.basedb.util.jobagent.JobAgentInfo; 32 37 33 38 /** 34 This is the main class for the job agent application. It is responsible for 35 starting up and managing the running job agent. 39 This is the controller class for the job agent application. It is responsible for 40 starting up and managing a running job agent. The agent itself will be started in 41 separate thread. 36 42 37 43 @author nicklas … … 43 49 44 50 /** 45 @param args 46 */ 51 Log job agent events. 52 */ 53 private static final org.apache.log4j.Logger log = 54 org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.AgentController"); 55 47 56 public static void main(String[] args) 48 57 { 49 Properties p = new Properties(); 50 try 51 { 52 p.load(new FileInputStream(new File("jobagent.properties"))); 53 } 54 catch (IOException ex) 55 {} 56 String cmd = "start"; 57 58 AgentController ac = new AgentController(p); 59 if ("start".equals(cmd)) 60 { 61 ac.startAgent(); 62 } 63 else if ("stop".equals(cmd)) 64 { 65 ac.stopAgent(); 66 } 67 else if ("register".equals(cmd)) 68 { 69 ac.registerAgent(); 70 } 71 else if ("unregister".equals(cmd)) 72 { 73 ac.unregisterAgent(); 74 } 75 else if ("pause".equals(cmd)) 76 { 77 // ac.pasueAgent(); 58 CmdLine cmdLine = new CmdLine(args); 59 String cmd = cmdLine.getCmd(); 60 String propertiesFile = cmdLine.getOption("-c", "jobagent.properties"); 61 62 if ("help".equals(cmd)) 63 { 64 printUsage(); 65 printHelp(); 78 66 } 79 67 else 80 68 { 81 // Unknown command 82 } 83 84 } 85 69 Properties p = new Properties(); 70 try 71 { 72 p.load(new FileInputStream(new File(propertiesFile))); 73 } 74 catch (Throwable t) 75 { 76 String message = "Could not load configuration file " + propertiesFile; 77 System.out.println(message + ": " + t.getMessage()); 78 printUsage(); 79 log.debug(message, t); 80 return; 81 } 82 83 try 84 { 85 AgentController ac = new AgentController(p); 86 if ("start".equals(cmd)) 87 { 88 System.out.println("Starting job agent..."); 89 ac.startAgent(); 90 System.out.println("Job agent is up and running."); 91 } 92 else if ("stop".equals(cmd)) 93 { 94 System.out.println("Stopping job agent..."); 95 ac.stopAgent(); 96 System.out.println("The job agent has been stopped"); 97 } 98 else if ("info".equals(cmd)) 99 { 100 System.out.println("Getting info from job agent..."); 101 JobAgentInfo info = ac.getInfo(); 102 System.out.println(info.toString()); 103 } 104 else if ("register".equals(cmd)) 105 { 106 System.out.println("Registering job agent with BASE..."); 107 ac.registerAgent(); 108 System.out.println("Done"); 109 } 110 else if ("unregister".equals(cmd)) 111 { 112 System.out.println("Unregistering job agent from BASE..."); 113 ac.unregisterAgent(); 114 System.out.println("Done"); 115 } 116 else if ("pause".equals(cmd)) 117 { 118 System.out.println("Pausing job agent..."); 119 ac.pauseAgent(); 120 System.out.println("The job agent is now paused"); 121 } 122 else 123 { 124 System.out.println("Unknown command: " + cmd); 125 printUsage(); 126 printHelp(); 127 } 128 } 129 catch (Throwable t) 130 { 131 String message = "Exception when executing command " + cmd; 132 System.out.println(message + ": " + t.getMessage()); 133 log.error(message, t); 134 } 135 } 136 } 137 138 private static void printUsage() 139 { 140 System.out.println("Usage : jobagent.sh [options] cmd"); 141 System.out.println(" options : -c <configuration file>"); 142 System.out.println(" cmd : start | stop | pause | info | register | unregister | help"); 143 } 144 145 private static void printHelp() 146 { 147 System.out.println("Commands"); 148 System.out.println("==========="); 149 System.out.println("start : Starts the job agent"); 150 System.out.println("stop : Stops the job agent"); 151 System.out.println("pause : Pauses the job agent"); 152 System.out.println("info : Get info about the job agent"); 153 System.out.println("register : Registers this job agent with the BASE server"); 154 System.out.println("unregister: Unregisters this job agent from the BASE server"); 155 System.out.println("help : Display this help message"); 156 } 157 86 158 private final Properties p; 87 159 private final int port; 160 private final int timeout; 161 162 /** 163 Create a new controller for controlling the job agent specified by the 164 given properties. 165 @param p The properties for the job agent to control 166 */ 88 167 public AgentController(Properties p) 89 168 { 90 169 this.p = p; 91 this.port = Integer.parseInt(p.getProperty("agent.port")); 92 } 93 170 this.port = Values.getInt(p.getProperty("agent.port"), JobAgent.DEFAULT_PORT); 171 this.timeout = Values.getInt(p.getProperty("agent.timeout"), 1000); 172 } 173 174 /** 175 Start the agent this controller is controlling. If no agent is running 176 a new one is created in another thread in this virtual machine. The thread 177 must be stopped before the virtual machine exits. 178 179 @throws IOException If there is an error 180 */ 94 181 public void startAgent() 95 { 96 JobAgentConnection conn = new JobAgentConnection(port, 1000); 182 throws IOException 183 { 184 log.info("Starting job agent on port " + port); 185 JobAgentConnection conn = new JobAgentConnection(port, timeout); 97 186 try 98 187 { 99 if (conn.isRunning()) 100 { 101 // Error: Already running 188 JobAgentInfo info = null; 189 try 190 { 191 log.info("Sending info command to job agent on port " + port); 192 info = conn.getInfo(false); 193 } 194 catch (ConnectException t) 195 { 196 // Connection was refused, probably due to nobody listening on the port 197 log.debug(t.getMessage(), t); 198 } 199 catch (SocketTimeoutException t) 200 { 201 // Connection timed out, maybe because nobody listens 202 log.debug(t.getMessage(), t); 203 } 204 205 if (info == null) 206 { 207 // No job agent is running, create a new agent 208 log.info("Creating a new job agent on port " + port); 209 Agent agent = createAgent(p); 210 agent.service(null); // Start listening for incoming connections 102 211 } 103 212 else 104 213 { 105 createAgent(p, true); 214 log.info("An existing job agent is already running on port " + port + 215 " (" + (info.isPaused() ? "paused" : "running") + ")"); 216 } 217 218 // If we reach this part, we should at least have a job agent listening for connections 219 if (info == null || info.isPaused()) 220 { 221 // It is a new agent or an existing that is paused --> Start it 222 log.info("Sending start command to job agent on port " + port); 106 223 conn.sendStart(); 107 224 } 108 225 } 109 catch (Throwable t)110 {111 // TODO112 }113 226 finally 114 227 { … … 118 231 119 232 public void stopAgent() 120 { 121 JobAgentConnection conn = new JobAgentConnection(port, 1000); 233 throws IOException 234 { 235 log.info("Stopping job agent on port " + port); 236 JobAgentConnection conn = new JobAgentConnection(port, timeout); 122 237 try 123 238 { 124 if (!conn.isRunning()) 125 { 126 // Error: Not running 127 } 128 else 129 { 130 conn.sendStop(); 131 } 132 } 133 catch (Throwable t) 134 { 135 // TODO 239 log.info("Sending stop command to job agent on port " + port); 240 conn.sendStop(); 241 } 242 catch (ConnectException t) 243 { 244 // Connection was refused, probably due to nobody listening on the port 245 // Do nothing 246 log.debug(t.getMessage(), t); 247 log.info("No job agent is running on port " + port); 136 248 } 137 249 finally … … 141 253 } 142 254 255 public void pauseAgent() 256 throws IOException 257 { 258 log.info("Pausing job agent on port " + port); 259 JobAgentConnection conn = new JobAgentConnection(port, timeout); 260 try 261 { 262 log.info("Sending stop command to job agent on port " + port); 263 conn.sendPause(); 264 } 265 catch (ConnectException t) 266 { 267 // Connection was refused, probably due to nobody listening on the port 268 // Do nothing 269 log.debug(t.getMessage(), t); 270 log.info("No job agent is running on port " + port); 271 } 272 finally 273 { 274 conn.close(); 275 } 276 } 277 278 public JobAgentInfo getInfo() 279 throws IOException 280 { 281 log.info("Getting info from agent on port " + port); 282 JobAgentConnection conn = new JobAgentConnection(port, timeout); 283 JobAgentInfo info = null; 284 try 285 { 286 log.info("Sending info command to job agent on port " + port); 287 info = conn.getInfo(true); 288 } 289 finally 290 { 291 conn.close(); 292 } 293 return info; 294 } 295 143 296 public void registerAgent() 144 297 { … … 151 304 } 152 305 153 private Agent createAgent(Properties p, boolean thread) 306 private Agent createAgent(Properties p) 307 throws IOException 154 308 { 155 309 Agent a = new Agent(p);
Note: See TracChangeset
for help on using the changeset viewer.