Changeset 3808


Ignore:
Timestamp:
Jul 5, 2019, 1:46:34 AM (4 years ago)
Author:
Peter
Message:

Move the data member of the JobHandler? into its own data class,
JobHandlerData?, which is thread safe. This class is now owned by the
Scheduler and the JobHandler? has access via a pointer. This fixes #924
as the class is thread safe and since the sharing between Scheduler
and the JobHandler? class is now more complete also allow future
additions of the interface such as mentions in tickets:

refs #899, #910, #914 and #916.

Location:
branches/0.16-stable
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • branches/0.16-stable/test/Makefile.am

    r3792 r3808  
    102102  test/scheduler3.test \
    103103  test/scheduler4.test \
     104  test/scheduler5.test \
    104105  test/segment.test test/smart_ptr.test \
    105106  test/smith_waterman.test \
  • branches/0.16-stable/yat/utility/Scheduler.cc

    r3694 r3808  
    3333
    3434  Scheduler::Scheduler(unsigned int threads)
    35     : running_jobs_(0),
    36       job_handler_(JobHandler(threads, queue_, jobs_, running_jobs_, error_))
     35    : data_(threads), job_handler_(JobHandler(data_))
    3736  {
    3837    assert(threads);
     
    5655    // first signal to JobHandler that Scheduler is waiting
    5756    boost::shared_ptr<Job> end;
    58     jobs_.push(end);
     57    data_.jobs().push(end);
    5958
    6059    job_handler_.interrupt();
     
    6665  {
    6766    throw_if_error();
    68     return running_jobs_ + queue_.size();
     67    return data_.running_jobs().get() + data_.queue().size();
    6968  }
    7069
     
    7372  {
    7473    throw_if_error();
    75     jobs_.push(job);
     74    data_.jobs().push(job);
    7675  }
    7776
     
    8079  {
    8180    boost::exception_ptr error;
    82     if (error_.try_pop(error))
     81    if (data_.error().try_pop(error))
    8382      boost::rethrow_exception(error);
    8483  }
     
    9695    // first signal to JobHandler that Scheduler is waiting
    9796    boost::shared_ptr<Job> end;
    98     jobs_.push(end);
     97    data_.jobs().push(end);
    9998
    10099    // wait for job handler to finish
     
    197196
    198197
     198  // Scheduler::JobHandlerData
     199  Scheduler::JobHandlerData::JobHandlerData(unsigned int threads)
     200    : job_count_(0), running_jobs_(0), threads_(threads)
     201  {}
     202
     203
     204  Queue<boost::exception_ptr>&
     205  Scheduler::JobHandlerData::error(void) const
     206  {
     207    return error_;
     208  }
     209
     210
     211  const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const
     212  {
     213    return jobs_;
     214  }
     215
     216
     217  Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void)
     218  {
     219    return jobs_;
     220  }
     221
     222
     223  const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const
     224  {
     225    return queue_;
     226  }
     227
     228
     229  Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void)
     230  {
     231    return queue_;
     232  }
     233
     234
     235  const Scheduler::JobHandlerData::Count&
     236  Scheduler::JobHandlerData::job_count(void) const
     237  {
     238    return job_count_;
     239  }
     240
     241
     242  Scheduler::JobHandlerData::Count&
     243  Scheduler::JobHandlerData::job_count(void)
     244  {
     245    return job_count_;
     246  }
     247
     248
     249  const Scheduler::JobHandlerData::Count&
     250  Scheduler::JobHandlerData::running_jobs(void) const
     251  {
     252    return running_jobs_;
     253  }
     254
     255
     256  Scheduler::JobHandlerData::Count&
     257  Scheduler::JobHandlerData::running_jobs(void)
     258  {
     259    return running_jobs_;
     260  }
     261
     262
     263  const Scheduler::JobHandlerData::Count&
     264  Scheduler::JobHandlerData::threads(void) const
     265  {
     266    return threads_;
     267  }
     268
     269
     270  Scheduler::JobHandlerData::Count&
     271  Scheduler::JobHandlerData::threads(void)
     272  {
     273    return threads_;
     274  }
     275
     276
     277  // Scheduler::JobHandlerData::Count
     278  Scheduler::JobHandlerData::Count::Count(int x)
     279    : x_(x)
     280  {
     281  }
     282
     283
     284  void Scheduler::JobHandlerData::Count::decrement(void)
     285  {
     286    boost::unique_lock<boost::mutex> lock(mutex_);
     287    --x_;
     288  }
     289
     290
     291  int Scheduler::JobHandlerData::Count::get(void) const
     292  {
     293    boost::unique_lock<boost::mutex> lock(mutex_);
     294    return x_;
     295  }
     296
     297
     298  void Scheduler::JobHandlerData::Count::increment(void)
     299  {
     300    boost::unique_lock<boost::mutex> lock(mutex_);
     301    ++x_;
     302  }
     303
     304
     305  void Scheduler::JobHandlerData::Count::set(int x)
     306  {
     307    boost::unique_lock<boost::mutex> lock(mutex_);
     308    x_ = x;
     309  }
     310
     311
    199312  // Scheduler::JobHandler
    200313
    201   Scheduler::JobHandler::JobHandler(unsigned int threads,
    202                                     JobQueue& queue,
    203                                     Queue<JobPtr>& jobs,
    204                                     running_jobs_type& running_jobs,
    205                                     Queue<boost::exception_ptr>& error)
    206     : threads_(threads),
    207       queue_(queue), jobs_(jobs), running_jobs_(running_jobs),
    208       error_(error), job_counter_(0)
    209   {
    210   }
     314  Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data)
     315    : data_(&data)
     316  {}
     317
    211318
    212319
    213320  void Scheduler::JobHandler::post_process(JobPtr job)
    214321  {
    215     --running_jobs_;
    216     assert(running_jobs_>=0);
     322    assert(job);
     323    assert(data_);
     324    data_->running_jobs().decrement();
     325    assert(data_->running_jobs().get() >= 0);
    217326    job->status_ = Job::completed;
    218327
    219328    if (job->error_) {
    220       error_.push(job->error_);
     329      data_->error().push(job->error_);
    221330      return;
    222331    }
     
    244353    assert(job->status_ == Job::pristine);
    245354    job->status_ = Job::prepared;
    246     job->id_ = job_counter_;
    247     ++job_counter_;
     355    job->id_ = data_->job_count().get();
     356    data_->job_count().increment();
    248357
    249358    // If job have prerequisite that need to be run first, process them
     
    276385  {
    277386    job->status_ = Job::running;
    278     ++running_jobs_;
    279     assert(running_jobs_>0);
    280     queue_.push(job);
     387    data_->running_jobs().increment();
     388    assert(data_->running_jobs().get() > 0);
     389    data_->queue().push(job);
    281390  }
    282391
     
    300409  void Scheduler::JobHandler::operator()(void)
    301410  {
     411    assert(data_);
    302412    boost::thread_group workers;
    303     for (size_t i=0; i<threads_; ++i)
    304       workers.create_thread(Worker(queue_, jobs_));
     413    for (size_t i=0; i < data_->threads().get(); ++i)
     414      workers.create_thread(Worker(data_->queue(), data_->jobs()));
    305415    // Process jobs (in jobs_) coming both from Scheduler and
    306416    // completed jobs from Workers until Scheduler is waiting
     
    312422    JobPtr job;
    313423
    314     while (!scheduler_is_waiting || running_jobs_ || jobs_.size()) {
    315       jobs_.pop(job);
     424    while (!scheduler_is_waiting || data_->running_jobs().get() ||
     425           !data_->jobs().empty()) {
     426      data_->jobs().pop(job);
    316427      if (job)
    317428        process(job);
     
    323434      // Since we are in a background thread, we cannot throw from
    324435      // here, instead interrupt workers and return early.
    325       if (!error_.empty()) {
     436      if (!data_->error().empty()) {
    326437        // In case queue is empty, workers might be be stuck
    327438        // waiting for job queue to pop, then send them a poison pill so
    328439        // they stop.
    329440        boost::shared_ptr<Job> end;
    330         queue_.push(end);
     441        data_->queue().push(end);
    331442        // For other cases (queue is not empty) send workers a
    332443        // interrupt signal and wait for them to wrap up.
    333444        workers.interrupt_all();
    334         running_jobs_ = 0;
    335         queue_.clear();
     445        data_->running_jobs().set(0);
     446        data_->queue().clear();
    336447        return;
    337448      }
     
    343454    // kill workers
    344455    boost::shared_ptr<Job> end;
    345     queue_.push(end);
     456    data_->queue().push(end);
    346457    workers.join_all();
    347458  }
  • branches/0.16-stable/yat/utility/Scheduler.h

    r3694 r3808  
    2323*/
    2424
    25 #include "config_public.h"
    2625#include "PriorityQueue.h"
    2726#include "Queue.h"
     
    3130#include <boost/shared_ptr.hpp>
    3231
    33 #ifdef YAT_HAVE_ATOMIC
    34 #include <atomic>
    35 #endif
    3632#include <set>
    3733#include <deque>
     
    8783  class Scheduler
    8884  {
    89 #ifdef YAT_HAVE_ATOMIC
    90     typedef std::atomic<int> running_jobs_type;
    91 #else
    92     typedef int running_jobs_type;
    93 #endif
    9485  public:
    9586    /**
     
    218209    }; // end class Worker
    219210
     211
     212    class JobHandlerData
     213    {
     214    public:
     215      /// thread-safe class around int
     216      class Count
     217      {
     218      public:
     219        /// Constructor
     220        explicit Count(int x=0);
     221        /// increase value with 1
     222        void decrement(void);
     223        /// return value
     224        int get(void) const;
     225        /// decrease value with 1
     226        void increment(void);
     227        /// modify value
     228        void set(int x);
     229      private:
     230        mutable boost::mutex mutex_;
     231        int x_;
     232      };
     233
     234      JobHandlerData(unsigned int threads);
     235      Queue<boost::exception_ptr>& error(void) const;
     236
     237      const Queue<JobPtr>& jobs(void) const;
     238      Queue<JobPtr>& jobs(void);
     239      const JobQueue& queue(void) const;
     240      JobQueue& queue(void);
     241
     242      const Count& job_count(void) const;
     243      Count& job_count(void);
     244
     245      const Count& running_jobs(void) const;
     246      Count& running_jobs(void);
     247
     248      const Count& threads(void) const;
     249      Count& threads(void);
     250    private:
     251      mutable Queue<boost::exception_ptr> error_;
     252      Queue<JobPtr> jobs_;
     253      JobQueue queue_;
     254
     255      Count job_count_;
     256      Count running_jobs_;
     257      Count threads_;
     258    };
     259
     260
    220261    // \internal Class that handles job
    221262    class JobHandler
    222263    {
    223264    public:
    224       JobHandler(unsigned int threads, JobQueue& queue, Queue<JobPtr>& jobs,
    225                  running_jobs_type& running_jobs,
    226                  Queue<boost::exception_ptr>& error);
     265      JobHandler(JobHandlerData& data);
    227266
    228267      void operator()(void);
     
    237276
    238277      void send2queue(JobPtr& job);
    239       unsigned int threads_;
    240       JobQueue& queue_;
    241       Queue<JobPtr>& jobs_;
    242       running_jobs_type& running_jobs_;
    243       Queue<boost::exception_ptr>& error_;
    244       int job_counter_;
     278
     279      JobHandlerData* data_;
    245280    };
    246281
     
    263298    void throw_if_error(void) const;
    264299
    265     JobQueue queue_;
    266     Queue<JobPtr> jobs_;
    267     running_jobs_type running_jobs_;
    268     mutable Queue<boost::exception_ptr> error_;
     300    JobHandlerData data_;
    269301    boost::thread job_handler_;
    270302  }; // end class Scheduler
Note: See TracChangeset for help on using the changeset viewer.