Changeset 4074


Ignore:
Timestamp:
Jan 8, 2008, 1:06:44 PM (15 years ago)
Author:
Nicklas Nordborg
Message:

References #636: Kill a running job

Infrastructure is now in place. Implementation for internal job queue has been tested and working. Still need:

  • Implement signal transportation to job agents and via progress reporter
  • Implement support in other core plug-ins (only import plug-ins based on FlatFileParser? are working right now)
  • Document everything
Location:
trunk
Files:
2 added
24 edited

Legend:

Unmodified
Added
Removed
  • trunk/config/dist/base.config

    r3871 r4074  
    7777# If the internal job queue should be enabled or not
    7878jobqueue.internal.enabled            = true
     79
     80# Signal receiver class for sending signals to running jobs (to abort them)
     81jobqueue.internal.signalreceiver.class = net.sf.basedb.core.signal.LocalSignalReceiver
     82jobqueue.internal.signalreceiver.init  = localhost:0
    7983
    8084# If plugins with useInteralJobQueue = false should be executed or not
  • trunk/config/dist/log4j.properties

    r3992 r4074  
    5454#log4j.logger.net.sf.basedb.core.storage=debug
    5555
     56### Log Signal handling events
     57#log4j.logger.net.sf.basedb.core.signal=debug
     58
     59
    5660# -----------------
    5761# Migration loggers
  • trunk/src/core/net/sf/basedb/core/Install.java

    r4034 r4074  
    107107    method.
    108108  */
    109   public static final int NEW_SCHEMA_VERSION = Integer.valueOf(46).intValue();
     109  public static final int NEW_SCHEMA_VERSION = Integer.valueOf(47).intValue();
    110110 
    111111  public static synchronized void createTables(boolean update, final ProgressReporter progress)
  • trunk/src/core/net/sf/basedb/core/InternalJobQueue.java

    r4073 r4074  
    2626
    2727import net.sf.basedb.core.plugin.Response;
     28import net.sf.basedb.core.signal.SignalReceiver;
    2829import net.sf.basedb.core.data.JobData;
    2930import net.sf.basedb.core.data.PluginDefinitionData;
     
    201202  */
    202203  private boolean ignoreUseInternalJobQueueFlag = false;
    203  
     204
     205  /**
     206    Use the local signal receiver only.
     207  */
     208  private static SignalReceiver signalReceiver;
     209
    204210  /**
    205211    Create the job queue. The one and only instance is created
     
    226232    }
    227233    runners = Collections.synchronizedSet(new HashSet<JobRunner>());
     234   
     235    // Signal receiver
     236    String signalReceiverClass = Config.getString("jobqueue.internal.signalreceiver.class");
     237    String signalReceiverInit = Config.getString("jobqueue.internal.signalreceiver.init");
     238    if (signalReceiverClass == null)
     239    {
     240      signalReceiverClass = "net.sf.basedb.core.signal.LocalSignalReceiver";
     241      if (signalReceiverInit == null) signalReceiverInit = "localhost:0";
     242    }
     243    try
     244    {
     245      signalReceiver = (SignalReceiver)Class.forName(signalReceiverClass).newInstance();
     246      signalReceiver.init(signalReceiverInit);
     247    }
     248    catch (Exception ex)
     249    {
     250      throw new BaseException(ex);
     251    }
    228252   
    229253    // Configure thread group
     
    430454          throw t;
    431455        }
    432         //exec.setSignalReceiver(signalReceiver);
     456        exec.registerSignalReceiver(signalReceiver, false);
    433457        dc.commit();
    434458       
  • trunk/src/core/net/sf/basedb/core/Job.java

    r4034 r4074  
    3939import net.sf.basedb.core.plugin.Response;
    4040import net.sf.basedb.core.plugin.Plugin.MainType;
     41import net.sf.basedb.core.signal.SignalTransporter;
    4142
    4243import java.util.Collection;
     
    591592  }
    592593 
     594  public void setSignalTransporter(Class<? extends SignalTransporter> clazz, String initParams)
     595  {
     596    checkPermission(Permission.WRITE);
     597    if (clazz == null) throw new InvalidUseOfNullException("clazz");
     598    if (initParams == null) initParams = "";
     599    getData().setSignalTransporter(clazz.getName() + ":" + initParams);
     600  }
     601 
     602  public boolean hasSignalTransporter()
     603  {
     604    return getData().getSignalTransporter() != null;
     605  }
     606 
     607  public SignalTransporter getSignalTransporter()
     608  {
     609    String tmp = getData().getSignalTransporter();
     610    SignalTransporter transporter = null;
     611    if (tmp != null)
     612    {
     613      try
     614      {
     615        String[] tmp2 = tmp.split(":", 2);
     616        String transporterClass = tmp2[0];
     617        String initParams = tmp2.length > 1 ? tmp2[1] : null;
     618        transporter = (SignalTransporter)Class.forName(transporterClass).newInstance();
     619        transporter.init(initParams);
     620      }
     621      catch (Exception ex)
     622      {
     623        throw new BaseException(ex);
     624      }
     625    }
     626    return transporter;
     627  }
     628 
    593629  /**
    594630    Get the date and time the job ended.
     
    705741    data.setEnded(new Date());
    706742    data.setStackTrace(null);
     743    data.setSignalTransporter(null);
    707744    if (getSendMessage()) sendMessage();
    708745  }
     
    726763    data.setStackTrace(null);
    727764    data.setEnded(new Date());
     765    data.setSignalTransporter(null);
    728766    if (getSendMessage()) sendMessage();
    729767  }
     
    747785    data.setPercentComplete(100);
    748786    data.setStatus(Status.ERROR.getValue());
     787    data.setSignalTransporter(null);
    749788    if (errors != null)
    750789    {
     
    12271266    public void display(int percent, String message)
    12281267    {
    1229       // Do not update unless at least some time has passed
    1230       if (System.currentTimeMillis() - lastUpdate > UPDATE_INTERVAL)
     1268      // Do not update unless at least some time has passed, or the plug-in has finished
     1269      if (System.currentTimeMillis() - lastUpdate > UPDATE_INTERVAL || percent == 100)
    12311270      {
    12321271        lastUpdate = System.currentTimeMillis();
     
    12841323    }
    12851324   
    1286     void setStarted()
     1325    void setStarted(Class<? extends SignalTransporter> signalTransporter, String signalId)
    12871326    {
    12881327      DbControl dc = null;
     
    12911330        dc = sc.newDbControl();
    12921331        Job job = Job.getById(dc, jobId);
    1293         job.start("Starting...", server);       
     1332        if (signalTransporter != null && signalId != null)
     1333        {
     1334          job.setSignalTransporter(signalTransporter, signalId);
     1335        }
     1336        job.start("Starting...", server);
    12941337        dc.commit();
    12951338      }
  • trunk/src/core/net/sf/basedb/core/PluginExecutionRequest.java

    r4073 r4074  
    3131import net.sf.basedb.core.signal.SignalReceiver;
    3232import net.sf.basedb.core.signal.SignalTarget;
     33import net.sf.basedb.core.signal.SignalTransporter;
    3334
    3435/**
     
    6465{
    6566  private Job.ProgressReporterImpl progress;
     67  private SignalReceiver signalReceiver = null;
     68  private boolean forceSignalReceiver = false;
    6669 
    6770  PluginExecutionRequest(SessionControl sc, Plugin plugin, String command,
     
    8790    Request request = new RequestImpl(false);
    8891    Response response = pluginResponse.getResponseImpl();
    89     if (progress != null) progress.setStarted();
     92    SignalHandler signalHandler = null;
    9093    try
    9194    {
    92       getPlugin().run(request, response, progress);
     95      Plugin plugin = getPlugin();
     96      String signalId = null;
     97      Class<? extends SignalTransporter> signalTransporter = null;
     98      if (plugin instanceof SignalTarget)
     99      {
     100        // Register a signal handler, receiver and transporter
     101        signalHandler = ((SignalTarget)plugin).getSignalHandler();
     102        if (!forceSignalReceiver && signalHandler != null)
     103        {
     104          SignalReceiver override = signalHandler.getSignalReceiver();
     105          if (override != null) signalReceiver = override;
     106        }
     107        if (signalReceiver != null && signalHandler != null)
     108        {
     109          signalId = signalReceiver.registerSignalHandler(signalHandler);
     110          signalTransporter = signalReceiver.getSignalTransporterClass();
     111        }
     112      }
     113      if (progress != null) progress.setStarted(signalTransporter, signalId);
     114      plugin.run(request, response, progress);
    93115    }
    94116    catch (Throwable t)
    95117    {
    96118      pluginResponse.setError("Error invoking plugin: " + t.getMessage() , t);
     119    }
     120    finally
     121    {
     122      if (signalReceiver != null && signalHandler != null)
     123      {
     124        signalReceiver.unregisterSignalHandler(signalHandler);
     125      }
    97126    }
    98127    done();
     
    121150  }
    122151 
    123   public void setSignalReceiver(SignalReceiver signalReceiver)
     152  /**
     153    Register a signal receiver that will receive signals for this job.
     154    The plug-in must implement the {@link SignalTarget} interface
     155    and provide a {@link SignalHandler}. If not, the job will not be
     156    registered with the signal receiver and it will not be possible to
     157    send signals to the job.
     158   
     159    @param signalReceiver The signal recevier to use
     160    @param force If true, the specified signal recevier will always be used,
     161      even if the plug-in wants to use another receiver implementation
     162  */
     163  public void registerSignalReceiver(SignalReceiver signalReceiver, boolean force)
    124164  {
    125     Plugin p = getPlugin();
    126     SignalHandler signalHandler = null;
    127     if (p instanceof SignalTarget)
    128     {
    129       signalHandler = ((SignalTarget)p).getSignalHandler();
    130     }
    131     if (signalHandler != null)
    132     {
    133       String ID = signalReceiver.registerSignalHandler(signalHandler);
    134       Job job = getJob();
    135       //job.setSignalReceiverId(ID);
    136     }
     165    this.signalReceiver = signalReceiver;
     166    this.forceSignalReceiver = force;
    137167  }
    138168 
  • trunk/src/core/net/sf/basedb/core/Update.java

    r3979 r4074  
    544544    </td>
    545545  </tr>
     546  <tr>
     547    <td>47</td>
     548    <td>
     549      <ul>
     550      <li>Added {@link net.sf.basedb.core.data.JobData#getSignalTransporter()}.
     551      </ul>
     552      No special database update is needed. Only increase the schema version.
     553    </td>
     554  </tr>
    546555  </table>
    547556
     
    774783        if (progress != null) progress.display((int)(45*progress_factor), "--Updating schema version: " + schemaVersion + " -> 46...");
    775784        schemaVersion = setSchemaVersionInTransaction(session, 46);
     785      }
     786
     787      if (schemaVersion < 47)
     788      {
     789        if (progress != null) progress.display((int)(46*progress_factor), "--Updating schema version: " + schemaVersion + " -> 47...");
     790        schemaVersion = setSchemaVersionInTransaction(session, 47);
    776791      }
    777792
  • trunk/src/core/net/sf/basedb/core/data/JobData.java

    r3948 r4074  
    356356  }
    357357
     358  /**
     359    The maximum allowed length of the signal transporter string.
     360  */
     361  public static final int MAX_SIGNAL_TRANSPORTER_LENGTH = 655536;
     362  private String signalTransporter;
     363  /**
     364    Identifies the signal transporter to use. This is a string with
     365    two parts separated by colon (:). The first part is the class name
     366    of the signal transporter class, the second part is the initialisation
     367    string for the transporter.
     368    @hibernate.property column="`signal_transporter`" type="text" not-null="false"
     369    @since 2.6
     370  */
     371  public String getSignalTransporter()
     372  {
     373    return signalTransporter;
     374  }
     375  public void setSignalTransporter(String signalTransporter)
     376  {
     377    this.signalTransporter = signalTransporter;
     378  }
    358379 
    359380  private Map<String, ParameterValueData<?>> parameters;
  • trunk/src/core/net/sf/basedb/core/signal/AbstractSignalHandler.java

    r4073 r4074  
    7878    return supported != null && supported.contains(signal);
    7979  }
     80  public SignalReceiver getSignalReceiver()
     81  {
     82    return null;
     83  }
    8084  // -------------------------------------------
    8185
  • trunk/src/core/net/sf/basedb/core/signal/AbstractSignalReceiver.java

    r4073 r4074  
    2424package net.sf.basedb.core.signal;
    2525
     26import java.net.URI;
    2627import java.util.Collection;
    2728import java.util.Collections;
     
    3435  return values for the {@link #registerSignalHandler(SignalHandler)} method.
    3536  This class will generate values in the form of {@link java.net.URI}:s.
    36   <code>signal://receiverId/handlerId?supportedSignals</code>.
     37  <code>signal://handlerId@receiverId/?supportedSignals</code>.
    3738 
    3839  <ul>
    3940  <li>The <i>receiverId</i> part is given by the parameter in the
    40     {@link #init(String)} method and must be set by the implementing subclass.
     41    {@link #init(String)} method and must be set by the implementing
     42    subclass.
    4143   
    4244  <li>The <i>handlerId</i> part is given by calling {@link System#identityHashCode(Object)}
     
    5153  as higher-level objects.
    5254  <p>
    53   Subclasses should override the {@link #getSignalHandlerId(SignalHandler)} if they
     55  Subclasses should override the {@link #getGlobalSignalId(SignalHandler)} if they
    5456  want to use a different ID generation scheme.
    5557
     
    6264{
    6365
     66  /**
     67    Log signals processing.
     68  */
     69  private static final org.apache.log4j.Logger logger =
     70    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.AbstractSignalReceiver");
     71 
     72
     73  /**
     74    Maps local signal handler ID to -> signal handler itself.
     75  */
    6476  private Map<String, SignalHandler> handlers;
    6577  private String receiverId;
     
    7789  /**
    7890    Initialise the signal receiver.
    79     @param params The ID of the signal receiver, needed if the default
     91    @param receiverId The ID of the signal receiver, needed if the default
    8092      ID generation should be used
    8193  */
    82   public void init(String params)
     94  public void init(String receiverId)
    8395  {
    8496    handlers = Collections.synchronizedMap(new HashMap<String, SignalHandler>());
    85     receiverId = params;
    86   }
    87  
     97    this.receiverId = receiverId;
     98    logger.info("Initializing signal receiver: id=" + receiverId);
     99  }
     100  /**
     101    Close this signal receiver.
     102  */
    88103  public void close()
    89104  {
     105    logger.info("Closing signal receiver: receiver id=" + receiverId);
    90106    if (handlers != null) handlers.clear();
    91107    handlers = null;
    92108  }
    93109
     110  /**
     111    Register a signal handler with this receiver.
     112    @param handler The signal handler to register
     113    @return The global ID of the signal handler
     114  */
    94115  public String registerSignalHandler(SignalHandler handler)
    95116  {
    96     String id = getSignalHandlerId(handler);
    97     if (handlers != null) handlers.put(id, handler);
    98     return id;
    99   }
    100 
     117    String globalId = getGlobalSignalId(handler);
     118    String localId = getLocalSignalHandlerId(handler);
     119    if (handlers != null) handlers.put(localId, handler);
     120    logger.info("Register signal handler: recevier id = " + receiverId +
     121      "; global id=" + globalId + "; local id=" + localId);
     122    logger.debug("Current number of registered signal handlers: " + handlers.size());
     123    return globalId;
     124  }
     125
     126  /**
     127    Unregister a signal handler.
     128    @param handler The signal handler to unregister
     129  */
    101130  public void unregisterSignalHandler(SignalHandler handler)
    102131  {
    103     if (handlers != null) handlers.remove(getSignalHandlerId(handler));
     132    String localId = getLocalSignalHandlerId(handler);
     133    logger.info("Unregister signal handler: recevier id = " + receiverId +
     134      "; local id=" + localId);
     135    if (handlers != null) handlers.remove(localId);
     136    logger.debug("Current number of registered signal handlers: " + handlers.size());
    104137  }
    105138  // -------------------------------------------
    106139
    107140  /**
    108     Generate a signal handler ID string. This string is returned by
     141    Generate a signal ID string. This string is returned by
    109142    the {@link #registerSignalHandler(SignalHandler)} method and is used
    110143    in {@link SignalTransporter#init(String)} method to initialise
    111     a transporter object.
     144    a transporter object so that it can send signals to the specified handler.
     145    See the class documentation for a description of the format of the
     146    generated string.
    112147   
    113148    @param handler The signal handler to generate the ID for
    114149    @return The signal handler ID
    115150  */
    116   protected String getSignalHandlerId(SignalHandler handler)
     151  protected String getGlobalSignalId(SignalHandler handler)
    117152  {
    118153    StringBuilder sb = new StringBuilder();
    119     sb.append("signal://").append(receiverId).append("/");
    120     sb.append(System.identityHashCode(handler)).append("?");
     154    sb.append("signal://");
     155    sb.append(getLocalSignalHandlerId(handler)).append("@");
     156    sb.append(receiverId).append("/?");
    121157    Collection<Signal> signals = handler.getSupportedSignals();
    122158    if (signals != null)
     
    134170 
    135171  /**
     172    Get the local signal handler id of the given signal handler.
     173    This implementation simply return the system hashcode for the
     174    handler.
     175    @param handler The handler to get the id for
     176    @return The local handler id
     177  */
     178  protected String getLocalSignalHandlerId(SignalHandler handler)
     179  {
     180    return String.valueOf(System.identityHashCode(handler));
     181  }
     182 
     183  /**
    136184    Get the signal handler with a given ID.
    137     @param handlerId The signal handler ID
     185    @param localId The local signal handler ID as
     186      returned by the {@link #getLocalSignalHandlerId(SignalHandler)}
     187      method
    138188    @return The signal handler, or null if no handler is found
    139189  */
    140   protected SignalHandler getSignalHandler(String handlerId)
    141   {
    142     return handlers == null ? null : handlers.get(handlerId);
    143   }
     190  protected SignalHandler getSignalHandler(String localId)
     191  {
     192    return handlers == null ? null : handlers.get(localId);
     193  }
     194 
     195  /**
     196    Process a signal message. If the message can't be understood or
     197    if no handler can be found this method does nothing. The signal will
     198    be delivered to the signal handler in the current thread.
     199    @param message The message to process, the format of the message
     200      must be compatible with the message that {@link
     201      AbstractSignalTransporter#generateSignalMessage(Signal)} generates
     202  */
     203  protected void processSignalMessage(String message)
     204  {
     205    logger.debug("Processing signal message: " + message);
     206    if (message == null) return;
     207    try
     208    {
     209      URI uri = new URI(message);
     210      String localId = uri.getUserInfo();
     211      Signal signal = Signal.getSignal(uri.getQuery());
     212      SignalHandler signalHandler = getSignalHandler(localId);
     213      if (signalHandler != null)
     214      {
     215        signalHandler.handleSignal(signal);
     216      }
     217      else
     218      {
     219        logger.warn("No signal handler found for id: " + localId);
     220      }
     221    }
     222    catch (Exception ex)
     223    {
     224      // Ignore invalid messages
     225      logger.warn("Could not process signal message: " + message, ex);
     226    }
     227  }
     228 
    144229 
    145230}
  • trunk/src/core/net/sf/basedb/core/signal/AbstractSignalTransporter.java

    r4073 r4074  
    3030
    3131/**
     32  Abstract base class that is useful when implementing signal transporters.
    3233
    3334  @author nicklas
     
    3839  implements SignalTransporter
    3940{
     41  /**
     42    Log signals processing.
     43  */
     44  private static final org.apache.log4j.Logger logger =
     45    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.AbstractSignalTransporter");
    4046 
    41   private String signalHandlerId;
     47  private String globalSignalId;
    4248  private URI signalHandlerURI;
    4349  private Collection<Signal> signals;
     
    5258  /**
    5359    Initialise the transporter. We expect the parameter to be the one
    54     returned from {@link AbstractSignalReceiver#getSignalHandlerId(SignalHandler)}.
     60    returned from {@link AbstractSignalReceiver#getGlobalSignalId(SignalHandler)}.
    5561    If the receiver subclass overrided that method, the corresponding transporter
    5662    subclass should override this method.
     63    @see AbstractSignalReceiver for a description of the format expected
    5764  */
    5865  public void init(String params)
    5966  {
    60     signalHandlerId = params;
     67    logger.info("Initializing signal transporter: id=" + params);
     68    globalSignalId = params;
    6169  }
    62  
    6370 
    6471  public Collection<Signal> getSupportedSignals()
     
    6774    {
    6875      signals = new HashSet<Signal>();
    69       URI uri = getSignalHandlerURI();
     76      URI uri = getSignalURI();
    7077      String query = uri.getQuery();
    7178      if (query != null)
     
    8390  // -------------------------------------------
    8491
    85   protected String getSignalHandlerId()
     92  /**
     93    Generate a signal message string for the given signal. The string
     94    will have the following format:
     95    <code>signal://handlerId@receiverId/?signal</code>
     96    @see AbstractSignalReceiver#processSignalMessage(String)
     97  */
     98  protected String generateSignalMessage(Signal signal)
    8699  {
    87     return signalHandlerId;
     100    String message = "signal://" + getHandlerId() + "@" + getReceiverId() + "/?" + signal.getId();
     101    return message;
    88102  }
    89103 
    90   protected URI getSignalHandlerURI()
     104  /**
     105    Get the raw ID string that was passed to the {@link #init(String)}
     106    method.
     107  */
     108  protected String getGlobalSignalId()
     109  {
     110    return globalSignalId;
     111  }
     112 
     113  /**
     114    Get the URI representation of the global signal ID.
     115    @return An URI
     116    @throws SignalException If the signal ID is not a valid URI.
     117  */
     118  protected URI getSignalURI()
    91119  {
    92120    if (signalHandlerURI == null)
     
    94122      try
    95123      {
    96         signalHandlerURI = new URI(signalHandlerId);
     124        signalHandlerURI = new URI(globalSignalId);
    97125      }
    98126      catch (URISyntaxException ex)
    99127      {
    100         throw new SignalException(signalHandlerId, ex);
     128        throw new SignalException(globalSignalId, ex);
    101129      }
    102130    }
     
    104132  }
    105133 
     134  /**
     135    Get the receiverId part of the signal URI.
     136    @return The ID of the receiver
     137  */
    106138  protected String getReceiverId()
    107139  {
    108     return getSignalHandlerURI().getAuthority();
     140    URI uri = getSignalURI();
     141    String receiverId = uri.getHost();
     142    if (uri.getPort() >= 0) receiverId += ":" + uri.getPort();
     143    return receiverId;
    109144  }
    110145 
     146  /**
     147    Get the local handler ID part of the signal URI.
     148    @return The local ID of the signal handler
     149  */
    111150  protected String getHandlerId()
    112151  {
    113     return getSignalHandlerURI().getPath();
     152    return getSignalURI().getUserInfo();
    114153  }
    115154 
  • trunk/src/core/net/sf/basedb/core/signal/DelegatingSignalHandler.java

    r4073 r4074  
    4646{
    4747  /**
     48    Log signals processing.
     49  */
     50  private static final org.apache.log4j.Logger logger =
     51    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.DelegatingSignalHandler");
     52 
     53  /**
    4854    Holds the registered signal handlers.
    4955  */
     
    5157 
    5258  /**
     59    If a specific signal receiver must be used.
     60  */
     61  private SignalReceiver signalReceiver;
     62 
     63  /**
    5364    Create a new delegating signal handler. Signal handlers to
    5465    delegate to should be registered with
    5566    {@link #registerSignalHandler(SignalHandler)}.
    56    */
     67  */
    5768  public DelegatingSignalHandler()
    5869  {
     70    this(null);
     71  }
     72 
     73  /**
     74    Create a new delegating signal handler using a specified signal
     75    receiver. Signal handlers to delegate to should be registered with
     76    {@link #registerSignalHandler(SignalHandler)}.
     77    @param signalReceiver The signal receiver that should receive the signals,
     78      or null to use the system default signal receiver
     79  */
     80  public DelegatingSignalHandler(SignalReceiver signalReceiver)
     81  {
    5982    this.handlers = new HashMap<Signal, Set<SignalHandler>>();
     83    this.signalReceiver = signalReceiver;
    6084  }
    6185 
     
    90114  public void handleSignal(Signal signal)
    91115  {
     116    logger.debug("Got signal: " + signal);
    92117    Set<SignalHandler> all = handlers.get(signal);
    93     if (all == null) throw new UnsupportedSignalException(signal);
     118    if (all == null)
     119    {
     120      logger.debug("Signal not supported " + signal);
     121      throw new UnsupportedSignalException(signal);
     122    }
    94123    for (SignalHandler handler : all)
    95124    {
     125      logger.debug("Sending signal " + signal + " to: " + handler);
    96126      handler.handleSignal(signal);
    97127    }
     128  }
     129 
     130  public SignalReceiver getSignalReceiver()
     131  {
     132    return signalReceiver;
    98133  }
    99134  // -------------------------------------------
     
    106141  public void registerSignalHandler(SignalHandler handler)
    107142  {
     143    logger.debug("Registering signal handler: " + handler);
    108144    for (Signal signal : handler.getSupportedSignals())
    109145    {
     
    124160  public void unregisterSignalHandler(SignalHandler handler)
    125161  {
     162    logger.debug("Unregistering signal handler: " + handler);
    126163    Iterator<Map.Entry<Signal, Set<SignalHandler>>> it = handlers.entrySet().iterator();
    127164    while (it.hasNext())
  • trunk/src/core/net/sf/basedb/core/signal/LocalSignalReceiver.java

    r4073 r4074  
    4040  extends AbstractSignalReceiver
    4141{
     42  /**
     43    Log signals processing.
     44  */
     45  private static final org.apache.log4j.Logger logger =
     46    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.LocalSignalReceiver");
    4247
     48  /**
     49    Holds all registered signal receivers.
     50  */
    4351  private static Map<String, LocalSignalReceiver> receivers =
    4452    Collections.synchronizedMap(new HashMap<String, LocalSignalReceiver>());
    45  
     53
    4654  /**
    4755    Get a signal receiver with a given ID.
     
    8896  }
    8997 
     98  /**
     99    @return {@link LocalSignalTransporter}
     100  */
    90101  public Class<? extends SignalTransporter> getSignalTransporterClass()
    91102  {
     
    94105  // -------------------------------------------
    95106 
     107  /**
     108    Send the signal to a registered handler. If no handler with the
     109    given ID is found this method does nothing. The signal will be
     110    processed by the handler in the current thread.
     111    @param handlerId The ID of a registered handler
     112    @param signal The signal to send
     113  */
    96114  public void send(String handlerId, Signal signal)
    97115  {
     116    logger.info("Receiving signal " + signal.getId() + " to " + handlerId);
    98117    SignalHandler handler = super.getSignalHandler(handlerId);
    99118    if (handler != null) handler.handleSignal(signal);
  • trunk/src/core/net/sf/basedb/core/signal/LocalSignalTransporter.java

    r4073 r4074  
    2525
    2626/**
     27  A signal transporter implementation that can transport signals within
     28  the local virtual machine only. The corresponding receiver class
     29  is {@link LocalSignalReceiver}.
    2730
    2831  @author nicklas
     
    3336  extends AbstractSignalTransporter
    3437{
     38  /**
     39    Log signals processing.
     40  */
     41  private static final org.apache.log4j.Logger logger =
     42    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.LocalSignalTransporter");
    3543
     44  /**
     45    Create a new signal transporter instance.
     46  */
    3647  public LocalSignalTransporter()
    3748  {}
     
    4152    -------------------------------------------
    4253  */
     54  /**
     55    Send the signal to a local signal receiver. This method will call
     56    {@link LocalSignalReceiver#getSignalReceiver(String)} to find a registered
     57    signal receiver and then call {@link LocalSignalReceiver#send(String, Signal)}.
     58    The signal is sent and processed by the signal handler in the current thread.
     59  */
    4360  public void send(Signal signal)
    4461  {
     62    logger.info("Sending signal " + signal.getId() + " to " + getGlobalSignalId());
    4563    LocalSignalReceiver receiver = LocalSignalReceiver.getSignalReceiver(getReceiverId());
    46     if (receiver != null) receiver.send(getSignalHandlerId(), signal);
     64    if (receiver != null) receiver.send(getHandlerId(), signal);
    4765  }
    4866  // -------------------------------------------
  • trunk/src/core/net/sf/basedb/core/signal/Signal.java

    r4073 r4074  
    5555    Holds all registered signals.
    5656  */
    57   private static Map<String, Signal> signals = new HashMap<String, Signal>();
     57  private static Map<String, Signal> signals;
    5858 
    5959  /**
     
    6969  public static synchronized Signal registerSignal(String id, String name, String description)
    7070  {
    71     if (signals.containsKey(id))
     71    if (signals == null) signals = new HashMap<String, Signal>();
     72    Signal s = signals.get(id);
     73    if (s == null)
    7274    {
    73       return signals.get(id);
     75      s = new Signal(id, name, description);
     76      signals.put(id, s);
    7477    }
    75     Signal s = new Signal(id, name, description);
    76     signals.put(id, s);
    7778    return s;
    7879  }
  • trunk/src/core/net/sf/basedb/core/signal/SignalHandler.java

    r4073 r4074  
    3333  in most cases notify the target about it. How this is done is up to each
    3434  implementation.
     35  <p>
     36  Signal handler implementations need to be partly thread safe. Once they have
     37  been registered with a {@link SignalReceiver} they may receive multiple signals
     38  in different threads at the same time.
    3539
    3640  @author nicklas
     
    5963  */
    6064  public boolean supports(Signal signal);
     65 
     66  /**
     67    Return a specific signal receiever that must be used with this
     68    signal handler. Null should be returned to let the system select
     69    an appropriate signal receiver. If a non-null value is returned, the
     70    system should use this signal receiver instead of the system default.
     71    @return A signal receiver or null
     72  */
     73  public SignalReceiver getSignalReceiver();
     74 
    6175}
  • trunk/src/core/net/sf/basedb/core/signal/SignalReceiver.java

    r4073 r4074  
    3232  the {@link SocketSignalReceiver} and {@link SocketSignalTransporter} implementations
    3333  which uses network sockets to transport signals.
     34  <p>
     35  Signal receivers must be thread safe since a single receiver may be used to
     36  handle signals for multiple jobs at the same time.
    3437 
    3538  @author nicklas
    3639  @version 2.6
     40  @see SignalTransporter
     41  @see SignalHandler
    3742  @base.modified $Date$
    3843*/
     
    7580    @return A string that allows a transporter instance locate and send a signal to
    7681      the given signal handler
     82    @see AbstractSignalReceiver
    7783  */
    7884  public String registerSignalHandler(SignalHandler handler);
  • trunk/src/core/net/sf/basedb/core/signal/SignalTransporter.java

    r4073 r4074  
    3838  and {@link SocketSignalTransporter} uses a {@link java.net.URI} that contains
    3939  the IP number and port of the receiver and an ID for identifying the signal handler.
     40  <p>
     41  Signal transporters need not be thread safe since a new instance is created
     42  for each signal that is going to be sent.
    4043
    4144  @author nicklas
  • trunk/src/core/net/sf/basedb/core/signal/SocketSignalReceiver.java

    r4073 r4074  
    2828import java.net.InetSocketAddress;
    2929import java.net.ServerSocket;
     30import java.net.Socket;
     31import java.net.UnknownHostException;
    3032import java.nio.channels.ClosedByInterruptException;
    3133import java.nio.channels.ServerSocketChannel;
    3234import java.nio.channels.SocketChannel;
    33 
     35import java.util.HashSet;
     36import java.util.List;
     37import java.util.Set;
     38
     39import net.sf.basedb.core.Application;
     40import net.sf.basedb.util.QueryParameters;
    3441import net.sf.basedb.util.SocketUtil;
    3542import net.sf.basedb.util.Values;
     
    3845  A signal receiver implementation that listens on a socket for incoming signals.
    3946  Accordingly, this receiver supports sending signals between different virtual
    40   machines running on different servers. The initialisation parameter
    41   for this should be the port number the receiver should listen on. If null, the
    42   receiver will open a port number at random.
     47  machines running on different servers. The initialisation string for this 
     48  class should be of the format:
     49 
     50  <p>
     51  <code>port=xx&allow=ip-address&allow=ip-address...</code>
     52
     53  <p>
     54  where <code>port</code> is the port number the signal receiver will listen on and
     55  the <code>allow</code> parts are the ip name or numbers of hosts that are allowed
     56  to send signals to the receiver. Except for the special case <code>allow=*</code>,
     57  which allows any remote host to send signals an exact match is required. The
     58  local host is always allowed to send signals.
     59  <p>
     60  If no port is given, the signal receiver will randomly choose a free port
     61  <p>
     62  If no <code>allow</code> tags are given only allow connections from the
     63  local host are allowed.
    4364 
    4465  @author nicklas
     
    4970  extends AbstractSignalReceiver
    5071{
     72  /**
     73    Log signals processing.
     74  */
     75  private static final org.apache.log4j.Logger logger =
     76    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.SocketSignalReceiver");
    5177
    5278  private InetAddress ip;
    5379  private int port;
    5480  private Thread listener;
     81  private Set<InetAddress> allow;
     82  private boolean allowAll;
    5583 
    5684  /**
    5785    Create a new socket signal receiver. Before it can be used it must
    5886    be initialised with {@link #init(String)}.
    59    */
     87  */
    6088  public SocketSignalReceiver()
    6189  {}
     
    6997    receiver should listen on. If the parameter can't be parsed as a numeric value,
    7098    a random free port will be used. This method will open the socket and
     99    start a separate thread that listens for incoming signals.
    71100  */
    72101  public void init(String params)
    73102  {
    74     port = Values.getInt(params, 0);
    75    
     103    QueryParameters qp = QueryParameters.parseQueryString(params);
     104    // Get the port number we listen on
     105    port = Values.getInt(qp.getValue("port"), 0);
     106   
     107    // Get allowed hosts
     108    allow = new HashSet<InetAddress>();
     109    allowAll = false;
     110    List<String> allowIp = qp.getValues("allow");
     111    if (allowIp != null)
     112    {
     113      for (String ip : allowIp)
     114      {
     115        if ("*".equals(ip))
     116        {
     117          logger.debug("Allow all hosts enabled!");
     118          allowAll = true;
     119        }
     120        else
     121        {
     122          try
     123          {
     124            logger.debug("Adding host " + ip + " to list of allowed hosts.");
     125            allow.add(InetAddress.getByName(ip));
     126          }
     127          catch (UnknownHostException ex)
     128          {
     129            logger.warn("Unknown host: " + ip, ex);
     130          }
     131        }
     132      }
     133    }
     134   
     135    logger.info("Starting socket signal receiver on port: " + port);
    76136    try
    77137    {
     
    81141      port = socket.getLocalPort();
    82142      ip = socket.getInetAddress();
    83       listener = new Thread(new ListenerThread(channel),
    84           "ListenerThread."+this.toString());
     143      listener = new Thread(new ListenerThread(channel), "ListenerThread."+port);
    85144      listener.start();
     145      logger.info("Socket signal receiver is listening on port: " + port);
    86146    }
    87147    catch (IOException ex)
    88148    {
     149      logger.error("Could not initialise socket signal receiver", ex);
    89150      throw new SignalException(ex);
    90151    }
    91     super.init(ip.getHostAddress() + ":" + port);
    92   }
    93 
    94  
     152    super.init(Application.getHostName() + ":" + port);
     153  }
     154
     155  /**
     156    @return {@link SocketSignalTransporter}
     157  */
    95158  public Class<? extends SignalTransporter> getSignalTransporterClass()
    96159  {
     
    98161  }
    99162
     163  /**
     164    Close this receiver and the socket it is listening on.
     165  */
    100166  public void close()
    101167  {
     168    logger.info("Interrupting socket signal receiver on port: " + port);
    102169    listener.interrupt();
    103170  }
     
    133200        {
    134201          SocketChannel incoming = socket.accept();
     202          Socket inSocket = incoming.socket();
     203          InetAddress remoteHost = inSocket.getInetAddress();
    135204         
    136           String cmd = SocketUtil.read(incoming.socket(), true);
    137           String[] tmp = cmd.split("#");
    138           String signalHandlerId = tmp[0];
    139           Signal signal = Signal.getSignal(tmp[1]);
    140           SignalHandler signalHandler = getSignalHandler(signalHandlerId);
    141           signalHandler.handleSignal(signal);
     205          logger.debug("Incoming signal: remote host=" + remoteHost);
     206         
     207          // Check if we are allowed to accept connections from the remote host
     208          if (allowAll || SocketUtil.isLocalHost(remoteHost) || allow.contains(remoteHost))
     209          {
     210            // Read the incoming message
     211            String message = SocketUtil.read(inSocket, true);
     212            logger.debug("Incoming signal accepted: remote host=" + remoteHost +
     213              "; message=" + message);
     214            processSignalMessage(message);
     215          }
     216          else
     217          {
     218            logger.debug("Incoming signal rejected: remote host=" + remoteHost);
     219          }
     220         
    142221          interrupted = Thread.interrupted();
    143222        }
    144223        catch (ClosedByInterruptException ex)
    145224        {
     225          logger.info("Shutting down socket signal receiver on port: " + port);
    146226          interrupted = true;
    147227        }
    148         catch (Throwable t)
    149         {
    150           interrupted = true;
     228        catch (IOException ex)
     229        {
     230          logger.warn("Error on socket signal receiver on port: " + port, ex);
    151231        }
    152232      }
  • trunk/src/core/net/sf/basedb/core/signal/SocketSignalTransporter.java

    r4073 r4074  
    2727import java.net.URI;
    2828
    29 import net.sf.basedb.core.BaseException;
    3029import net.sf.basedb.util.SocketUtil;
    3130
     
    4039{
    4140
     41  /**
     42    Log signals processing.
     43  */
     44  private static final org.apache.log4j.Logger logger =
     45    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.SocketSignalTransporter");
     46 
     47  /**
     48    Create a new socket signal transporter. Before it can be used it must
     49    be initialised with {@link #init(String)}.
     50  */
     51  public SocketSignalTransporter()
     52  {}
     53 
    4254  /*
    4355    From the SignalTransporter interface
    4456    -------------------------------------------
    4557  */
     58  /**
     59    Connect to the remote host and send the signal.
     60  */
    4661  public void send(Signal signal)
    4762  {
    48     URI uri = getSignalHandlerURI();
    49     String host = uri.getAuthority();
     63    logger.info("Sending signal " + signal.getId() + " to " + getGlobalSignalId());
     64    URI uri = getSignalURI();
     65    String host = uri.getHost();
    5066    int port = uri.getPort();
    51    
     67    String message = generateSignalMessage(signal);
     68    logger.debug("The message is: " + message);
     69    Socket s = null;
    5270    try
    5371    {
    54       Socket s = new Socket(host, port);
    55       SocketUtil.send(s, getSignalHandlerId() + "#" + signal.getId(), true);
    56       SocketUtil.close(s);
     72      s = new Socket(host, port);
     73      SocketUtil.send(s, message, true);
    5774    }
    5875    catch (Exception ex)
    5976    {
    60       throw new BaseException(ex);
     77      logger.error("Error sending signal " + signal.getId() + " to " + getGlobalSignalId(), ex);
     78      throw new SignalException(ex);
    6179    }
    62 
     80    finally
     81    {
     82      if (s != null) SocketUtil.close(s);
     83    }
    6384  }
    6485  // -------------------------------------------
  • trunk/src/core/net/sf/basedb/core/signal/ThreadSignalHandler.java

    r4073 r4074  
    7878{
    7979
     80  /**
     81    Log signals processing.
     82  */
     83  private static final org.apache.log4j.Logger logger =
     84    org.apache.log4j.LogManager.getLogger("net.sf.basedb.core.signal.ThreadSignalHandler");
     85
     86 
    8087  private static final Set<Signal> supported = Collections.singleton(Signal.ABORT);
    8188
     
    122129  public void handleSignal(Signal signal)
    123130  {
     131    logger.debug("Got signal: " + signal);
    124132    if (!workerThread.isAlive()) return;
    125133    if (Signal.ABORT.equals(signal))
    126134    {
     135      logger.debug("Sending signal " + signal + " to thread: " + workerThread);
    127136      workerThread.interrupt();
    128137    }
    129138    else
    130139    {
     140      logger.debug("Signal not supported " + signal);
    131141      throw new UnsupportedSignalException(signal);
    132142    }
  • trunk/src/plugins/core/net/sf/basedb/plugins/AbstractFlatFileImporter.java

    r4073 r4074  
    5252import net.sf.basedb.core.plugin.Response;
    5353import net.sf.basedb.core.plugin.Plugin;
     54import net.sf.basedb.core.signal.SignalException;
    5455import net.sf.basedb.core.signal.SignalHandler;
    5556import net.sf.basedb.core.signal.SignalTarget;
     
    598599            }
    599600            // In case the server is shutting down... throw exception, rollback and quit
    600             if (Thread.interrupted()) throw new BaseException("Thread was interrupted.");
     601            if (Thread.interrupted()) throw new SignalException("Aborted by user.");
    601602          }
    602603        }
     
    641642            }
    642643            // In case the server is shutting down... throw exception, rollback and quit
    643             if (Thread.interrupted()) throw new BaseException("Thread was interrupted.");
     644            if (Thread.interrupted()) throw new SignalException("Aborted by user.");
    644645          }
    645646        }
  • trunk/www/views/jobs/index.jsp

    r3679 r4074  
    3939  import="net.sf.basedb.core.PermissionDeniedException"
    4040  import="net.sf.basedb.core.ItemAlreadyExistsException"
     41  import="net.sf.basedb.core.signal.SignalTransporter"
     42  import="net.sf.basedb.core.signal.Signal"
    4143  import="net.sf.basedb.util.RemovableUtil"
    4244  import="net.sf.basedb.util.ShareableUtil"
     
    238240    redirect = viewPage;
    239241  }
     242  else if ("AbortJob".equals(cmd))
     243  {
     244    ItemContext cc = Base.getAndSetCurrentContext(sc, itemType, pageContext, defaultContext);
     245    dc = sc.newDbControl();
     246    Job job = Job.getById(dc, cc.getId());
     247    if (job.getStatus() == Job.Status.WAITING)
     248    {
     249      job.doneError("Aborted by user");
     250    }
     251    else
     252    {
     253      SignalTransporter signalTransporter = job.getSignalTransporter();
     254      if (signalTransporter != null) signalTransporter.send(Signal.ABORT);
     255    }
     256    dc.commit();
     257    Thread.sleep(500);
     258    redirect = viewPage;
     259  }
    240260  else
    241261  {
  • trunk/www/views/jobs/view_job.jsp

    r4003 r4074  
    4747  import="net.sf.basedb.core.plugin.GuiContext"
    4848  import="net.sf.basedb.core.plugin.Plugin"
     49  import="net.sf.basedb.core.signal.SignalTransporter"
     50  import="net.sf.basedb.core.signal.Signal"
    4951  import="net.sf.basedb.clients.web.Base"
    5052  import="net.sf.basedb.clients.web.util.HTML"
     
    5759  import="java.util.List"
    5860  import="java.util.Collections"
     61  import="java.util.Collection"
    5962%>
    6063<%@ taglib prefix="base" uri="/WEB-INF/base.tld" %>
     
    103106  Formatter<Date> dateFormatter = FormatterFactory.getDateFormatter(sc);
    104107  Formatter<Date> dateTimeFormatter = FormatterFactory.getDateTimeFormatter(sc);
     108 
     109  // Check if the plug-in supports the "Abort" signal
     110  boolean supportsAbort = status == Job.Status.WAITING;
     111  if (status == Job.Status.EXECUTING)
     112  {
     113    try
     114    {
     115      SignalTransporter signalTransporter = job.getSignalTransporter();
     116      Collection<Signal> supportedSignals = signalTransporter != null ?
     117        signalTransporter.getSupportedSignals() : null;
     118      supportsAbort = supportedSignals == null || supportedSignals.contains(Signal.ABORT);
     119    }
     120    catch (Exception ex)
     121    {}
     122  }
    105123  %>
    106124
     
    114132    {
    115133      setTimeout('location.reload()', 10000);
     134    }
     135  }
     136  function abortJob()
     137  {
     138    if (confirm('Are you sure? This action may not be undone'))
     139    {
     140      location.href = 'index.jsp?ID=<%=ID%>&cmd=AbortJob&item_id=<%=itemId%>';
    116141    }
    117142  }
     
    289314      </table>
    290315      </t:tab>
    291      
    292316      <%
    293317      if (job.getStackTrace() != null)
     
    474498      %>
    475499      <%
     500      if (supportsAbort)
     501      {
     502        %>
     503        <base:button onclick="abortJob()" title="Abort&hellip;" image="abort.png" />
     504        <%
     505      }
     506      %>
     507      <%
    476508      if (job.getStatus() == Job.Status.ERROR && job.getJobType() == Job.Type.RUN_PLUGIN)
    477509      {
Note: See TracChangeset for help on using the changeset viewer.