Changeset 6432


Ignore:
Timestamp:
Mar 14, 2014, 8:23:38 AM (10 years ago)
Author:
Nicklas Nordborg
Message:

References #1796: Improve support for jobs running on external servers

Added an extension point (net.sf.basedb.core.signal.job) and ExtensionSignalTransporter that uses this extension point to send signals to jobs. The web interface has been updated to send 'ABORT' and 'STATUS' signals also to jobs in the 'WAITING' state.

The 'WAITING' state is somewhat redefined so that external jobs can use this state to indicate that the job has actually been queued. This means that jobs are not put in 'WAITING' state as soon as parameters has been added to it, but Job.setScheduled() must be called first. This change probably affect a lot of the test code only (must check this), but not regular usage with the request/response api.

Added externalId property to job so that external jobs have a place to store an identifier.

Location:
trunk
Files:
1 added
11 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/core/core-extensions.xml

    r6424 r6432  
    152152  </extension-point>
    153153 
     154  <extension-point
     155    id="net.sf.basedb.core.signal.job">
     156    <action-class>net.sf.basedb.core.signal.SignalHandler</action-class>
     157    <name>Job signal handler</name>
     158    <description>
     159      Extension point for sending signals to jobs. Intended for jobs
     160      running on external servers outside of direct control for BASE.
     161      The ClientContext is populated with the current SessionControl and
     162      Job item, and the "signal-uri" attribute is set to the URI registered
     163      with Job.setSignalTransporter. It is recommended that implementations
     164      return quickly. If the action takes a long time the handler should spawn
     165      a new thread.
     166    </description>
     167  </extension-point>
     168 
    154169  <extension
    155170    id="net.sf.basedb.core.uri.http-connection-manager"
  • trunk/src/core/net/sf/basedb/core/Install.java

    r6424 r6432  
    118118    method.
    119119  */
    120   public static final int NEW_SCHEMA_VERSION = Integer.valueOf(115).intValue();
     120  public static final int NEW_SCHEMA_VERSION = Integer.valueOf(116).intValue();
    121121 
    122122  public static synchronized int createTables(SchemaGenerator.Mode mode, ProgressReporter progress,
  • trunk/src/core/net/sf/basedb/core/Job.java

    r6127 r6432  
    2929import net.sf.basedb.core.data.PluginConfigurationData;
    3030import net.sf.basedb.core.data.PluginDefinitionData;
    31 
    3231import net.sf.basedb.core.plugin.Plugin;
    3332import net.sf.basedb.core.plugin.InteractivePlugin;
     
    3635import net.sf.basedb.core.plugin.Response;
    3736import net.sf.basedb.core.plugin.Plugin.MainType;
     37import net.sf.basedb.core.signal.ExtensionSignalTransporter;
    3838import net.sf.basedb.core.signal.Signal;
    3939import net.sf.basedb.core.signal.SignalTransporter;
     40import net.sf.basedb.util.extensions.ClientContext;
    4041
    4142import java.util.Collection;
     
    396397
    397398  /**
     399    The maximum allowed length of the external ID.
     400    @since 3.3
     401  */
     402  public static final int MAX_EXTERNAL_ID_LENGTH = JobData.MAX_EXTERNAL_ID_LENGTH;
     403
     404  /**
     405    Get the external ID of a job running on an external server.
     406    @since 3.3
     407  */
     408  public String getExternalId()
     409  {
     410    return getData().getExternalId();
     411  }
     412  /**
     413    Set the external ID.
     414    @since 3.3
     415  */
     416  public void setExternalId(String externalId)
     417  {
     418    checkPermission(Permission.WRITE);
     419    getData().setExternalId(StringUtil.setNullableString(externalId, "externalId", MAX_EXTERNAL_ID_LENGTH));
     420  }
     421
     422  /**
    398423    Get the type of job, ie. if it is a job executed by a plugin
    399424    or not.
     
    836861    Note 3! This method always returns the same transporter instance if
    837862    it is called multiple times.
     863    <p>
     864    Note 4! If the signal transporter is a {@link ExtensionSignalTransporter}
     865    the extension point is "net.sf.basedb.core.signal.job". The client
     866    context is populated with a {@link SessionControl} and the current item
     867    is this {@link Job}. The information in the job should be considered read-only,
     868    if the signal handler wants to update information it must create a new
     869    {@link DbControl} from the session control and re-load the job.
    838870   
    839871    @return An initialised signal transporter object, or null
     
    858890          SignalTransporter transporter = (SignalTransporter)Class.forName(transporterClass).newInstance();
    859891          transporter.init(initParams);
     892         
     893          if (transporter instanceof ExtensionSignalTransporter)
     894          {
     895            ExtensionSignalTransporter xtTransporter = (ExtensionSignalTransporter)transporter;
     896            ClientContext context = new ClientContext(getSessionControl(), this);
     897            xtTransporter.setContext(context);
     898            xtTransporter.setExtensionPointId("net.sf.basedb.core.signal.job");
     899          }
     900         
    860901          signalTransporter = new JobSignalTransporter(transporter);
    861902        }
     
    870911 
    871912  /**
     913    Request that the underlying job updates the status information about
     914    itself. This method is intended to be used for external jobs that
     915    are not BASE plug-ins (which can report progress through the plug-in API).
     916
     917    This method sends a {@link Signal#STATUS} signal to the
     918    {@link SignalTransporter} that has been registered for the job.
     919    If there is no signal transporter this method does nothing.
     920   
     921    The signal transporter should deliver the signal as quickly as possible
     922    and then return. The actual progress update may take place at a later
     923    time when it is convenient to do so for the job.
     924   
     925    @return TRUE if a {@link Signal#STATUS} could be sent, FALSE if not
     926    @since 3.3
     927  */
     928  public boolean requestStatusUpdate()
     929  {
     930    boolean signalSent = false;
     931    if (hasSignalTransporter())
     932    {
     933      try
     934      {
     935        SignalTransporter transporter = getSignalTransporter();
     936        Collection<Signal> supported = transporter.getSupportedSignals();
     937        if (supported == null || supported.contains(Signal.STATUS))
     938        {
     939          transporter.send(Signal.STATUS);
     940          signalSent = true;
     941        }
     942      }
     943      catch (RuntimeException ex)
     944      {
     945        ex.printStackTrace();
     946      }
     947    }
     948    return signalSent;
     949  }
     950
     951 
     952  /**
    872953    Get the date and time the job ended.
    873954    @return A date, or null if the job hasn't ended
     
    879960
    880961  /**
     962    Set the job's status to {@link Status#WAITING}. This status is
     963    used to signal that a job has been fully configured and is ready
     964    for execution.
     965   
     966    @param server Name of the server which this job will executed on
     967    @param agent The job agent that is going to execute the
     968      job or null if it is executed by something else, eg. the
     969      internal job queue
     970    @throws PermissionDeniedException If the logged in user doesn't have write permission.
     971    @since 3.3
     972  */
     973  public void setScheduled(String server, JobAgent agent)
     974    throws PermissionDeniedException
     975  {
     976    checkPermission(Permission.WRITE);
     977    Job.Status status = getStatus();
     978    if (status != Status.UNCONFIGURED)
     979    {
     980      throw new PermissionDeniedException("Can't schedule a job with status '"+getStatus()+"': "+toString());
     981    }
     982    JobData data = getData();
     983    data.setScheduled(new Date());
     984    data.setStatus(Status.WAITING.getValue());
     985    data.setServer(StringUtil.setNullableString(server, "server", MAX_SERVER_LENGTH));
     986    getData().setJobAgentId(agent == null ? null : agent.getId());
     987  }
     988 
     989  /**
    881990    Set the job's status to {@link Status#PREPARED}. This status is
    882     used to signal that a job is about to be executed, but the actual excecution
     991    used to signal that a job is about to be executed, but the actual execution
    883992    hasn't started yet. This status prevents two job agents from trying to
    884993    start the same job twice.
     
    13501459      throw new PermissionDeniedException("Cannot configure an executing job: "+toString());
    13511460    }
    1352     getData().setStatus(Status.WAITING.getValue());
    1353     getData().setScheduled(new Date());
     1461    //getData().setStatus(Status.WAITING.getValue());
     1462    //getData().setScheduled(new Date());
    13541463    setParameterValuesInternal(name, label, description, parameterType, Arrays.asList(value), true);
    13551464  }
     
    14021511      throw new PermissionDeniedException("Cannot configure an executing job: "+toString());
    14031512    }
    1404     getData().setStatus(Status.WAITING.getValue());
    1405     getData().setScheduled(new Date());
     1513    //getData().setStatus(Status.WAITING.getValue());
     1514    //getData().setScheduled(new Date());
    14061515    setParameterValuesInternal(name, label, description, parameterType, values, true);
    14071516  }
  • trunk/src/core/net/sf/basedb/core/Update.java

    r6424 r6432  
    208208    </td>
    209209  </tr>
     210 
     211  <tr>
     212    <td>116</td>
     213    <td>
     214      Added {@link JobData#getExternalId()}. No special database update is needed. Only increase
     215      the schema version.
     216    </td>
     217  </tr>
    210218  </table>
    211219
     
    340348      }
    341349     
    342       if (schemaVersion < 115)
    343       {
    344         if (progress != null) progress.display((int)(progress_current), "--Updating schema version: " + schemaVersion + " -> 115...");
    345         // Schemaversion 115 only updates the version number
    346         schemaVersion = setSchemaVersionInTransaction(session, 115);
    347         progress_current += progress_step;
     350      if (schemaVersion < 116)
     351      {
     352        if (progress != null) progress.display((int)(progress_current), "--Updating schema version: " + schemaVersion + " -> 116...");
     353        // Schemaversion 115 and 116 only updates the version number
     354        schemaVersion = setSchemaVersionInTransaction(session, 116);
     355        progress_current += 2 * progress_step;
    348356      }
    349357
  • trunk/src/core/net/sf/basedb/core/data/JobData.java

    r6127 r6432  
    105105  }
    106106 
     107  /**
     108    The maximum allowed length of the external ID.
     109    @since 3.3
     110  */
     111  public static final int MAX_EXTERNAL_ID_LENGTH = 255;
     112  private String externalId;
     113  /**
     114    External ID of a job running on an external system.
     115    @hibernate.property column="`external_id`" type="string" length="255" not-null="false"
     116    @since 3.3
     117  */
     118  public String getExternalId()
     119  {
     120    return externalId;
     121  }
     122  public void setExternalId(String externalId)
     123  {
     124    this.externalId = externalId;
     125  }
     126 
    107127  private PluginDefinitionData pluginDefinition;
    108128  /**
  • trunk/src/core/net/sf/basedb/core/signal/LocalSignalReceiver.java

    r6127 r6432  
    114114  {
    115115    logger.info("Receiving signal " + signal.getId() + " to " + handlerId);
    116     SignalHandler handler = super.getSignalHandler(handlerId);
     116    SignalHandler handler = getSignalHandler(handlerId);
    117117    if (handler != null) handler.handleSignal(signal);
    118118  }
  • trunk/src/core/net/sf/basedb/core/signal/Signal.java

    r5399 r6432  
    6868    );
    6969
     70  /**
     71    The STATUS signal, that requests that the receiver should
     72    update the status on the system. This signal is typically sent
     73    to a job managed by an external job manager when someone
     74    on the BASE side want to have an update about the progress.
     75    @since 3.3
     76  */
     77  public static final Signal STATUS =
     78    registerSignal("STATUS", "Status",
     79      "Sending this signal indicates that the system wants a status " +
     80      "update from the receiver. Exactly what this means is up to the sender " +
     81      "and reciever to agree on. Typically, this signal can be sent to external " +
     82      "job managers when someone on the BASE side wants to have an update about the progress."
     83    );
    7084 
    7185  /**
  • trunk/src/core/net/sf/basedb/core/signal/SignalHandler.java

    r4516 r6432  
    2424import java.util.Collection;
    2525
     26import net.sf.basedb.util.extensions.Action;
     27
    2628/**
    2729  A signal handler is an object that is able to receive and act on signals.
     
    4143*/
    4244public interface SignalHandler
     45  extends Action
    4346{
    4447  /**
    45     Handle the given signal.
     48    Handle the given signal. It is recommended that signals are handled
     49    as quickly as possible. If there is a risk that it may take a long time
     50    it is recommended to spawn a new thread and handle the signal asynchronously.
    4651    @param signal The signal to handle
    4752    @throws UnsupportedSignalException If the signal is not supported
  • trunk/www/views/jobs/ajax.jsp

    r6124 r6432  
    5151    dc = sc.newDbControl();
    5252    Job job = Job.getById(dc, itemId);
     53    job.requestStatusUpdate();
    5354    json.put("id", job.getId());
    5455    json.put("jobStatus", job.getStatus().name());
  • trunk/www/views/jobs/index.jsp

    r6192 r6432  
    263263    dc = sc.newDbControl();
    264264    Job job = Job.getById(dc, cc.getId());
    265     if (job.getStatus() == Job.Status.WAITING)
    266     {
    267       job.doneError("Aborted by user");
     265    SignalTransporter signalTransporter = job.getSignalTransporter();
     266    if (signalTransporter == null)
     267    {
     268      if (job.getStatus() == Job.Status.WAITING)
     269      {
     270        job.doneError("Aborted by user");
     271      }
    268272    }
    269273    else
    270274    {
    271       SignalTransporter signalTransporter = job.getSignalTransporter();
    272       if (signalTransporter != null) signalTransporter.send(Signal.ABORT);
     275      signalTransporter.send(Signal.ABORT);
    273276    }
    274277    dc.commit();
  • trunk/www/views/jobs/view_job.jsp

    r6378 r6432  
    162162  }
    163163 
    164   // Check if the plug-in supports the "Abort" signal
    165   boolean supportsAbort = status == Job.Status.WAITING && writePermission;
    166   if (status == Job.Status.EXECUTING && writePermission)
     164  // Check if the job can be aborted
     165  boolean supportsAbort = false;
     166 
     167  // The status must be executable or EXECUTING
     168  if ((status == Job.Status.EXECUTING || status.isExecutable()) && writePermission)
    167169  {
     170    // Check if a signal-transporter exists
    168171    try
    169172    {
    170173      SignalTransporter signalTransporter = job.getSignalTransporter();
    171       Collection<Signal> supportedSignals = signalTransporter != null ?
    172         signalTransporter.getSupportedSignals() : null;
    173       supportsAbort = signalTransporter != null &&
    174         (supportedSignals == null || supportedSignals.contains(Signal.ABORT));
     174      if (signalTransporter == null)
     175      {
     176        // The job is only abortable if it is waiting
     177        supportsAbort = status == Job.Status.WAITING;
     178      }
     179      else
     180      {
     181        // Otherwise we must check with the signal transporter
     182        Collection<Signal> supportedSignals = signalTransporter.getSupportedSignals();
     183        supportsAbort = supportedSignals == null || supportedSignals.contains(Signal.ABORT);
     184      }
    175185    }
    176186    catch (Exception ex)
    177187    {}
    178188  }
     189  if (autoUpdate) job.requestStatusUpdate();
    179190  %>
    180191  <base:page type="popup" title="<%=title%>" id="view-page"> 
Note: See TracChangeset for help on using the changeset viewer.