Changeset 3681


Ignore:
Timestamp:
Aug 22, 2017, 5:29:56 AM (5 weeks ago)
Author:
peter
Message:

closes #896

Reimplement Scheduler. It now spawns a specific thread to handle the
jobs (see class JobHandler?). It means that the processing that
previously was done in Schedyuler::wait is done is this thread instead
and wait is now free of processing. The redesign has the side effect
that functions like Scheduler::submit and Scheduler::add_dependency
are more thread safe (more analysis warranted). Otherwise the
behaviour is virtually unchanged, except that processing of completed
(checking if their dependants are ready to queued) now starts running
immediately rather than only running when a Scheduler function was
called (::submit, ::add_dependancy or finally ::wait).

Location:
trunk/yat/utility
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/yat/utility/Scheduler.cc

    r3679 r3681  
    22
    33/*
    4         Copyright (C) 2014, 2015 Peter Johansson
     4        Copyright (C) 2014, 2015, 2017 Peter Johansson
    55
    66        This file is part of the yat library, http://dev.thep.lu.se/yat
     
    2727
    2828#include <cassert>
     29#include <iostream>
    2930
    3031namespace theplu {
     
    3334
    3435        Scheduler::Scheduler(unsigned int threads)
    35                 : running_jobs_(0), job_counter_(0)
     36                : running_jobs_(0),
     37                        job_handler_(JobHandler(threads, queue_, jobs_, running_jobs_, error_))
    3638        {
    3739                assert(threads);
    38                 for (size_t i=0; i<threads; ++i)
    39                         workers_.create_thread(Worker(queue_, completed_));
    4040        }
    4141
     
    4545        {
    4646                assert(job->status_ == Job::pristine);
    47                 job->prerequisite_.insert(prerequisite);
    48                 prerequisite->observers_.push_back(job);
     47                if (prerequisite->status_ != Job::completed) {
     48                        job->add_prerequisite(prerequisite);
     49                        prerequisite->add_observer(job);
     50                }
    4951        }
    5052
     
    5254        void Scheduler::interrupt(void)
    5355        {
    54                 workers_.interrupt_all();
     56                // first signal to JobHandler that Scheduler is waiting
     57                boost::shared_ptr<Job> end;
     58                jobs_.push(end);
     59
     60                job_handler_.interrupt();
     61                throw_if_error();
    5562        }
    5663
     
    6269
    6370
    64         void Scheduler::post_process(JobPtr job)
    65         {
    66                 --running_jobs_;
    67                 assert(running_jobs_>=0);
    68                 job->status_ = Job::completed;
    69 
    70                 // check if an error occurred
    71                 if (job->error_) {
    72                         interrupt();
    73                         boost::rethrow_exception(job->error_);
    74                 }
    75 
    76                 // for convenience
    77                 const std::vector<JobPtr>& vec = job->observers_;
    78                 // notify observers, jobs that have 'job' as prerequisite
    79                 for (size_t i=0; i<vec.size(); ++i) {
    80                         vec[i]->prerequisite_.erase(job);
    81                         if (vec[i]->prerequisite_.empty())
    82                                 queue(vec[i]);
    83                 }
    84         }
    85 
    86 
    87         void Scheduler::process(JobPtr job)
    88         {
    89                 if (job->status_!=Job::pristine)
    90                         return;
    91                 job->status_ = Job::prepared;
    92                 job->id_ = job_counter_;
    93                 ++job_counter_;
    94 
    95                 // If we have prerequisite that need to be run first, process them
    96 
    97                 typedef std::set<JobPtr>::iterator iterator;
    98                 for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) {
    99                         if ((*j)->status_ == Job::completed)
    100                                 job->prerequisite_.erase(j++);
    101                         else {
    102                                 process(*j);
    103                                 ++j;
    104                         }
    105                 }
    106 
    107                 // If all prerequisite are finished, send job to queue
    108                 if (job->prerequisite_.empty()) {
    109                         queue(job);
    110                 }
    111         }
    112 
    113 
    114         void Scheduler::queue(JobPtr job)
    115         {
    116                 job->status_ = Job::running;
    117                 ++running_jobs_;
    118                 assert(running_jobs_>0);
    119                 queue_.push(job);
    120         }
    121 
    122 
    123         void Scheduler::submit(JobPtr job)
    124         {
    125                 // If any job has been completed since last submission,
    126                 // post-process the completed job
    127                 JobPtr completed_job;
    128                 while (completed_.try_pop(completed_job))
    129                         post_process(completed_job);
    130                 process(job);
     71        void Scheduler::submit(const JobPtr& job)
     72        {
     73                jobs_.push(job);
     74        }
     75
     76
     77        void Scheduler::throw_if_error(void)
     78        {
     79                boost::exception_ptr error;
     80                if (error_.try_pop(error))
     81                        boost::rethrow_exception(error);
    13182        }
    13283
     
    13485        void Scheduler::wait(void)
    13586        {
    136                 while (running_jobs_ || completed_.size()) {
    137                         JobPtr job;
    138                         completed_.pop(job);
    139                         post_process(job);
    140                 }
    141 
    142                 // kill workers
     87                // first signal to JobHandler that Scheduler is waiting
    14388                boost::shared_ptr<Job> end;
    144                 queue_.push(end);
    145 
    146                 // wait for workers to finish
    147                 workers_.join_all();
     89                jobs_.push(end);
     90
     91                // wait for job handler to finish
     92                job_handler_.join();
     93                throw_if_error();
    14894        }
    14995
     
    177123
    178124                return lhs->id_ > rhs->id_;
     125        }
     126
     127
     128        void Scheduler::Job::add_prerequisite(const JobPtr& prereq)
     129        {
     130                boost::unique_lock<boost::mutex> lock(mutex_);
     131                prerequisite_.insert(prereq);
     132        }
     133
     134
     135        size_t Scheduler::Job::remove_prerequisite(const JobPtr& prereq)
     136        {
     137                boost::unique_lock<boost::mutex> lock(mutex_);
     138                prerequisite_.erase(prereq);
     139                return prerequisite_.size();
     140        }
     141
     142
     143        void Scheduler::Job::add_observer(const JobPtr& job)
     144        {
     145                boost::unique_lock<boost::mutex> lock(mutex_);
     146                observers_.push_back(job);
    179147        }
    180148
     
    219187        }
    220188
     189
     190        // Scheduler::JobHandler
     191
     192        Scheduler::JobHandler::JobHandler(unsigned int threads,
     193                                                                                                                                                JobQueue& queue,
     194                                                                                                                                                Queue<JobPtr>& jobs,
     195                                                                                                                                                int& running_jobs,
     196                                                                                                                                                Queue<boost::exception_ptr>& error)
     197                : workers_(new boost::thread_group),
     198                        queue_(queue), jobs_(jobs), running_jobs_(running_jobs),
     199                        error_(error), job_counter_(0)
     200        {
     201                for (size_t i=0; i<threads; ++i)
     202                        workers_->create_thread(Worker(queue_, jobs_));
     203        }
     204
     205
     206        void Scheduler::JobHandler::post_process(JobPtr job)
     207        {
     208                --running_jobs_;
     209                assert(running_jobs_>=0);
     210                job->status_ = Job::completed;
     211
     212                if (job->error_) {
     213                        error_.push(job->error_);
     214                        return;
     215                }
     216
     217                // status of job is now 'completed', so Scheduler::add_dependency
     218                // will not interfere with job->observers_
     219
     220                // for convenience
     221                std::vector<JobPtr>& observers = job->observers_;
     222                // notify observers, jobs that have 'job' as prerequisite
     223                for (size_t i=0; i<observers.size(); ++i)
     224                        if (!observers[i]->remove_prerequisite(job))
     225                                send2queue(observers[i]);
     226        }
     227
     228
     229        void Scheduler::JobHandler::prepare(JobPtr job)
     230        {
     231                assert(job->status_ == Job::pristine);
     232                job->status_ = Job::prepared;
     233                job->id_ = job_counter_;
     234                ++job_counter_;
     235
     236                // If job have prerequisite that need to be run first, process them
     237
     238                // job->prerequisite_ should be safe to access here without any
     239                // locking. In Scheduler it's only access by add_dependency and
     240                // it's illegal to 'add_dependency' for a job that is not
     241                // pristine.
     242
     243                typedef std::set<JobPtr>::iterator iterator;
     244                for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) {
     245                        if ((*j)->status_ == Job::pristine) {
     246                                prepare(*j);
     247                                ++j;
     248                        }
     249                        else if ((*j)->status_ == Job::completed)
     250                                job->prerequisite_.erase(j++);
     251                        else {
     252                                ++j;
     253                        }
     254                }
     255
     256                // If all prerequisite are finished, send job to queue
     257                if (job->prerequisite_.empty())
     258                        send2queue(job);
     259        }
     260
     261
     262        void Scheduler::JobHandler::send2queue(JobPtr& job)
     263        {
     264                job->status_ = Job::running;
     265                ++running_jobs_;
     266                assert(running_jobs_>0);
     267                queue_.push(job);
     268        }
     269
     270
     271        void Scheduler::JobHandler::process(JobPtr& job)
     272        {
     273                switch (job->status_) {
     274                case(Job::pristine):
     275                        prepare(job);
     276                        break;
     277                case(Job::running):
     278                        post_process(job);
     279                        break;
     280                default:
     281                        assert(0 &&
     282                                                 "job either pristine (fr. Scheduler) or running (fr. Worker)");
     283                }
     284        }
     285
     286
     287        void Scheduler::JobHandler::operator()(void)
     288        {
     289                // Process jobs (in jobs_) coming both from Scheduler and
     290                // completed jobs from Workers until Scheduler is waiting
     291                // (signaled by a null Job), jobs_ is empty and there are no
     292                // running jobs.
     293
     294                // true indicates that Scheduler::wait (or ::interrupt) has been called
     295                bool scheduler_is_waiting = false;
     296                JobPtr job;
     297
     298                while (!scheduler_is_waiting || running_jobs_ || jobs_.size()) {
     299                        jobs_.pop(job);
     300                        if (job)
     301                                process(job);
     302                        else
     303                                scheduler_is_waiting = true;
     304
     305                        // Check for error
     306                        //
     307                        // Since we are in a background thread, we cannot throw from
     308                        // here, instead interrupt workers and return early.
     309                        if (!error_.empty()) {
     310                                // In case queue is empty, workers might be be stuck
     311                                // waiting for job queue to pop, then send them a poison pill so
     312                                // they stop.
     313                                boost::shared_ptr<Job> end;
     314                                queue_.push(end);
     315                                // For other cases (queue is not empty) send workers a
     316                                // interrupt signal and wait for them to wrap up.
     317                                workers_->interrupt_all();
     318                                return;
     319                        }
     320
     321                        // Make sure we can be interrupted
     322                        boost::this_thread::interruption_point();
     323                }
     324
     325                // kill workers
     326                boost::shared_ptr<Job> end;
     327                queue_.push(end);
     328                workers_->join_all();
     329        }
     330
    221331}}}
  • trunk/yat/utility/Scheduler.h

    r3679 r3681  
    55
    66/*
    7         Copyright (C) 2014, 2015, 2016 Peter Johansson
     7        Copyright (C) 2014, 2015, 2016, 2017 Peter Johansson
    88
    99        This file is part of the yat library, http://dev.thep.lu.se/yat
     
    111111                private:
    112112                        friend class Scheduler;
     113                        friend class JobHandler;
     114                        mutable boost::mutex mutex_;
     115                        void add_prerequisite(const boost::shared_ptr<Job>&);
     116                        // \brief remove job from list of prerequisite
     117                        // \return number of prerequisite (after removal)
     118                        size_t remove_prerequisite(const boost::shared_ptr<Job>&);
     119                        void add_observer(const boost::shared_ptr<Job>&);
     120
    113121                        // set of jobs that have to finish before this can run
    114122                        std::set<boost::shared_ptr<Job> > prerequisite_;
    115123                        // jobs that have *this as prerequisite
    116124                        std::vector<boost::shared_ptr<Job> > observers_;
     125                        // - pristine is what it says
     126                        // - prepared - job has been either submitted directly a job that
     127                        // depends on job has been submitted et.c.
     128                        // - running - job has been sent to queue for workers to chew on
     129                        // - completed - job has returned
    117130                        enum status { pristine, prepared, running, completed};
    118131                        status status_;
     
    134147                         Add a dependency that Job \a prerequisite has to complete
    135148                         before Job \a job is run.
     149
     150                         Note, job cannot have been submitted, neither directly or
     151                         by being prerequisite of a submitted job.
    136152                 */
    137153                void add_dependency(boost::shared_ptr<Job> job,
     
    159175                         submit() is undefined.
    160176                 */
    161                 void submit(boost::shared_ptr<Job> job);
     177                void submit(const boost::shared_ptr<Job>& job);
    162178
    163179                /**
     
    193209                }; // end class Worker
    194210
     211                // \internal Class that handles job
     212                class JobHandler
     213                {
     214                public:
     215                        JobHandler(unsigned int threads, JobQueue& queue, Queue<JobPtr>& jobs,
     216                                                                 int& running_jobs, Queue<boost::exception_ptr>& error);
     217
     218                        void operator()(void);
     219                private:
     220                        void process(JobPtr& job);
     221
     222                        // If job is ready to be submitted i.e. all prerequisite have
     223                        // completed, then submit job to queue for workers to chew on.
     224                        void prepare(JobPtr job);
     225                        // handle jobs returned from worker
     226                        void post_process(JobPtr job);
     227
     228                        void send2queue(JobPtr& job);
     229                        // we need to be copyable
     230                        boost::shared_ptr<boost::thread_group> workers_;
     231                        JobQueue& queue_;
     232                        Queue<JobPtr>& jobs_;
     233                        int& running_jobs_;
     234                        Queue<boost::exception_ptr>& error_;
     235                        int job_counter_;
     236                };
     237
     238
    195239                // function called when job has finished and returned from
    196240                // worker. If there are any jobs that depend on \a job, those jobs
     
    207251                void queue(boost::shared_ptr<Job> job);
    208252
     253                // If an error has been detected (and stored in error_), rethrow it.
     254                void throw_if_error(void);
     255
     256                JobQueue queue_;
     257                Queue<JobPtr> jobs_;
    209258                int running_jobs_;
    210 
    211                 JobQueue queue_;
    212                 Queue<JobPtr> completed_;
    213                 boost::thread_group workers_;
    214                 unsigned long int job_counter_;
     259                Queue<boost::exception_ptr> error_;
     260                boost::thread job_handler_;
    215261        }; // end class Scheduler
    216262
Note: See TracChangeset for help on using the changeset viewer.