Ignore:
Timestamp:
Jul 16, 2019, 4:11:48 AM (4 years ago)
Author:
Peter
Message:

Merge release 0.16 into trunk

Location:
trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk

  • trunk/yat/utility/Scheduler.cc

    r3694 r3823  
    22
    33/*
    4   Copyright (C) 2014, 2015, 2017 Peter Johansson
     4  Copyright (C) 2014, 2015, 2017, 2019 Peter Johansson
    55
    66  This file is part of the yat library, http://dev.thep.lu.se/yat
     
    3333
    3434  Scheduler::Scheduler(unsigned int threads)
    35     : running_jobs_(0),
    36       job_handler_(JobHandler(threads, queue_, jobs_, running_jobs_, error_))
     35    : data_(threads), job_handler_(JobHandler(data_))
    3736  {
    3837    assert(threads);
     
    5655    // first signal to JobHandler that Scheduler is waiting
    5756    boost::shared_ptr<Job> end;
    58     jobs_.push(end);
     57    data_.jobs().push(end);
    5958
    6059    job_handler_.interrupt();
     
    6665  {
    6766    throw_if_error();
    68     return running_jobs_ + queue_.size();
     67    return data_.running_jobs().get() + data_.queue().size();
    6968  }
    7069
     
    7372  {
    7473    throw_if_error();
    75     jobs_.push(job);
     74    data_.jobs().push(job);
    7675  }
    7776
     
    8079  {
    8180    boost::exception_ptr error;
    82     if (error_.try_pop(error))
     81    if (data_.error().try_pop(error))
    8382      boost::rethrow_exception(error);
    8483  }
     
    9695    // first signal to JobHandler that Scheduler is waiting
    9796    boost::shared_ptr<Job> end;
    98     jobs_.push(end);
     97    data_.jobs().push(end);
    9998
    10099    // wait for job handler to finish
     
    197196
    198197
     198  // Scheduler::JobHandlerData
     199  Scheduler::JobHandlerData::JobHandlerData(unsigned int threads)
     200    : job_count_(0), running_jobs_(0), threads_(threads)
     201  {}
     202
     203
     204  Queue<boost::exception_ptr>&
     205  Scheduler::JobHandlerData::error(void) const
     206  {
     207    return error_;
     208  }
     209
     210
     211  const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const
     212  {
     213    return jobs_;
     214  }
     215
     216
     217  Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void)
     218  {
     219    return jobs_;
     220  }
     221
     222
     223  const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const
     224  {
     225    return queue_;
     226  }
     227
     228
     229  Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void)
     230  {
     231    return queue_;
     232  }
     233
     234
     235  const Scheduler::JobHandlerData::Count&
     236  Scheduler::JobHandlerData::job_count(void) const
     237  {
     238    return job_count_;
     239  }
     240
     241
     242  Scheduler::JobHandlerData::Count&
     243  Scheduler::JobHandlerData::job_count(void)
     244  {
     245    return job_count_;
     246  }
     247
     248
     249  const Scheduler::JobHandlerData::Count&
     250  Scheduler::JobHandlerData::running_jobs(void) const
     251  {
     252    return running_jobs_;
     253  }
     254
     255
     256  Scheduler::JobHandlerData::Count&
     257  Scheduler::JobHandlerData::running_jobs(void)
     258  {
     259    return running_jobs_;
     260  }
     261
     262
     263  const Scheduler::JobHandlerData::Count&
     264  Scheduler::JobHandlerData::threads(void) const
     265  {
     266    return threads_;
     267  }
     268
     269
     270  Scheduler::JobHandlerData::Count&
     271  Scheduler::JobHandlerData::threads(void)
     272  {
     273    return threads_;
     274  }
     275
     276
     277  // Scheduler::JobHandlerData::Count
     278  Scheduler::JobHandlerData::Count::Count(int x)
     279    : x_(x)
     280  {
     281  }
     282
     283
     284  void Scheduler::JobHandlerData::Count::decrement(void)
     285  {
     286    boost::unique_lock<boost::mutex> lock(mutex_);
     287    --x_;
     288  }
     289
     290
     291  int Scheduler::JobHandlerData::Count::get(void) const
     292  {
     293    boost::unique_lock<boost::mutex> lock(mutex_);
     294    return x_;
     295  }
     296
     297
     298  void Scheduler::JobHandlerData::Count::increment(void)
     299  {
     300    boost::unique_lock<boost::mutex> lock(mutex_);
     301    ++x_;
     302  }
     303
     304
     305  void Scheduler::JobHandlerData::Count::set(int x)
     306  {
     307    boost::unique_lock<boost::mutex> lock(mutex_);
     308    x_ = x;
     309  }
     310
     311
    199312  // Scheduler::JobHandler
    200313
    201   Scheduler::JobHandler::JobHandler(unsigned int threads,
    202                                     JobQueue& queue,
    203                                     Queue<JobPtr>& jobs,
    204                                     running_jobs_type& running_jobs,
    205                                     Queue<boost::exception_ptr>& error)
    206     : threads_(threads),
    207       queue_(queue), jobs_(jobs), running_jobs_(running_jobs),
    208       error_(error), job_counter_(0)
    209   {
    210   }
     314  Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data)
     315    : data_(&data)
     316  {}
     317
    211318
    212319
    213320  void Scheduler::JobHandler::post_process(JobPtr job)
    214321  {
    215     --running_jobs_;
    216     assert(running_jobs_>=0);
     322    assert(job);
     323    assert(data_);
     324    data_->running_jobs().decrement();
     325    assert(data_->running_jobs().get() >= 0);
    217326    job->status_ = Job::completed;
    218327
    219328    if (job->error_) {
    220       error_.push(job->error_);
     329      data_->error().push(job->error_);
    221330      return;
    222331    }
     
    244353    assert(job->status_ == Job::pristine);
    245354    job->status_ = Job::prepared;
    246     job->id_ = job_counter_;
    247     ++job_counter_;
     355    job->id_ = data_->job_count().get();
     356    data_->job_count().increment();
    248357
    249358    // If job have prerequisite that need to be run first, process them
     
    276385  {
    277386    job->status_ = Job::running;
    278     ++running_jobs_;
    279     assert(running_jobs_>0);
    280     queue_.push(job);
     387    data_->running_jobs().increment();
     388    assert(data_->running_jobs().get() > 0);
     389    data_->queue().push(job);
    281390  }
    282391
     
    300409  void Scheduler::JobHandler::operator()(void)
    301410  {
     411    assert(data_);
    302412    boost::thread_group workers;
    303     for (size_t i=0; i<threads_; ++i)
    304       workers.create_thread(Worker(queue_, jobs_));
     413    for (int i=0; i < data_->threads().get(); ++i)
     414      workers.create_thread(Worker(data_->queue(), data_->jobs()));
    305415    // Process jobs (in jobs_) coming both from Scheduler and
    306416    // completed jobs from Workers until Scheduler is waiting
     
    312422    JobPtr job;
    313423
    314     while (!scheduler_is_waiting || running_jobs_ || jobs_.size()) {
    315       jobs_.pop(job);
     424    while (!scheduler_is_waiting || data_->running_jobs().get() ||
     425           !data_->jobs().empty()) {
     426      data_->jobs().pop(job);
    316427      if (job)
    317428        process(job);
     
    323434      // Since we are in a background thread, we cannot throw from
    324435      // here, instead interrupt workers and return early.
    325       if (!error_.empty()) {
     436      if (!data_->error().empty()) {
    326437        // In case queue is empty, workers might be be stuck
    327438        // waiting for job queue to pop, then send them a poison pill so
    328439        // they stop.
    329440        boost::shared_ptr<Job> end;
    330         queue_.push(end);
     441        data_->queue().push(end);
    331442        // For other cases (queue is not empty) send workers a
    332443        // interrupt signal and wait for them to wrap up.
    333444        workers.interrupt_all();
    334         running_jobs_ = 0;
    335         queue_.clear();
     445        data_->running_jobs().set(0);
     446        data_->queue().clear();
    336447        return;
    337448      }
     
    343454    // kill workers
    344455    boost::shared_ptr<Job> end;
    345     queue_.push(end);
     456    data_->queue().push(end);
    346457    workers.join_all();
    347458  }
Note: See TracChangeset for help on using the changeset viewer.