Changeset 3808
- Timestamp:
- Jul 5, 2019, 1:46:34 AM (4 years ago)
- Location:
- branches/0.16-stable
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/0.16-stable/test/Makefile.am
r3792 r3808 102 102 test/scheduler3.test \ 103 103 test/scheduler4.test \ 104 test/scheduler5.test \ 104 105 test/segment.test test/smart_ptr.test \ 105 106 test/smith_waterman.test \ -
branches/0.16-stable/yat/utility/Scheduler.cc
r3694 r3808 33 33 34 34 Scheduler::Scheduler(unsigned int threads) 35 : running_jobs_(0), 36 job_handler_(JobHandler(threads, queue_, jobs_, running_jobs_, error_)) 35 : data_(threads), job_handler_(JobHandler(data_)) 37 36 { 38 37 assert(threads); … … 56 55 // first signal to JobHandler that Scheduler is waiting 57 56 boost::shared_ptr<Job> end; 58 jobs_.push(end);57 data_.jobs().push(end); 59 58 60 59 job_handler_.interrupt(); … … 66 65 { 67 66 throw_if_error(); 68 return running_jobs_ + queue_.size();67 return data_.running_jobs().get() + data_.queue().size(); 69 68 } 70 69 … … 73 72 { 74 73 throw_if_error(); 75 jobs_.push(job);74 data_.jobs().push(job); 76 75 } 77 76 … … 80 79 { 81 80 boost::exception_ptr error; 82 if ( error_.try_pop(error))81 if (data_.error().try_pop(error)) 83 82 boost::rethrow_exception(error); 84 83 } … … 96 95 // first signal to JobHandler that Scheduler is waiting 97 96 boost::shared_ptr<Job> end; 98 jobs_.push(end);97 data_.jobs().push(end); 99 98 100 99 // wait for job handler to finish … … 197 196 198 197 198 // Scheduler::JobHandlerData 199 Scheduler::JobHandlerData::JobHandlerData(unsigned int threads) 200 : job_count_(0), running_jobs_(0), threads_(threads) 201 {} 202 203 204 Queue<boost::exception_ptr>& 205 Scheduler::JobHandlerData::error(void) const 206 { 207 return error_; 208 } 209 210 211 const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const 212 { 213 return jobs_; 214 } 215 216 217 Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) 218 { 219 return jobs_; 220 } 221 222 223 const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const 224 { 225 return queue_; 226 } 227 228 229 Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) 230 { 231 return queue_; 232 } 233 234 235 const Scheduler::JobHandlerData::Count& 236 Scheduler::JobHandlerData::job_count(void) const 237 { 238 return job_count_; 239 } 240 241 242 Scheduler::JobHandlerData::Count& 243 Scheduler::JobHandlerData::job_count(void) 244 { 245 return job_count_; 246 } 247 248 249 const Scheduler::JobHandlerData::Count& 250 Scheduler::JobHandlerData::running_jobs(void) const 251 { 252 return running_jobs_; 253 } 254 255 256 Scheduler::JobHandlerData::Count& 257 Scheduler::JobHandlerData::running_jobs(void) 258 { 259 return running_jobs_; 260 } 261 262 263 const Scheduler::JobHandlerData::Count& 264 Scheduler::JobHandlerData::threads(void) const 265 { 266 return threads_; 267 } 268 269 270 Scheduler::JobHandlerData::Count& 271 Scheduler::JobHandlerData::threads(void) 272 { 273 return threads_; 274 } 275 276 277 // Scheduler::JobHandlerData::Count 278 Scheduler::JobHandlerData::Count::Count(int x) 279 : x_(x) 280 { 281 } 282 283 284 void Scheduler::JobHandlerData::Count::decrement(void) 285 { 286 boost::unique_lock<boost::mutex> lock(mutex_); 287 --x_; 288 } 289 290 291 int Scheduler::JobHandlerData::Count::get(void) const 292 { 293 boost::unique_lock<boost::mutex> lock(mutex_); 294 return x_; 295 } 296 297 298 void Scheduler::JobHandlerData::Count::increment(void) 299 { 300 boost::unique_lock<boost::mutex> lock(mutex_); 301 ++x_; 302 } 303 304 305 void Scheduler::JobHandlerData::Count::set(int x) 306 { 307 boost::unique_lock<boost::mutex> lock(mutex_); 308 x_ = x; 309 } 310 311 199 312 // Scheduler::JobHandler 200 313 201 Scheduler::JobHandler::JobHandler(unsigned int threads, 202 JobQueue& queue, 203 Queue<JobPtr>& jobs, 204 running_jobs_type& running_jobs, 205 Queue<boost::exception_ptr>& error) 206 : threads_(threads), 207 queue_(queue), jobs_(jobs), running_jobs_(running_jobs), 208 error_(error), job_counter_(0) 209 { 210 } 314 Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data) 315 : data_(&data) 316 {} 317 211 318 212 319 213 320 void Scheduler::JobHandler::post_process(JobPtr job) 214 321 { 215 --running_jobs_; 216 assert(running_jobs_>=0); 322 assert(job); 323 assert(data_); 324 data_->running_jobs().decrement(); 325 assert(data_->running_jobs().get() >= 0); 217 326 job->status_ = Job::completed; 218 327 219 328 if (job->error_) { 220 error_.push(job->error_);329 data_->error().push(job->error_); 221 330 return; 222 331 } … … 244 353 assert(job->status_ == Job::pristine); 245 354 job->status_ = Job::prepared; 246 job->id_ = job_counter_;247 ++job_counter_;355 job->id_ = data_->job_count().get(); 356 data_->job_count().increment(); 248 357 249 358 // If job have prerequisite that need to be run first, process them … … 276 385 { 277 386 job->status_ = Job::running; 278 ++running_jobs_;279 assert( running_jobs_>0);280 queue_.push(job);387 data_->running_jobs().increment(); 388 assert(data_->running_jobs().get() > 0); 389 data_->queue().push(job); 281 390 } 282 391 … … 300 409 void Scheduler::JobHandler::operator()(void) 301 410 { 411 assert(data_); 302 412 boost::thread_group workers; 303 for (size_t i=0; i <threads_; ++i)304 workers.create_thread(Worker( queue_, jobs_));413 for (size_t i=0; i < data_->threads().get(); ++i) 414 workers.create_thread(Worker(data_->queue(), data_->jobs())); 305 415 // Process jobs (in jobs_) coming both from Scheduler and 306 416 // completed jobs from Workers until Scheduler is waiting … … 312 422 JobPtr job; 313 423 314 while (!scheduler_is_waiting || running_jobs_ || jobs_.size()) { 315 jobs_.pop(job); 424 while (!scheduler_is_waiting || data_->running_jobs().get() || 425 !data_->jobs().empty()) { 426 data_->jobs().pop(job); 316 427 if (job) 317 428 process(job); … … 323 434 // Since we are in a background thread, we cannot throw from 324 435 // here, instead interrupt workers and return early. 325 if (! error_.empty()) {436 if (!data_->error().empty()) { 326 437 // In case queue is empty, workers might be be stuck 327 438 // waiting for job queue to pop, then send them a poison pill so 328 439 // they stop. 329 440 boost::shared_ptr<Job> end; 330 queue_.push(end);441 data_->queue().push(end); 331 442 // For other cases (queue is not empty) send workers a 332 443 // interrupt signal and wait for them to wrap up. 333 444 workers.interrupt_all(); 334 running_jobs_ = 0;335 queue_.clear();445 data_->running_jobs().set(0); 446 data_->queue().clear(); 336 447 return; 337 448 } … … 343 454 // kill workers 344 455 boost::shared_ptr<Job> end; 345 queue_.push(end);456 data_->queue().push(end); 346 457 workers.join_all(); 347 458 } -
branches/0.16-stable/yat/utility/Scheduler.h
r3694 r3808 23 23 */ 24 24 25 #include "config_public.h"26 25 #include "PriorityQueue.h" 27 26 #include "Queue.h" … … 31 30 #include <boost/shared_ptr.hpp> 32 31 33 #ifdef YAT_HAVE_ATOMIC34 #include <atomic>35 #endif36 32 #include <set> 37 33 #include <deque> … … 87 83 class Scheduler 88 84 { 89 #ifdef YAT_HAVE_ATOMIC90 typedef std::atomic<int> running_jobs_type;91 #else92 typedef int running_jobs_type;93 #endif94 85 public: 95 86 /** … … 218 209 }; // end class Worker 219 210 211 212 class JobHandlerData 213 { 214 public: 215 /// thread-safe class around int 216 class Count 217 { 218 public: 219 /// Constructor 220 explicit Count(int x=0); 221 /// increase value with 1 222 void decrement(void); 223 /// return value 224 int get(void) const; 225 /// decrease value with 1 226 void increment(void); 227 /// modify value 228 void set(int x); 229 private: 230 mutable boost::mutex mutex_; 231 int x_; 232 }; 233 234 JobHandlerData(unsigned int threads); 235 Queue<boost::exception_ptr>& error(void) const; 236 237 const Queue<JobPtr>& jobs(void) const; 238 Queue<JobPtr>& jobs(void); 239 const JobQueue& queue(void) const; 240 JobQueue& queue(void); 241 242 const Count& job_count(void) const; 243 Count& job_count(void); 244 245 const Count& running_jobs(void) const; 246 Count& running_jobs(void); 247 248 const Count& threads(void) const; 249 Count& threads(void); 250 private: 251 mutable Queue<boost::exception_ptr> error_; 252 Queue<JobPtr> jobs_; 253 JobQueue queue_; 254 255 Count job_count_; 256 Count running_jobs_; 257 Count threads_; 258 }; 259 260 220 261 // \internal Class that handles job 221 262 class JobHandler 222 263 { 223 264 public: 224 JobHandler(unsigned int threads, JobQueue& queue, Queue<JobPtr>& jobs, 225 running_jobs_type& running_jobs, 226 Queue<boost::exception_ptr>& error); 265 JobHandler(JobHandlerData& data); 227 266 228 267 void operator()(void); … … 237 276 238 277 void send2queue(JobPtr& job); 239 unsigned int threads_; 240 JobQueue& queue_; 241 Queue<JobPtr>& jobs_; 242 running_jobs_type& running_jobs_; 243 Queue<boost::exception_ptr>& error_; 244 int job_counter_; 278 279 JobHandlerData* data_; 245 280 }; 246 281 … … 263 298 void throw_if_error(void) const; 264 299 265 JobQueue queue_; 266 Queue<JobPtr> jobs_; 267 running_jobs_type running_jobs_; 268 mutable Queue<boost::exception_ptr> error_; 300 JobHandlerData data_; 269 301 boost::thread job_handler_; 270 302 }; // end class Scheduler
Note: See TracChangeset
for help on using the changeset viewer.