Changeset 1304
- Timestamp:
- Mar 3, 2011, 9:19:00 AM (12 years ago)
- Location:
- extensions/net.sf.basedb.torrent/trunk
- Files:
-
- 1 added
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
extensions/net.sf.basedb.torrent/trunk/META-INF/extensions.xml
r1255 r1304 60 60 <factory-class>net.sf.basedb.clients.web.extensions.toolbar.FixedButtonFactory</factory-class> 61 61 <parameters> 62 <title>Upload torrent</title> 63 <onClick>Main.openPopup('$HOME$/upload_torrent.jsp?ID=' + getSessionId(), 'UploadTorrent', 600, 400)</onClick> 62 <title>Upload torrent&hellip;</title> 63 <onClick>Main.openPopup('$HOME$/upload_torrent.jsp?ID=' + getSessionId(), 'UploadTorrent', 640, 480)</onClick> 64 <icon>~/bittorrent.png</icon> 64 65 </parameters> 65 66 </action-factory> -
extensions/net.sf.basedb.torrent/trunk/resources/upload_torrent.jsp
r1255 r1304 70 70 <table class="form" cellspacing=0> 71 71 <tr> 72 <td class="prompt"> Directory</td>72 <td class="prompt">Save in</td> 73 73 <td><%=HTML.encodeTags(directory.getPath().toString())%></td> 74 74 </tr> -
extensions/net.sf.basedb.torrent/trunk/src/main/net/sf/basedb/clients/torrent/service/TorrentManager.java
r1299 r1304 31 31 import java.io.InputStream; 32 32 import java.io.OutputStream; 33 import java.io.PrintWriter; 34 import java.io.Writer; 33 35 import java.util.Arrays; 36 import java.util.Collection; 37 import java.util.Collections; 34 38 import java.util.List; 39 import java.util.Properties; 35 40 36 41 import org.slf4j.Logger; … … 40 45 import net.sf.basedb.core.Directory; 41 46 import net.sf.basedb.core.Job; 47 import net.sf.basedb.core.signal.Signal; 48 import net.sf.basedb.core.signal.SignalException; 49 import net.sf.basedb.core.signal.SignalHandler; 42 50 import net.sf.basedb.util.ChainedProgressReporter; 43 51 import net.sf.basedb.util.FileUtil; … … 56 64 */ 57 65 public class TorrentManager 66 implements SignalHandler 58 67 { 59 68 60 69 private static final Logger log = LoggerFactory.getLogger(TorrentManager.class); 61 70 71 private final Collection<Signal> supportedSignals; 62 72 private final TorrentService service; 63 73 private final File workDir; 64 74 private final String name; 65 75 66 private TorrentState state;76 private volatile TorrentState state; 67 77 private int jobId; 68 78 private Torrent torrent; 79 private volatile Thread workerThread; 80 private volatile Signal receivedSignal; 69 81 70 82 /** … … 78 90 this.name = name; 79 91 this.state = TorrentState.INITIAL; 80 } 81 92 this.supportedSignals = Collections.unmodifiableList(Arrays.asList(Signal.ABORT, Signal.SHUTDOWN)); 93 } 94 95 /** 96 Create a new torrent manager for an already existing job. The manager will be 97 set in READY_TO_DOWNLOAD state and the download should automatically be 98 resumed where it was aborted. If the download is already completed this will be 99 detected and the files will be uploaded to BASE. 100 */ 101 TorrentManager(TorrentService service, File workDir, String name, int jobId) 102 { 103 this(service, workDir, name); 104 this.jobId = jobId; 105 this.state = TorrentState.READY_TO_DOWNLOAD; 106 } 107 108 /* 109 From the SignalHandler interface 110 -------------------------------- 111 */ 112 @Override 113 public Collection<Signal> getSupportedSignals() 114 { 115 return supportedSignals; 116 } 117 118 @Override 119 public void handleSignal(Signal signal) 120 { 121 Thread worker = workerThread; 122 if (signal == Signal.ABORT) 123 { 124 if (worker != null && worker.isAlive()) 125 { 126 // Interrupt the worker thread and assume that it takes care 127 // of exception handling 128 receivedSignal = signal; 129 worker.interrupt(); 130 } 131 else 132 { 133 // Close, report an error to BASE and cleanup. 134 close(new SignalException("Aborted by user.")); 135 } 136 } 137 else if (signal == Signal.SHUTDOWN) 138 { 139 // We need to notify the worker thread 140 if (worker != null && worker.isAlive()) 141 { 142 receivedSignal = signal; 143 worker.interrupt(); 144 } 145 } 146 } 147 148 @Override 149 public boolean supports(Signal signal) 150 { 151 return supportedSignals.contains(signal); 152 } 153 // ------------------------------------ 82 154 /** 83 155 Get the current state of the torrent. … … 217 289 */ 218 290 public void queueDownload() 291 throws IOException 219 292 { 220 293 if (getState() != TorrentState.INITIAL) … … 235 308 } 236 309 310 writeProperties(); 311 237 312 log.debug("Torrent manager '" + getName() + "' is READY_TO_DOWNLOAD"); 238 313 this.state = TorrentState.READY_TO_DOWNLOAD; 239 314 } 240 315 316 private void writeProperties() 317 throws IOException 318 { 319 // Create job.properties file to store info about the related job 320 Writer writer = new PrintWriter(new File(getWorkDir(), "job.properties"), "UTF-8"); 321 try 322 { 323 Properties p = new Properties(); 324 p.setProperty("job.id", Integer.toString(jobId)); 325 p.setProperty("original-filename", getName()); 326 p.store(writer, null); 327 } 328 finally 329 { 330 if (writer != null) writer.close(); 331 } 332 } 333 334 241 335 /** 242 336 Called by the TorrentService when the download has been started. … … 269 363 downloading, nothing is done. 270 364 */ 271 public void reportDownloadProgress(DbControl dc)365 public synchronized void reportDownloadProgress(DbControl dc) 272 366 { 273 367 TorrentState state = getState(); … … 297 391 298 392 /** 299 Copy the downloaded files to BASE. 300 */ 301 public void copyToBase(DbControl dc) 393 Copy the downloaded files to BASE. This method is synchronized but may 394 be interrupted by calling {@link Thread#interrupt()} on the worker 395 thread. If the torrent hasn't been completely download nothing 396 is done. 397 */ 398 public synchronized void copyToBase(DbControl dc) 302 399 throws IOException 303 400 { … … 305 402 if (state != TorrentState.DOWNLOAD_COMPLETE) return; 306 403 307 this.state = TorrentState.UPLOADING; 308 File downloadDir = getDownloadDir(); 309 Job job = getJob(dc); 310 Directory uploadDir = (Directory)job.getParameterValue("directory"); 311 ChainedProgressReporter progress = new ChainedProgressReporter(job.getProgressReporter(null)); 312 progress.setRange(90, 100); 313 List files = FileUtil.uploadFiles(dc, uploadDir, downloadDir, null, true, progress); 314 dc.refreshItem(job); // Needed since the progress reporter has modified the state in the database 315 job.doneOk(files.size() + " file(s) uploaded successfully"); 316 } 317 318 public void copyToBaseAsync() 404 try 405 { 406 this.state = TorrentState.UPLOADING; 407 this.workerThread = Thread.currentThread(); 408 409 File downloadDir = getDownloadDir(); 410 Job job = getJob(dc); 411 Directory uploadDir = (Directory)job.getParameterValue("directory"); 412 ChainedProgressReporter progress = new ChainedProgressReporter(job.getProgressReporter(null)); 413 progress.setRange(90, 100); 414 List files = FileUtil.uploadFiles(dc, uploadDir, downloadDir, null, true, progress); 415 dc.refreshItem(job); // Needed since the progress reporter has modified the state in the database 416 job.doneOk(files.size() + " file(s) uploaded successfully"); 417 } 418 finally 419 { 420 this.workerThread = null; 421 } 422 } 423 424 /** 425 Copy the downloaded files to BASE asynchronoulsy. This method will start a 426 new thread that calls {@link #copyToBase(DbControl)} and then immediately 427 return. To abort the upload set the interrupt flag on the thread using 428 {@link Thread#interrupt()}, or send the {@link Signal#ABORT} to 429 {@link #handleSignal(Signal)}. 430 */ 431 public Thread copyToBaseAsync() 319 432 { 320 433 Thread t = new Thread(new Runnable() … … 332 445 catch (Exception ex) 333 446 { 334 close(ex); 447 // Is it an error or ABORT signal? 448 if (receivedSignal == null || receivedSignal == Signal.ABORT) 449 { 450 close(ex); 451 } 452 else if (receivedSignal == Signal.SHUTDOWN) 453 { 454 // Do nothing 455 } 335 456 } 336 457 finally … … 341 462 } 342 463 }); 343 344 464 t.start(); 465 return t; 345 466 } 346 467 347 468 /** 348 469 Close this torrent manager and remove all temporary working files. 349 This method can be called any time, but it is inten tded to be used470 This method can be called any time, but it is intended to be used 350 471 after a successful download and upload of the torrent files. If 351 472 the download is being aborted due to an error, use 352 473 {@link #close(Throwable)} instead. 353 474 */ 354 public void close()475 public synchronized void close() 355 476 { 356 477 log.debug("Closing torrent manager '" + getName() + "'"); 357 478 358 479 // Remove the torrent from the service 359 service.remove (this);480 service.removeTorrentManager(this); 360 481 361 482 // Remote working directory … … 370 491 working files will be deleted. 371 492 */ 372 public void close(Throwable t)493 public synchronized void close(Throwable t) 373 494 { 374 495 log.debug("Closing torrent manager due to an error '" + getName() + "'", t); … … 378 499 379 500 // Remove the torrent from the service 380 service.remove (this);501 service.removeTorrentManager(this); 381 502 382 503 // Remote working directory … … 420 541 } 421 542 422 423 543 } -
extensions/net.sf.basedb.torrent/trunk/src/main/net/sf/basedb/clients/torrent/service/TorrentService.java
r1299 r1304 29 29 30 30 import java.io.File; 31 import java.io.FileInputStream; 31 32 import java.io.IOException; 32 33 import java.io.InputStream; 34 import java.io.InputStreamReader; 33 35 import java.net.InetAddress; 34 36 import java.net.InetSocketAddress; … … 46 48 import net.sf.basedb.core.SessionControl; 47 49 import net.sf.basedb.core.Application.Pinger; 50 import net.sf.basedb.core.signal.LocalSignalReceiver; 48 51 import net.sf.basedb.util.FileUtil; 49 52 import net.sf.basedb.util.Values; … … 111 114 private SessionControl sc; 112 115 private Pinger pinger; 116 private LocalSignalReceiver signalReciever; 113 117 private volatile TimerTask serviceTask; 114 118 … … 189 193 this.btClient = tmpClient; 190 194 this.torrentManagers = Collections.synchronizedList(new ArrayList<TorrentManager>()); 195 this.signalReciever = new LocalSignalReceiver(); 196 this.signalReciever.init("torrent-service:0"); 197 198 loadExistingTorrents(); 199 191 200 this.isRunning = true; 192 201 log.info("Bittorrent Download Service has been started"); … … 204 213 isRunning = false; 205 214 215 if (signalReciever != null) signalReciever.close(1000); 216 signalReciever = null; 206 217 if (serviceTask != null) serviceTask.cancel(); 207 218 serviceTask = null; … … 218 229 } 219 230 231 private void loadExistingTorrents() 232 { 233 // Iterate all subdirs in workDir 234 for (File subDir : workDir.listFiles()) 235 { 236 if (!subDir.isDirectory()) continue; // with next file 237 238 File jobProperties = new File(subDir, "job.properties"); 239 if (!jobProperties.exists()) continue; // with next file 240 241 Properties p = new Properties(); 242 try 243 { 244 p.load(new InputStreamReader(new FileInputStream(jobProperties), "UTF-8")); 245 } 246 catch (Exception ex) 247 { 248 log.warn("Could not load " + jobProperties, ex); 249 } 250 int jobId = Values.getInt(p.getProperty("job.id")); 251 String name = p.getProperty("original-filename"); 252 if (jobId > 0 && name != null) 253 { 254 log.debug("Resuming download: " + name + "; jobId=" +jobId); 255 registerTorrentManager(new TorrentManager(this, subDir, name, jobId)); 256 } 257 } 258 } 259 220 260 /** 221 261 Safely close the session control. Catches all exceptions but log … … 268 308 if (!isRunning()) throw new IllegalStateException("The service has not been started."); 269 309 310 log.debug("Creating torrent manager for torrent: " + torrentName); 270 311 // Create a sub-dir for this torrent 271 312 String tmpDir = torrentName; … … 278 319 // Create a Torrent manager 279 320 TorrentManager manager = new TorrentManager(this, subDir, torrentName); 321 registerTorrentManager(manager); 322 log.debug("Created torrent manager for torrent: " + torrentName); 323 return manager; 324 } 325 326 /** 327 Register a new torrent manager 328 */ 329 private void registerTorrentManager(TorrentManager manager) 330 { 280 331 torrentManagers.add(manager); 281 282 332 // Create the service task if needed 283 333 if (serviceTask == null) … … 285 335 serviceTask = Application.getScheduler().schedule(new TorrentServiceStateCheckerTask(this), 10000, 10000, false); 286 336 } 287 return manager; 288 } 289 290 /** 291 Remove an existing torrent manager. If this is the last one, 292 the service task will be closed. 293 */ 294 synchronized void remove(TorrentManager manager) 295 { 337 } 338 339 /** 340 Remove an existing torrent manager. 341 */ 342 synchronized void removeTorrentManager(TorrentManager manager) 343 { 344 log.debug("Stopping torrent manager: " + manager.getName() + "; jobId=" + manager.getJobId()); 296 345 try 297 346 { 298 347 btClient.stopTorrent(manager.getTorrent()); 348 log.debug("Stopped torrent manager: " + manager.getName() + "; jobId=" + manager.getJobId()); 299 349 } 300 350 catch (Exception ex) … … 303 353 } 304 354 torrentManagers.remove(manager); 355 356 // Cancel the service task if there are no more torrents 305 357 if (torrentManagers.size() == 0) 306 358 { … … 334 386 void startDownload(TorrentManager manager) 335 387 { 336 388 log.debug("Starting download: " + manager.getName() + "; jobId=" + manager.getJobId()); 337 389 // Start the job + register signal receiver 338 390 DbControl dc = sc.newDbControl(); … … 341 393 // Load the job and register it as started 342 394 Job job = manager.getJob(dc); 343 //String handlerId = signalReciever.registerSignalHandler(manager); 344 //job.setSignalTransporter(signalReciever.getSignalTransporterClass(), handlerId); 345 job.start("Starting download...", "torrent-service@" + Application.getHostName(), null); 395 String handlerId = signalReciever.registerSignalHandler(manager); 396 job.setSignalTransporter(signalReciever.getSignalTransporterClass(), handlerId); 397 if (job.getStatus() != Job.Status.EXECUTING) 398 { 399 job.start("Starting download...", "torrent-service@" + Application.getHostName(), null); 400 } 346 401 dc.commit(); 347 402 } … … 361 416 // Set torrent manager state to DOWNLOADING 362 417 manager.setTorrent(torrent); 418 log.debug("Started download: " + manager.getName() + "; jobId=" + manager.getJobId()); 363 419 } 364 420 catch (Exception ex)
Note: See TracChangeset
for help on using the changeset viewer.