Changeset 3848


Ignore:
Timestamp:
Sep 23, 2019, 2:07:23 AM (3 years ago)
Author:
Peter
Message:

add functionality to alter number of threads used by Scheduler. closes #910

Location:
trunk
Files:
1 added
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/test/Makefile.am

    r3828 r3848  
    104104  test/scheduler5.test \
    105105  test/scheduler6.test \
     106  test/scheduler7.test \
    106107  test/segment.test test/smart_ptr.test \
    107108  test/smith_waterman.test \
  • trunk/test/scheduler2.cc

    r3402 r3848  
    4848private:
    4949  int value;
     50  // We are running the test with a single-threaded scheduler, so a
     51  // non-thread-safe vector is okay.
    5052  std::vector<int>& result;
    5153};
  • trunk/test/scheduler3.cc

    r3423 r3848  
    7676  try {
    7777    run();
     78    suite.err() << "error: no exception thrown\n";
    7879    suite.add(false);
    7980  }
  • trunk/yat/utility/Scheduler.cc

    r3828 r3848  
    2525
    2626#include <boost/exception/all.hpp>
     27#include <boost/make_shared.hpp>
    2728
    2829#include <cassert>
     
    3233namespace utility {
    3334
    34   Scheduler::Scheduler(unsigned int threads)
    35     : data_(threads), job_handler_(JobHandler(data_))
    36   {
    37     assert(threads);
     35  Scheduler::Scheduler(unsigned int n_threads)
     36    : job_handler_(JobHandler(data_))
     37  {
     38    assert(n_threads);
     39    this->threads(n_threads);
    3840  }
    3941
     
    4244                                 boost::shared_ptr<Job> prerequisite)
    4345  {
     46    throw_if_error();
     47    boost::unique_lock<boost::mutex> lock(mutex_);
    4448    assert(job->status_ == Job::pristine);
    4549    if (prerequisite->status_ != Job::completed) {
     
    4751      prerequisite->add_observer(job);
    4852    }
    49     throw_if_error();
    5053  }
    5154
     
    5861
    5962    // first signal to JobHandler that Scheduler is waiting
    60     boost::shared_ptr<Job> end;
    61     data_.jobs().push(end);
    62 
    63     job_handler_.interrupt();
     63    MessengerPtr msg(new InterruptWorkers);
     64    data_.messengers().push(msg);
     65  }
     66
     67
     68  size_t Scheduler::jobs(void) const
     69  {
    6470    throw_if_error();
    65   }
    66 
    67 
    68   size_t Scheduler::jobs(void) const
    69   {
    70     throw_if_error();
    71     return data_.running_jobs().get() + data_.queue().size();
     71    return data_.running_jobs().get();
    7272  }
    7373
     
    7979    // launch a new thread executing a JobHandler.
    8080    if (!job_handler_.joinable()) {
    81       assert(data_.jobs().empty());
     81
    8282      /*
    8383        We cannot relaunch a thread, so instead we create a new thread
     
    9191      swap(tmp, job_handler_);
    9292    }
    93     data_.jobs().push(job);
    94   }
    95 
    96 
    97   int Scheduler::threads(void) const
    98   {
    99     return data_.threads().get();
     93    data_.messengers().push(boost::make_shared<JobMessenger>(job));
     94  }
     95
     96
     97  unsigned int Scheduler::threads(void) const
     98  {
     99    return data_.n_threads().get();
     100  }
     101
     102
     103  void Scheduler::threads(unsigned int n)
     104  {
     105    data_.n_threads().set(n);
     106    if (job_handler_.joinable())
     107      data_.messengers().push(boost::make_shared<WorkForceSizer>());
     108    assert(threads() == n);
    100109  }
    101110
     
    118127  void Scheduler::wait(void)
    119128  {
    120     throw_if_error();
    121129    if (!job_handler_.joinable())
    122130      return;
    123131
    124132    // first signal to JobHandler that Scheduler is waiting
    125     boost::shared_ptr<Job> end;
    126     data_.jobs().push(end);
     133    MessengerPtr msg(new SchedulerIsWaiting);
     134    data_.messengers().push(msg);
    127135
    128136    // wait for job handler to finish
     
    187195  // Scheduler::Worker
    188196
    189   Scheduler::Worker::Worker(JobQueue& q, Queue<JobPtr>& c)
     197  Scheduler::Worker::Worker(JobQueue& q, MessengerQueue& c)
    190198    : queue_(q), completed_(c)
    191199  {
     
    201209      // NULL job indicates poison pill
    202210      if (job.get()==NULL) {
    203         // kill other workers too
    204         queue_.push(job);
    205         break;
     211        completed_.push(boost::make_shared<JobMessenger>(job));
     212        return;
    206213      }
    207214      // action
     
    212219        job->error_ = boost::current_exception();
    213220        // return job to scheduler so it can act on the error
    214         completed_.push(job);
     221        completed_.push(boost::make_shared<JobMessenger>(job));
    215222        // exit work and go home
    216223        return;
    217224      }
    218225      // return job to scheduler
    219       completed_.push(job);
     226      completed_.push(boost::make_shared<JobMessenger>(job));
    220227
    221228      // Make sure we can be interrupted
     
    225232
    226233
     234  // Scheduler::Messenger
     235  Scheduler::JobMessenger::JobMessenger(const Scheduler::JobPtr& job)
     236    : job_(job)
     237  {
     238  }
     239
     240
     241  void Scheduler::JobMessenger::operator()(Scheduler::JobHandler& handler)
     242  {
     243    if (job_)
     244      handler.process(job_);
     245  }
     246
     247
     248  void Scheduler::SchedulerIsWaiting::operator()(Scheduler::JobHandler& handler)
     249  {
     250    handler.scheduler_is_waiting(true);
     251  }
     252
     253
     254  void Scheduler::InterruptWorkers::operator()(Scheduler::JobHandler& handler)
     255  {
     256    handler.scheduler_is_waiting(true);
     257    handler.interrupt_workers();
     258  }
     259
     260
     261  void Scheduler::WorkForceSizer::operator()(Scheduler::JobHandler& handler)
     262  {
     263    unsigned int wanted_n = handler.data().n_threads().get();
     264    unsigned int current_n = handler.n_target_workers();
     265
     266    if (wanted_n < current_n) {
     267      handler.kill_workers(current_n - wanted_n);
     268    }
     269    else if (wanted_n > current_n) {
     270      handler.remove_joined_workers();
     271      handler.create_workers(wanted_n - current_n);
     272    }
     273  }
     274
     275
    227276  // Scheduler::JobHandlerData
    228   Scheduler::JobHandlerData::JobHandlerData(unsigned int threads)
    229     : job_count_(0), running_jobs_(0), threads_(threads)
     277  Scheduler::JobHandlerData::JobHandlerData(void)
     278    : running_jobs_(0)
    230279  {}
    231280
     
    238287
    239288
    240   const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const
    241   {
    242     return jobs_;
    243   }
    244 
    245 
    246   Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void)
    247   {
    248     return jobs_;
     289  const Scheduler::MessengerQueue&
     290  Scheduler::JobHandlerData::messengers(void) const
     291  {
     292    return messengers_;
     293  }
     294
     295
     296  Scheduler::MessengerQueue& Scheduler::JobHandlerData::messengers(void)
     297  {
     298    return messengers_;
    249299  }
    250300
     
    259309  {
    260310    return queue_;
    261   }
    262 
    263 
    264   const Scheduler::JobHandlerData::Count&
    265   Scheduler::JobHandlerData::job_count(void) const
    266   {
    267     return job_count_;
    268   }
    269 
    270 
    271   Scheduler::JobHandlerData::Count&
    272   Scheduler::JobHandlerData::job_count(void)
    273   {
    274     return job_count_;
    275311  }
    276312
     
    291327
    292328  const Scheduler::JobHandlerData::Count&
    293   Scheduler::JobHandlerData::threads(void) const
     329  Scheduler::JobHandlerData::n_threads(void) const
    294330  {
    295331    return threads_;
     
    298334
    299335  Scheduler::JobHandlerData::Count&
    300   Scheduler::JobHandlerData::threads(void)
     336  Scheduler::JobHandlerData::n_threads(void)
    301337  {
    302338    return threads_;
     
    314350  {
    315351    boost::unique_lock<boost::mutex> lock(mutex_);
     352    assert(x_ > 0);
    316353    --x_;
    317354  }
     
    342379
    343380  Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data)
    344     : data_(&data)
     381    : data_(&data), n_target_workers_(0)
    345382  {}
    346383
     384
     385  void Scheduler::JobHandler::create_workers(unsigned int n)
     386  {
     387    for (size_t i=0; i<n; ++i) {
     388      Worker worker(data_->queue(), data_->messengers());
     389      workers_.push_back(boost::make_shared<boost::thread>(worker));
     390    }
     391    n_target_workers_ += n;
     392  }
     393
     394
     395  Scheduler::JobHandlerData& Scheduler::JobHandler::data(void)
     396  {
     397    assert(data_);
     398    return *data_;
     399  }
     400
     401
     402  void Scheduler::JobHandler::kill_workers(unsigned int n)
     403  {
     404    assert(n_target_workers_ >= n);
     405    JobPtr poison_pill;
     406    for (size_t i=0; i<n; ++i) {
     407      data_->queue().push(poison_pill);
     408    }
     409    n_target_workers_ -= n;
     410  }
     411
     412
     413  void Scheduler::JobHandler::kill_workers(void)
     414  {
     415    kill_workers(n_target_workers_);
     416    assert(n_target_workers_ == 0);
     417  }
     418
     419
     420  void Scheduler::JobHandler::interrupt_workers(void)
     421  {
     422    // FIXME this might leak memory, see ticket #930
     423    // http://dev.thep.lu.se/yat/ticket/930#ticket
     424    data_->queue().clear();
     425    kill_workers();
     426  }
     427
     428
     429  unsigned int Scheduler::JobHandler::n_target_workers(void) const
     430  {
     431    return n_target_workers_;
     432  }
    347433
    348434
     
    351437    assert(job);
    352438    assert(data_);
     439    assert(data_->running_jobs().get() > 0);
    353440    data_->running_jobs().decrement();
    354441    assert(data_->running_jobs().get() >= 0);
     
    382469    assert(job->status_ == Job::pristine);
    383470    job->status_ = Job::prepared;
    384     job->id_ = data_->job_count().get();
    385     data_->job_count().increment();
     471    job->id_ = job_count_;
     472    ++job_count_;
    386473
    387474    // If job has prerequisites that need to be run first, process them
     
    411498
    412499
     500  void Scheduler::JobHandler::remove_joined_workers(void)
     501  {
     502    for (WorkerList::iterator w=workers_.begin(); w!=workers_.end(); ) {
     503      if ((*w)->joinable() == false)
     504        workers_.erase(w++);
     505      else
     506        ++w;
     507    }
     508  }
     509
     510
    413511  void Scheduler::JobHandler::send2queue(JobPtr& job)
    414512  {
     
    422520  void Scheduler::JobHandler::process(JobPtr& job)
    423521  {
     522    assert(job);
    424523    switch (job->status_) {
    425524    case(Job::pristine):
     
    436535
    437536
     537  void Scheduler::JobHandler::wait_workers(void) const
     538  {
     539    for (WorkerList::const_iterator w=workers_.begin(); w!=workers_.end(); ++w)
     540      if ((*w)->joinable())
     541        (*w)->join();
     542  }
     543
     544
    438545  void Scheduler::JobHandler::operator()(void)
    439546  {
    440547    assert(data_);
    441     boost::thread_group workers;
    442     for (int i=0; i < data_->threads().get(); ++i)
    443       workers.create_thread(Worker(data_->queue(), data_->jobs()));
    444     // Process jobs (in jobs_) coming both from Scheduler and
    445     // completed jobs from Workers until Scheduler is waiting
    446     // (signaled by a null Job), jobs_ is empty and there are no
    447     // running jobs.
    448 
    449     // true indicates that Scheduler::wait (or ::interrupt) has been called
    450     bool scheduler_is_waiting = false;
    451     JobPtr job;
    452 
    453     while (!scheduler_is_waiting || data_->running_jobs().get() ||
    454            !data_->jobs().empty()) {
    455       data_->jobs().pop(job);
    456       if (job)
    457         process(job);
    458       else
    459         scheduler_is_waiting = true;
     548    assert(workers_.empty());
     549    create_workers(data_->n_threads().get());
     550
     551    // Process messengers stored in data_->messengers() coming from
     552    // both the Scheduler and Workers. We keep waiting here until:
     553    //   1) scheduler_is_waiting is true i.e. Scheduler::wait has been
     554    //   called
     555    //   2) There are no jobs running
     556    //   3) There are no more messengers
     557    MessengerPtr msg;
     558
     559    while (!scheduler_is_waiting_ || data_->running_jobs().get() ||
     560           !data_->messengers().empty()) {
     561      data_->messengers().pop(msg);
     562      (*msg)(*this);
    460563
    461564      // Check for error
     
    464567      // here, instead interrupt workers and return early.
    465568      if (!data_->error().empty()) {
    466         // In case queue is empty, workers might be stuck
    467         // waiting for job queue to pop, then send them a poison pill so
    468         // they stop.
    469         boost::shared_ptr<Job> end;
    470         data_->queue().push(end);
    471         // For other cases (queue is not empty) send workers an
    472         // interrupt signal and wait for them to wrap up.
    473         workers.interrupt_all();
    474569        data_->running_jobs().set(0);
    475         data_->queue().clear();
    476         return;
     570        interrupt_workers();
     571        break;
    477572      }
    478573
     
    481576    }
    482577
    483     // kill workers
    484     boost::shared_ptr<Job> end;
    485     data_->queue().push(end);
    486     workers.join_all();
     578    // give poison pills to workers
     579    kill_workers();
     580    // wait for workers
     581    wait_workers();
     582    // clean up
     583    workers_.clear();
     584  }
     585
     586
     587  void Scheduler::JobHandler::scheduler_is_waiting(bool value)
     588  {
     589    scheduler_is_waiting_ = value;
    487590  }
    488591
  • trunk/yat/utility/Scheduler.h

    r3828 r3848  
    3030#include <boost/shared_ptr.hpp>
    3131
     32#include <list>
    3233#include <set>
    33 #include <deque>
     34#include <vector>
    3435
    3536namespace theplu {
     
    9293      /**
    9394         \brief constructor
     95         \param prio sets the priority. Jobs with greater priority are
     96         run before jobs with lower priority (when possible).
    9497       */
    9598      explicit Job(unsigned int prio=0);
     
    171174       If \a job depends on other jobs, they are also submitted to the
    172175       Scheduler.
    173 
    174        \note If wait() or interrupt() have been called, the behaviour of
    175        submit() is undefined.
    176176     */
    177177    void submit(const boost::shared_ptr<Job>& job);
     
    182182       \since new in yat 0.17
    183183     */
    184     int threads(void) const;
     184    unsigned int threads(void) const;
     185
     186    /**
     187       Change number of threads used. If \a n is greater than current
     188       number of threads, additional threads are launched. If \a n is
     189       smaller than current number used, a number of threads are
     190       notified to end after the current job has been completed. A
     191       decrease in threads therefore does not have an immediate
     192       effect.
     193
     194       \since new in yat 0.17
     195     */
     196    void threads(unsigned int n);
    185197
    186198    /**
     
    190202
    191203  private:
     204    // forward declaration
     205    class JobHandler;
     206
    192207    typedef boost::shared_ptr<Scheduler::Job> JobPtr;
     208
     209    class Messenger
     210    {
     211    public:
     212      virtual void operator()(JobHandler& handler)=0;
     213    };
     214
     215    typedef boost::shared_ptr<Messenger> MessengerPtr;
     216    typedef Queue<MessengerPtr> MessengerQueue;
     217
     218    class JobMessenger : public Messenger
     219    {
     220    public:
     221      JobMessenger(const JobPtr&);
     222      void operator()(JobHandler& handler);
     223    private:
     224      JobPtr job_;
     225    };
     226
     227
     228    class SchedulerIsWaiting : public Messenger
     229    {
     230    public:
     231      void operator()(JobHandler& handler);
     232    };
     233
     234
     235    class InterruptWorkers : public Messenger
     236    {
     237    public:
     238      void operator()(JobHandler& handler);
     239    };
     240
     241
     242    // Class used to change number of workers
     243    class WorkForceSizer : public Messenger
     244    {
     245    public:
     246      void operator()(JobHandler& handler);
     247    };
     248
    193249
    194250    struct LowerPriority
     
    209265    {
    210266    public:
    211       Worker(JobQueue& queue, Queue<JobPtr>& completed);
     267      Worker(JobQueue& queue, MessengerQueue& completed);
    212268      void operator()(void);
    213269    private:
    214270      JobQueue& queue_;
    215       Queue<JobPtr>& completed_;
     271      MessengerQueue& completed_;
    216272    }; // end class Worker
    217273
     
    239295      };
    240296
    241       JobHandlerData(unsigned int threads);
     297      JobHandlerData(void);
    242298      Queue<boost::exception_ptr>& error(void) const;
    243299
    244       const Queue<JobPtr>& jobs(void) const;
    245       Queue<JobPtr>& jobs(void);
    246       const JobQueue& queue(void) const;
     300      const MessengerQueue& messengers(void) const;
     301      MessengerQueue& messengers(void); //
     302      const JobQueue& queue(void) const; //
    247303      JobQueue& queue(void);
    248 
    249       const Count& job_count(void) const;
    250       Count& job_count(void);
    251304
    252305      const Count& running_jobs(void) const;
    253306      Count& running_jobs(void);
    254307
    255       const Count& threads(void) const;
    256       Count& threads(void);
     308      const Count& n_threads(void) const;
     309      Count& n_threads(void);
    257310    private:
    258311      mutable Queue<boost::exception_ptr> error_;
    259       Queue<JobPtr> jobs_;
     312      MessengerQueue messengers_;
     313      // This is the queue workers are consuming from.
    260314      JobQueue queue_;
    261315
    262       Count job_count_;
    263316      Count running_jobs_;
    264317      Count threads_;
     
    273326
    274327      void operator()(void);
    275     private:
     328      void create_workers(unsigned int n);
     329      JobHandlerData& data(void);
     330      void interrupt_workers(void);
     331      void kill_workers(void);
     332      void kill_workers(unsigned int n);
     333      // If \a job has parent jobs, which need to finish first, update
     334      // map children_ to reflect that. If all parents have finished,
     335      // send job to queue.
    276336      void process(JobPtr& job);
     337      void remove_joined_workers(void);
     338      void scheduler_is_waiting(bool value);
     339      unsigned int n_target_workers(void) const;
     340      void wait_workers(void) const;
     341    private:
    277342
    278343      // If job is ready to be submitted i.e. all prerequisite have
     
    280345      void prepare(JobPtr job);
    281346      // handle jobs returned from worker
     347      //
     348      // function called when job has finished and returned from
     349      // worker. If there are any jobs that depend on \a job, those jobs
     350      // are notified and if it makes them ready to be processed they
     351      // are sent to queue.
    282352      void post_process(JobPtr job);
    283353
     
    285355
    286356      JobHandlerData* data_;
    287     };
    288 
    289 
    290     // function called when job has finished and returned from
    291     // worker. If there are any jobs that depend on \a job, those jobs
    292     // are notified and if it makes them ready to be processed they
    293     // are sent to queue.
    294     void post_process(boost::shared_ptr<Job> job);
    295 
    296     // If \a job has parent jobs, which need to finish first, update
    297     // map children_ to reflect that. If all parents have finished,
    298     // send job to queue.
    299     void process(boost::shared_ptr<Job> job);
    300 
    301     // send job to queue
    302     void queue(boost::shared_ptr<Job> job);
     357      bool scheduler_is_waiting_;
     358      unsigned int job_count_;
     359      void join_workers(void);
     360
     361      typedef std::list<boost::shared_ptr<boost::thread> > WorkerList;
     362      // FIXME is this needed here or can it be in
     363      // JobHandler::operator() scope?  The reason we keep it here is
     364      // so we can access it from other functions and don't need to
     365      // pass it around.
     366      WorkerList workers_;
     367      // Number of Workers minus number of poison pills sent for them
     368      unsigned int n_target_workers_;
     369    }; // end class JobHandler
    303370
    304371    // If an error has been detected (and stored in error_), rethrow it.
    305372    void throw_if_error(void) const;
    306373
     374    mutable boost::mutex mutex_;
    307375    JobHandlerData data_;
    308376    boost::thread job_handler_;
Note: See TracChangeset for help on using the changeset viewer.