Changeset 3823 for trunk/yat/utility/Scheduler.cc
- Timestamp:
- Jul 16, 2019, 4:11:48 AM (4 years ago)
- Location:
- trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk
- Property svn:mergeinfo changed
/branches/0.16-stable (added) merged: 3795,3797-3799,3807-3808,3810,3817-3818,3820
- Property svn:mergeinfo changed
-
trunk/yat/utility/Scheduler.cc
r3694 r3823 2 2 3 3 /* 4 Copyright (C) 2014, 2015, 2017 Peter Johansson4 Copyright (C) 2014, 2015, 2017, 2019 Peter Johansson 5 5 6 6 This file is part of the yat library, http://dev.thep.lu.se/yat … … 33 33 34 34 Scheduler::Scheduler(unsigned int threads) 35 : running_jobs_(0), 36 job_handler_(JobHandler(threads, queue_, jobs_, running_jobs_, error_)) 35 : data_(threads), job_handler_(JobHandler(data_)) 37 36 { 38 37 assert(threads); … … 56 55 // first signal to JobHandler that Scheduler is waiting 57 56 boost::shared_ptr<Job> end; 58 jobs_.push(end);57 data_.jobs().push(end); 59 58 60 59 job_handler_.interrupt(); … … 66 65 { 67 66 throw_if_error(); 68 return running_jobs_ + queue_.size();67 return data_.running_jobs().get() + data_.queue().size(); 69 68 } 70 69 … … 73 72 { 74 73 throw_if_error(); 75 jobs_.push(job);74 data_.jobs().push(job); 76 75 } 77 76 … … 80 79 { 81 80 boost::exception_ptr error; 82 if ( error_.try_pop(error))81 if (data_.error().try_pop(error)) 83 82 boost::rethrow_exception(error); 84 83 } … … 96 95 // first signal to JobHandler that Scheduler is waiting 97 96 boost::shared_ptr<Job> end; 98 jobs_.push(end);97 data_.jobs().push(end); 99 98 100 99 // wait for job handler to finish … … 197 196 198 197 198 // Scheduler::JobHandlerData 199 Scheduler::JobHandlerData::JobHandlerData(unsigned int threads) 200 : job_count_(0), running_jobs_(0), threads_(threads) 201 {} 202 203 204 Queue<boost::exception_ptr>& 205 Scheduler::JobHandlerData::error(void) const 206 { 207 return error_; 208 } 209 210 211 const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const 212 { 213 return jobs_; 214 } 215 216 217 Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) 218 { 219 return jobs_; 220 } 221 222 223 const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const 224 { 225 return queue_; 226 } 227 228 229 Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) 230 { 231 return queue_; 232 } 233 234 235 const Scheduler::JobHandlerData::Count& 236 Scheduler::JobHandlerData::job_count(void) const 237 { 238 return job_count_; 239 } 240 241 242 Scheduler::JobHandlerData::Count& 243 Scheduler::JobHandlerData::job_count(void) 244 { 245 return job_count_; 246 } 247 248 249 const Scheduler::JobHandlerData::Count& 250 Scheduler::JobHandlerData::running_jobs(void) const 251 { 252 return running_jobs_; 253 } 254 255 256 Scheduler::JobHandlerData::Count& 257 Scheduler::JobHandlerData::running_jobs(void) 258 { 259 return running_jobs_; 260 } 261 262 263 const Scheduler::JobHandlerData::Count& 264 Scheduler::JobHandlerData::threads(void) const 265 { 266 return threads_; 267 } 268 269 270 Scheduler::JobHandlerData::Count& 271 Scheduler::JobHandlerData::threads(void) 272 { 273 return threads_; 274 } 275 276 277 // Scheduler::JobHandlerData::Count 278 Scheduler::JobHandlerData::Count::Count(int x) 279 : x_(x) 280 { 281 } 282 283 284 void Scheduler::JobHandlerData::Count::decrement(void) 285 { 286 boost::unique_lock<boost::mutex> lock(mutex_); 287 --x_; 288 } 289 290 291 int Scheduler::JobHandlerData::Count::get(void) const 292 { 293 boost::unique_lock<boost::mutex> lock(mutex_); 294 return x_; 295 } 296 297 298 void Scheduler::JobHandlerData::Count::increment(void) 299 { 300 boost::unique_lock<boost::mutex> lock(mutex_); 301 ++x_; 302 } 303 304 305 void Scheduler::JobHandlerData::Count::set(int x) 306 { 307 boost::unique_lock<boost::mutex> lock(mutex_); 308 x_ = x; 309 } 310 311 199 312 // Scheduler::JobHandler 200 313 201 Scheduler::JobHandler::JobHandler(unsigned int threads, 202 JobQueue& queue, 203 Queue<JobPtr>& jobs, 204 running_jobs_type& running_jobs, 205 Queue<boost::exception_ptr>& error) 206 : threads_(threads), 207 queue_(queue), jobs_(jobs), running_jobs_(running_jobs), 208 error_(error), job_counter_(0) 209 { 210 } 314 Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data) 315 : data_(&data) 316 {} 317 211 318 212 319 213 320 void Scheduler::JobHandler::post_process(JobPtr job) 214 321 { 215 --running_jobs_; 216 assert(running_jobs_>=0); 322 assert(job); 323 assert(data_); 324 data_->running_jobs().decrement(); 325 assert(data_->running_jobs().get() >= 0); 217 326 job->status_ = Job::completed; 218 327 219 328 if (job->error_) { 220 error_.push(job->error_);329 data_->error().push(job->error_); 221 330 return; 222 331 } … … 244 353 assert(job->status_ == Job::pristine); 245 354 job->status_ = Job::prepared; 246 job->id_ = job_counter_;247 ++job_counter_;355 job->id_ = data_->job_count().get(); 356 data_->job_count().increment(); 248 357 249 358 // If job have prerequisite that need to be run first, process them … … 276 385 { 277 386 job->status_ = Job::running; 278 ++running_jobs_;279 assert( running_jobs_>0);280 queue_.push(job);387 data_->running_jobs().increment(); 388 assert(data_->running_jobs().get() > 0); 389 data_->queue().push(job); 281 390 } 282 391 … … 300 409 void Scheduler::JobHandler::operator()(void) 301 410 { 411 assert(data_); 302 412 boost::thread_group workers; 303 for ( size_t i=0; i<threads_; ++i)304 workers.create_thread(Worker( queue_, jobs_));413 for (int i=0; i < data_->threads().get(); ++i) 414 workers.create_thread(Worker(data_->queue(), data_->jobs())); 305 415 // Process jobs (in jobs_) coming both from Scheduler and 306 416 // completed jobs from Workers until Scheduler is waiting … … 312 422 JobPtr job; 313 423 314 while (!scheduler_is_waiting || running_jobs_ || jobs_.size()) { 315 jobs_.pop(job); 424 while (!scheduler_is_waiting || data_->running_jobs().get() || 425 !data_->jobs().empty()) { 426 data_->jobs().pop(job); 316 427 if (job) 317 428 process(job); … … 323 434 // Since we are in a background thread, we cannot throw from 324 435 // here, instead interrupt workers and return early. 325 if (! error_.empty()) {436 if (!data_->error().empty()) { 326 437 // In case queue is empty, workers might be be stuck 327 438 // waiting for job queue to pop, then send them a poison pill so 328 439 // they stop. 329 440 boost::shared_ptr<Job> end; 330 queue_.push(end);441 data_->queue().push(end); 331 442 // For other cases (queue is not empty) send workers a 332 443 // interrupt signal and wait for them to wrap up. 333 444 workers.interrupt_all(); 334 running_jobs_ = 0;335 queue_.clear();445 data_->running_jobs().set(0); 446 data_->queue().clear(); 336 447 return; 337 448 } … … 343 454 // kill workers 344 455 boost::shared_ptr<Job> end; 345 queue_.push(end);456 data_->queue().push(end); 346 457 workers.join_all(); 347 458 }
Note: See TracChangeset
for help on using the changeset viewer.