Ignore:
Timestamp:
Sep 15, 2006, 4:08:12 PM (16 years ago)
Author:
Nicklas Nordborg
Message:

References #351: External job server usage

In-process job executor is now working. Work has started on the out-of-process job executor.
Added sendMessage and stackTrace property to job for improved error reporting.

Location:
trunk/src/clients/jobagent
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/clients/jobagent/jobagent.properties.in

    r2641 r2643  
    6262# =======================
    6363
    64 ## The name of the executor class that is responsible for starting the job
     64# The name of the executor class that is responsible for starting the job
    6565## The default is ProcessJobExecutor which starts job in a separate process
    6666## The class must implement the net.sf.basedb.clients.JobExecutor interface
     67
     68## Executor that executes the jobs in a separate process (recommended)
    6769agent.executor.class=net.sf.basedb.clients.jobagent.executors.ProcessJobExecutor
     70
     71## Executor that executes jobs in the same process as the job agent (not recommended)
     72# agent.executor.class=net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor
     73
     74## Executor useful for debugging purposes
     75# agent.executor.class=net.sf.basedb.clients.jobagent.executors.DummyJobExecutor
    6876
    6977## Initialisation parameters to the job exector. This string will be passed
  • trunk/src/clients/jobagent/jobagent.sh

    r2634 r2643  
    3030
    3131# Execute JobAgent
    32 java -server -Xmx40M -Xms40M -cp $CP net.sf.basedb.clients.jobagent.AgentController $* &
     32java -server -Xmx50M -Xms50M -cp $CP net.sf.basedb.clients.jobagent.AgentController $* &
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/Agent.java

    r2641 r2643  
    291291  public Agent(Properties properties)
    292292  {
     293    if (log.isDebugEnabled())
     294    {
     295      log.debug("Creating new agent");
     296      for (Map.Entry<Object, Object> entry : properties.entrySet())
     297      {
     298        Object value = entry.getValue();
     299        Object key = entry.getKey();
     300        if ("agent.password".equals(key))
     301        {
     302          value = "******************";
     303        }
     304        log.debug("Property [" + key + " = " + value + "]");
     305      }
     306    }
     307   
    293308    // BASE settings
    294309    this.login = properties.getProperty("agent.user");
     
    504519      jobExecutor = createJobExecutor();
    505520      jobQueueChecker = createJobQueueChecker();
     521     
     522      // Start the BASE application in a new thread so we can return quickly
     523      Thread t = new Thread(
     524        new Runnable()
     525        {
     526          public void run()
     527          {
     528            SessionControl sc = null;
     529            try
     530            {
     531              sc = getSessionControl();
     532            }
     533            catch (Throwable t)
     534            {}
     535            if (sc == null || !sc.isLoggedIn())
     536            {
     537              stop();
     538            }
     539          }
     540        }
     541      );
     542      t.start();
    506543    }
    507544  }
     
    633670  public SessionControl getSessionControl()
    634671  {
    635     if (sc == null)
    636     {
    637       Application.start(false);
    638       sc = Application.newSessionControl("net.sf.basedb.clients.jobagent",
    639         SocketUtil.getLocalHost().toString(), null);
    640     }
    641     if (!sc.isLoggedIn())
    642     {
    643       sc.login(login, password, null, false);
     672    try
     673    {
     674      if (sc == null)
     675      {
     676        log.info("Starting BASE application");
     677        Application.start(false);
     678        sc = Application.newSessionControl("net.sf.basedb.clients.jobagent",
     679          SocketUtil.getLocalHost().toString(), null);
     680      }
     681      if (!sc.isLoggedIn())
     682      {
     683        log.info("Logging in as user: " + login);
     684        sc.login(login, password, "Job agent running on host " + getServerName(), false);
     685      }
     686    }
     687    catch (Throwable t)
     688    {
     689      log.error(t.getMessage(), t);
    644690    }
    645691    return sc;
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobExecutor.java

    r2641 r2643  
    6262    to the job executor to decide how to handle this. It may either decide
    6363    to kill the job or let it continue as if nothing happened.
     64    <p>
     65    The job sent to this method has it's status set to {@link Job.Status#PREPARED}.
     66    The implementation of this method must change the status to either
     67    {@link Job.Status#DONE} or {@link Job.Status#ERROR}. If the status hasn't changed
     68    the job agent will set the status to signal an unknown error.
    6469   
    6570    @param sc A <code>SessionControl</code> where the owner of the job is
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/JobRunner.java

    r2641 r2643  
    2424package net.sf.basedb.clients.jobagent;
    2525
     26import java.util.Arrays;
     27
     28import net.sf.basedb.core.DbControl;
     29import net.sf.basedb.core.ItemModifiedException;
    2630import net.sf.basedb.core.Job;
     31import net.sf.basedb.core.SessionControl;
    2732
    2833/**
     
    4550  */
    4651  private static final org.apache.log4j.Logger log =
    47     org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.Agent");
     52    org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.JobRunner");
    4853
    4954  private final Job job;
     
    8085      return;
    8186    }
     87    DbControl dc = null;
     88    SessionControl sc = null;
    8289    try
    8390    {
    8491      log.info("Executing job " + job + " using executor " + jobExecutor);
    85       jobExecutor.executeJob(agent.getImpersonatedSessionControl(job), agent, job, slotToUse);
     92     
     93      // Log in as the owner of the job
     94      sc = agent.getImpersonatedSessionControl(job);
     95      dc = sc.newDbControl();
     96     
     97      // Load a fresh copy of the job
     98      Job j = Job.getById(dc, job.getId());
     99     
     100      // Status may have changed since the queue was checked
     101      if (j.getStatus() != Job.Status.WAITING) return;
     102     
     103      // Change the status to Job.Status.PREPARED
     104      j.setPrepared(agent.getServerName());
     105      try
     106      {
     107        dc.commit();
     108      }
     109      catch (ItemModifiedException ex)
     110      {
     111        // Another thread or agent has started the job already, return
     112        // without starting the job
     113        return;
     114      }
     115     
     116      try
     117      {
     118        jobExecutor.executeJob(sc, agent, j, slotToUse);
     119      }
     120      catch (Throwable t)
     121      {
     122        // Something went wrong, update status to ERROR
     123        dc = sc.newDbControl();
     124        j = Job.getById(dc, j.getId());
     125        j.doneError(t.getMessage(), Arrays.asList(t));
     126        dc.commit();
     127        throw t;
     128      }
     129     
     130      // Check that that the status has changed from prepared
     131      dc = sc.newDbControl();
     132      j = Job.getById(dc, j.getId());
     133      if (j.getStatus() == Job.Status.PREPARED)
     134      {
     135        j.doneError("Unknown error, job was never executed.");
     136        log.error("Unknown error: Job " + job + " was never executed by executor " + jobExecutor);
     137      }
     138      else
     139      {
     140        log.info("Done executing job: " + job + " with executor " + jobExecutor);
     141      }
     142      dc.commit();
    86143    }
    87144    catch (Throwable t)
     
    92149    {
    93150      agent.jobDone(job, slotToUse);
     151      if (dc != null) dc.close();
     152      if (sc != null && sc.isLoggedIn())
     153      {
     154        try
     155        {
     156          sc.logout();
     157        }
     158        catch (Throwable t)
     159        {
     160          log.error(t.getMessage(), t);
     161        }
     162      }
    94163    }
    95164  }
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/DummyJobExecutor.java

    r2641 r2643  
    3333  This is dummy job executor implementation which just sets the status
    3434  of each job to {@link net.sf.basedb.core.Job.Status#DONE} without actually
    35   executin the job. This class is usful for debuggin the job agent application.
     35  executing the job. This class is usful for debugging the job agent application.
    3636
    3737  @author nicklas
     
    7171      job.doneOk("Not really, but used for testing job agent");
    7272      dc.commit();
     73      log.info("Done executing: " + job);
    7374    }
    7475    finally
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ProcessJobExecutor.java

    r2641 r2643  
    2424package net.sf.basedb.clients.jobagent.executors;
    2525
     26import java.io.BufferedReader;
     27import java.io.IOException;
     28import java.io.InputStream;
     29import java.io.InputStreamReader;
     30import java.io.OutputStream;
     31import java.io.PrintWriter;
     32import java.io.Reader;
     33import java.io.StringWriter;
     34import java.io.Writer;
     35import java.util.ArrayList;
     36import java.util.Arrays;
     37import java.util.List;
     38
    2639import net.sf.basedb.clients.jobagent.Agent;
    2740import net.sf.basedb.clients.jobagent.JobExecutor;
     41import net.sf.basedb.core.DbControl;
    2842import net.sf.basedb.core.Job;
    2943import net.sf.basedb.core.SessionControl;
     
    7084  {
    7185    log.info("Executing job: " + job);
    72     // TODO - start new process for executing the job
     86    String classPath = System.getProperty("java.class.path");
     87   
     88    List<String> cmd = new ArrayList<String>(10);
     89    cmd.add("java");
     90    cmd.add("-server");
     91    cmd.add("-cp");
     92    cmd.add(classPath);
     93    cmd.add("net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor");
     94    cmd.add(Integer.toString(job.getId()));
     95
     96    log.debug("Using class path: " + classPath);
     97    ProcessBuilder builder = new ProcessBuilder(cmd);
     98    builder.redirectErrorStream(true);
     99   
     100    Process process = null;
     101    DbControl dc = null;
     102    try
     103    {
     104      try
     105      {
     106        process = builder.start();
     107      }
     108      catch (Throwable t)
     109      {
     110        log.error("Error executing job: " + job, t);
     111        dc = sc.newDbControl();
     112        job = Job.getById(dc, job.getId());
     113        job.doneError(t.getMessage(), Arrays.asList(t));
     114        dc.commit();
     115        return;
     116      }
     117     
     118      // Set up threads for reading output
     119      StringWriter result = new StringWriter();
     120      Thread t = new Thread(new StreamRedirector(
     121        new InputStreamReader(process.getInputStream()), result));
     122      t.start();
     123
     124      try
     125      {
     126        log.info("Waiting for process to end");
     127        int exitCode = process.waitFor();
     128        if (exitCode != 0)
     129        {
     130          log.error("Process ended with exit code: " + exitCode);
     131          log.error("Process output is: " + result.toString());
     132          dc = sc.newDbControl();
     133          job = Job.getById(dc, job.getId());
     134          job.doneError("Job ended with exit code: " + exitCode,
     135            Arrays.asList(new Exception(result.toString())));
     136          dc.commit();
     137        }
     138        else
     139        {
     140          log.info("Process ended");
     141        }
     142      }
     143      catch (InterruptedException ex)
     144      {
     145        log.info("Job was interrupted: " + job, ex);
     146        // Kill the process
     147        process.destroy();
     148        dc = sc.newDbControl();
     149        job = Job.getById(dc, job.getId());
     150        job.doneError(ex.getMessage(), Arrays.asList(ex));
     151        dc.commit();
     152      }
     153    }
     154    finally
     155    {
     156      if (dc != null) dc.close();
     157    }
     158   
    73159  }
    74160
     
    76162  {}
    77163  // -------------------------------------------
     164 
     165 
     166  public static class StreamRedirector
     167    implements Runnable
     168  {
     169 
     170    private final Reader in;
     171    private final Writer out;
     172    public StreamRedirector(Reader in, Writer out)
     173    {
     174      this.in = in;
     175      this.out = out;
     176    }
     177   
     178    public void run()
     179    {
     180      BufferedReader reader = new BufferedReader(in);
     181      PrintWriter writer = new PrintWriter(out);
     182      String line = null;
     183      try
     184      {
     185        while ((line = reader.readLine()) != null)
     186        {
     187          writer.println(line);
     188        }
     189      }
     190      catch (IOException ex)
     191      {}
     192    }
     193  }
     194 
    78195}
  • trunk/src/clients/jobagent/net/sf/basedb/clients/jobagent/executors/ThreadJobExecutor.java

    r2641 r2643  
    2424package net.sf.basedb.clients.jobagent.executors;
    2525
     26import java.util.Arrays;
     27import java.util.List;
     28
    2629import net.sf.basedb.clients.jobagent.Agent;
    2730import net.sf.basedb.clients.jobagent.JobExecutor;
     31import net.sf.basedb.core.DbControl;
    2832import net.sf.basedb.core.Job;
     33import net.sf.basedb.core.PluginExecutionRequest;
     34import net.sf.basedb.core.PluginResponse;
    2935import net.sf.basedb.core.SessionControl;
     36import net.sf.basedb.core.plugin.Response;
    3037
    3138/**
     
    5057  */
    5158  private static final org.apache.log4j.Logger log =
    52     org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.executors.ProcessJobExecutor");
     59    org.apache.log4j.LogManager.getLogger("net.sf.basedb.clients.jobagent.executors.ThreadJobExecutor");
    5360 
    5461 
     
    6673  {
    6774    log.info("Executing job: " + job);
    68     // TODO - execute job
     75    DbControl dc = null;
     76    try
     77    {
     78      dc = sc.newDbControl();
     79     
     80      // Load a fresh copy of the job item
     81      job = Job.getById(dc, job.getId());
     82      PluginExecutionRequest exec = null;
     83     
     84      try
     85      {
     86        exec = job.execute(null, agent.getServerName());
     87      }
     88      catch (Throwable t)
     89      {
     90        //  The job could not be started, set error status and send message to owner
     91        job.doneError(t.getMessage(), Arrays.asList(t));
     92        log.error("Error executing job: " + job, t);
     93      }
     94      dc.commit();
     95
     96      PluginResponse response = exec.invoke();
     97      if (response.getStatus() == Response.Status.ERROR)
     98      {
     99        // There was an error, job status and message is already sent to the user
     100        log.error("Error executing job: " + job + "; " + response.getMessage());
     101        List<? extends Throwable> errors = response.getErrorList();
     102        if (errors != null)
     103        {
     104          for (Throwable t : errors)
     105          {
     106            log.error(t.getMessage(), t);
     107          }
     108        }
     109      }
     110      else
     111      {
     112        log.info("Done executing: " + job);
     113      }
     114    }
     115    finally
     116    {
     117      if (dc != null) dc.close();
     118    }
    69119  }
    70120
     
    72122  {}
    73123  // -------------------------------------------
     124 
     125  public static void main(String[] args)
     126  {
     127    System.out.println("Running job with ID: " + args[0]);
     128    System.exit(11);
     129  }
     130 
    74131}
Note: See TracChangeset for help on using the changeset viewer.