Changeset 1304


Ignore:
Timestamp:
Mar 3, 2011, 9:19:00 AM (11 years ago)
Author:
Nicklas Nordborg
Message:

Added support for aborting a torrent upload. Added support for resuming uploads in case of a server restart.

Location:
extensions/net.sf.basedb.torrent/trunk
Files:
1 added
4 edited

Legend:

Unmodified
Added
Removed
  • extensions/net.sf.basedb.torrent/trunk/META-INF/extensions.xml

    r1255 r1304  
    6060      <factory-class>net.sf.basedb.clients.web.extensions.toolbar.FixedButtonFactory</factory-class>
    6161      <parameters>
    62         <title>Upload torrent</title>
    63         <onClick>Main.openPopup('$HOME$/upload_torrent.jsp?ID=' + getSessionId(), 'UploadTorrent', 600, 400)</onClick>
     62        <title>Upload torrent&amp;hellip;</title>
     63        <onClick>Main.openPopup('$HOME$/upload_torrent.jsp?ID=' + getSessionId(), 'UploadTorrent', 640, 480)</onClick>
     64        <icon>~/bittorrent.png</icon>
    6465      </parameters>
    6566    </action-factory>
  • extensions/net.sf.basedb.torrent/trunk/resources/upload_torrent.jsp

    r1255 r1304  
    7070      <table class="form" cellspacing=0>
    7171      <tr>
    72         <td class="prompt">Directory</td>
     72        <td class="prompt">Save in</td>
    7373        <td><%=HTML.encodeTags(directory.getPath().toString())%></td>
    7474      </tr>
  • extensions/net.sf.basedb.torrent/trunk/src/main/net/sf/basedb/clients/torrent/service/TorrentManager.java

    r1299 r1304  
    3131import java.io.InputStream;
    3232import java.io.OutputStream;
     33import java.io.PrintWriter;
     34import java.io.Writer;
    3335import java.util.Arrays;
     36import java.util.Collection;
     37import java.util.Collections;
    3438import java.util.List;
     39import java.util.Properties;
    3540
    3641import org.slf4j.Logger;
     
    4045import net.sf.basedb.core.Directory;
    4146import net.sf.basedb.core.Job;
     47import net.sf.basedb.core.signal.Signal;
     48import net.sf.basedb.core.signal.SignalException;
     49import net.sf.basedb.core.signal.SignalHandler;
    4250import net.sf.basedb.util.ChainedProgressReporter;
    4351import net.sf.basedb.util.FileUtil;
     
    5664*/
    5765public class TorrentManager
     66  implements SignalHandler
    5867{
    5968 
    6069  private static final Logger log = LoggerFactory.getLogger(TorrentManager.class);
    6170 
     71  private final Collection<Signal> supportedSignals;
    6272  private final TorrentService service;
    6373  private final File workDir;
    6474  private final String name;
    6575 
    66   private TorrentState state;
     76  private volatile TorrentState state;
    6777  private int jobId;
    6878  private Torrent torrent;
     79  private volatile Thread workerThread;
     80  private volatile Signal receivedSignal;
    6981 
    7082  /**
     
    7890    this.name = name;
    7991    this.state = TorrentState.INITIAL;
    80   }
    81 
     92    this.supportedSignals = Collections.unmodifiableList(Arrays.asList(Signal.ABORT, Signal.SHUTDOWN));
     93  }
     94
     95  /**
     96    Create a new torrent manager for an already existing job. The manager will be
     97    set in READY_TO_DOWNLOAD state and the download should automatically be
     98    resumed where it was aborted. If the download is already completed this will be
     99    detected and the files will be uploaded to BASE.
     100  */
     101  TorrentManager(TorrentService service, File workDir, String name, int jobId)
     102  {
     103    this(service, workDir, name);
     104    this.jobId = jobId;
     105    this.state = TorrentState.READY_TO_DOWNLOAD;
     106  }
     107 
     108  /*
     109    From the SignalHandler interface
     110    --------------------------------
     111  */
     112  @Override
     113  public Collection<Signal> getSupportedSignals()
     114  {
     115    return supportedSignals;
     116  }
     117
     118  @Override
     119  public void handleSignal(Signal signal)
     120  {
     121    Thread worker = workerThread;
     122    if (signal == Signal.ABORT)
     123    {
     124      if (worker != null && worker.isAlive())
     125      {
     126        // Interrupt the worker thread and assume that it takes care
     127        // of exception handling
     128        receivedSignal = signal;
     129        worker.interrupt();
     130      }
     131      else
     132      {
     133        // Close, report an error to BASE and cleanup.
     134        close(new SignalException("Aborted by user."));
     135      }
     136    }
     137    else if (signal == Signal.SHUTDOWN)
     138    {
     139      // We need to notify the worker thread
     140      if (worker != null && worker.isAlive())
     141      {
     142        receivedSignal = signal;
     143        worker.interrupt();
     144      }
     145    }
     146  }
     147
     148  @Override
     149  public boolean supports(Signal signal)
     150  {
     151    return supportedSignals.contains(signal);
     152  }
     153  // ------------------------------------
    82154  /**
    83155    Get the current state of the torrent.
     
    217289  */
    218290  public void queueDownload()
     291    throws IOException
    219292  {
    220293    if (getState() != TorrentState.INITIAL)
     
    235308    }
    236309 
     310    writeProperties();
     311   
    237312    log.debug("Torrent manager '" + getName() + "' is READY_TO_DOWNLOAD");
    238313    this.state = TorrentState.READY_TO_DOWNLOAD;
    239314  }
    240315
     316  private void writeProperties()
     317    throws IOException
     318  {
     319    // Create job.properties file to store info about the related job
     320    Writer writer = new PrintWriter(new File(getWorkDir(), "job.properties"), "UTF-8");
     321    try
     322    {
     323      Properties p = new Properties();
     324      p.setProperty("job.id", Integer.toString(jobId));
     325      p.setProperty("original-filename", getName());
     326      p.store(writer, null);
     327    }
     328    finally
     329    {
     330      if (writer != null) writer.close();
     331    }
     332  }
     333
     334 
    241335  /**
    242336    Called by the TorrentService when the download has been started.
     
    269363    downloading, nothing is done.
    270364  */
    271   public void reportDownloadProgress(DbControl dc)
     365  public synchronized void reportDownloadProgress(DbControl dc)
    272366  {
    273367    TorrentState state = getState();
     
    297391 
    298392  /**
    299     Copy the downloaded files to BASE.
    300   */
    301   public void copyToBase(DbControl dc)
     393    Copy the downloaded files to BASE. This method is synchronized but may
     394    be interrupted by calling {@link Thread#interrupt()} on the worker
     395    thread. If the torrent hasn't been completely download nothing
     396    is done.
     397  */
     398  public synchronized void copyToBase(DbControl dc)
    302399    throws IOException
    303400  {
     
    305402    if (state != TorrentState.DOWNLOAD_COMPLETE) return;
    306403   
    307     this.state = TorrentState.UPLOADING;
    308     File downloadDir = getDownloadDir();
    309     Job job = getJob(dc);
    310     Directory uploadDir = (Directory)job.getParameterValue("directory");
    311     ChainedProgressReporter progress = new ChainedProgressReporter(job.getProgressReporter(null));
    312     progress.setRange(90, 100);
    313     List files = FileUtil.uploadFiles(dc, uploadDir, downloadDir, null, true, progress);
    314     dc.refreshItem(job); // Needed since the progress reporter has modified the state in the database
    315     job.doneOk(files.size() + " file(s) uploaded successfully");
    316   }
    317  
    318   public void copyToBaseAsync()
     404    try
     405    {
     406      this.state = TorrentState.UPLOADING;
     407      this.workerThread = Thread.currentThread();
     408
     409      File downloadDir = getDownloadDir();
     410      Job job = getJob(dc);
     411      Directory uploadDir = (Directory)job.getParameterValue("directory");
     412      ChainedProgressReporter progress = new ChainedProgressReporter(job.getProgressReporter(null));
     413      progress.setRange(90, 100);
     414      List files = FileUtil.uploadFiles(dc, uploadDir, downloadDir, null, true, progress);
     415      dc.refreshItem(job); // Needed since the progress reporter has modified the state in the database
     416      job.doneOk(files.size() + " file(s) uploaded successfully");
     417    }
     418    finally
     419    {
     420      this.workerThread = null;
     421    }
     422  }
     423 
     424  /**
     425    Copy the downloaded files to BASE asynchronoulsy. This method will start a
     426    new thread that calls {@link #copyToBase(DbControl)} and then immediately
     427    return. To abort the upload set the interrupt flag on the thread using
     428    {@link Thread#interrupt()}, or send the {@link Signal#ABORT} to
     429    {@link #handleSignal(Signal)}.
     430  */
     431  public Thread copyToBaseAsync()
    319432  {
    320433    Thread t = new Thread(new Runnable()
     
    332445        catch (Exception ex)
    333446        {
    334           close(ex);
     447          // Is it an error or ABORT signal?
     448          if (receivedSignal == null || receivedSignal == Signal.ABORT)
     449          {
     450            close(ex);
     451          }
     452          else if (receivedSignal == Signal.SHUTDOWN)
     453          {
     454            // Do nothing
     455          }
    335456        }
    336457        finally
     
    341462      }
    342463    });
    343    
    344464    t.start();
     465    return t;
    345466  }
    346467 
    347468  /**
    348469    Close this torrent manager and remove all temporary working files.
    349     This method can be called any time, but it is intentded to be used
     470    This method can be called any time, but it is intended to be used
    350471    after a successful download and upload of the torrent files. If
    351472    the download is being aborted due to an error, use
    352473    {@link #close(Throwable)} instead.
    353474  */
    354   public void close()
     475  public synchronized void close()
    355476  {
    356477    log.debug("Closing torrent manager '" + getName() + "'");
    357478   
    358479    // Remove the torrent from the service
    359     service.remove(this);
     480    service.removeTorrentManager(this);
    360481   
    361482    // Remote working directory
     
    370491    working files will be deleted.
    371492  */
    372   public void close(Throwable t)
     493  public synchronized void close(Throwable t)
    373494  {
    374495    log.debug("Closing torrent manager due to an error '" + getName() + "'", t);
     
    378499 
    379500    // Remove the torrent from the service
    380     service.remove(this);
     501    service.removeTorrentManager(this);
    381502   
    382503    // Remote working directory
     
    420541  }
    421542
    422  
    423543}
  • extensions/net.sf.basedb.torrent/trunk/src/main/net/sf/basedb/clients/torrent/service/TorrentService.java

    r1299 r1304  
    2929
    3030import java.io.File;
     31import java.io.FileInputStream;
    3132import java.io.IOException;
    3233import java.io.InputStream;
     34import java.io.InputStreamReader;
    3335import java.net.InetAddress;
    3436import java.net.InetSocketAddress;
     
    4648import net.sf.basedb.core.SessionControl;
    4749import net.sf.basedb.core.Application.Pinger;
     50import net.sf.basedb.core.signal.LocalSignalReceiver;
    4851import net.sf.basedb.util.FileUtil;
    4952import net.sf.basedb.util.Values;
     
    111114  private SessionControl sc;
    112115  private Pinger pinger;
     116  private LocalSignalReceiver signalReciever;
    113117  private volatile TimerTask serviceTask;
    114118 
     
    189193      this.btClient = tmpClient;
    190194    this.torrentManagers = Collections.synchronizedList(new ArrayList<TorrentManager>());
     195    this.signalReciever = new LocalSignalReceiver();
     196    this.signalReciever.init("torrent-service:0");
     197   
     198    loadExistingTorrents();
     199         
    191200    this.isRunning = true;
    192201    log.info("Bittorrent Download Service has been started");
     
    204213    isRunning = false;
    205214
     215    if (signalReciever != null) signalReciever.close(1000);
     216    signalReciever = null;
    206217    if (serviceTask != null) serviceTask.cancel();
    207218    serviceTask = null;
     
    218229  }
    219230
     231  private void loadExistingTorrents()
     232  {
     233    // Iterate all subdirs in workDir
     234    for (File subDir : workDir.listFiles())
     235    {
     236      if (!subDir.isDirectory()) continue; // with next file
     237     
     238      File jobProperties = new File(subDir, "job.properties");
     239      if (!jobProperties.exists()) continue; // with next file
     240     
     241      Properties p = new Properties();
     242      try
     243      {
     244        p.load(new InputStreamReader(new FileInputStream(jobProperties), "UTF-8"));
     245      }
     246      catch (Exception ex)
     247      {
     248        log.warn("Could not load " + jobProperties, ex);
     249      }
     250      int jobId = Values.getInt(p.getProperty("job.id"));
     251      String name = p.getProperty("original-filename");
     252      if (jobId > 0 && name != null)
     253      {
     254        log.debug("Resuming download: " + name + "; jobId=" +jobId);
     255        registerTorrentManager(new TorrentManager(this, subDir, name, jobId));
     256      }
     257    }
     258  }
     259 
    220260  /**
    221261    Safely close the session control. Catches all exceptions but log
     
    268308    if (!isRunning()) throw new IllegalStateException("The service has not been started.");
    269309   
     310    log.debug("Creating torrent manager for torrent: " + torrentName);
    270311    // Create a sub-dir for this torrent
    271312    String tmpDir = torrentName;
     
    278319    // Create a Torrent manager
    279320    TorrentManager manager = new TorrentManager(this, subDir, torrentName);
     321    registerTorrentManager(manager);
     322    log.debug("Created torrent manager for torrent: " + torrentName);
     323    return manager;
     324  }
     325
     326  /**
     327    Register a new torrent manager
     328  */
     329  private void registerTorrentManager(TorrentManager manager)
     330  {
    280331    torrentManagers.add(manager);
    281    
    282332    // Create the service task if needed
    283333      if (serviceTask == null)
     
    285335          serviceTask = Application.getScheduler().schedule(new TorrentServiceStateCheckerTask(this), 10000, 10000, false);
    286336      }
    287     return manager;
    288   }
    289 
    290   /**
    291     Remove an existing torrent manager. If this is the last one,
    292     the service task will be closed.
    293   */
    294   synchronized void remove(TorrentManager manager)
    295   {
     337  }
     338 
     339  /**
     340    Remove an existing torrent manager.
     341  */
     342  synchronized void removeTorrentManager(TorrentManager manager)
     343  {
     344    log.debug("Stopping torrent manager: " + manager.getName() + "; jobId=" + manager.getJobId());
    296345    try
    297346    {
    298347      btClient.stopTorrent(manager.getTorrent());
     348      log.debug("Stopped torrent manager: " + manager.getName() + "; jobId=" + manager.getJobId());
    299349    }
    300350    catch (Exception ex)
     
    303353    }
    304354    torrentManagers.remove(manager);
     355   
     356    // Cancel the service task if there are no more torrents
    305357    if (torrentManagers.size() == 0)
    306358    {
     
    334386  void startDownload(TorrentManager manager)
    335387  {
    336 
     388    log.debug("Starting download: " + manager.getName() + "; jobId=" + manager.getJobId());
    337389    // Start the job + register signal receiver
    338390    DbControl dc = sc.newDbControl();
     
    341393      // Load the job and register it as started
    342394      Job job = manager.getJob(dc);
    343       //String handlerId = signalReciever.registerSignalHandler(manager);
    344       //job.setSignalTransporter(signalReciever.getSignalTransporterClass(), handlerId);
    345       job.start("Starting download...", "torrent-service@" + Application.getHostName(), null);
     395      String handlerId = signalReciever.registerSignalHandler(manager);
     396      job.setSignalTransporter(signalReciever.getSignalTransporterClass(), handlerId);
     397      if (job.getStatus() != Job.Status.EXECUTING)
     398      {
     399        job.start("Starting download...", "torrent-service@" + Application.getHostName(), null);
     400      }
    346401      dc.commit();
    347402    }
     
    361416      // Set torrent manager state to DOWNLOADING
    362417      manager.setTorrent(torrent);
     418      log.debug("Started download: " + manager.getName() + "; jobId=" + manager.getJobId());
    363419    }
    364420    catch (Exception ex)
Note: See TracChangeset for help on using the changeset viewer.