source: trunk/yat/utility/Scheduler.h @ 3406

Last change on this file since 3406 was 3406, checked in by Peter, 4 years ago

docs. refs #800

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 5.8 KB
Line 
1#ifndef theplu_yat_utility_scheduler
2#define theplu_yat_utility_scheduler
3
4// $Id: Scheduler.h 3406 2015-04-07 00:22:45Z peter $
5
6/*
7  Copyright (C) 2014 Peter Johansson
8
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23*/
24
25#include "PriorityQueue.h"
26#include "Queue.h"
27
28#include <boost/exception_ptr.hpp>
29#include <boost/thread.hpp>
30#include <boost/shared_ptr.hpp>
31
32#include <set>
33#include <deque>
34
35namespace theplu {
36namespace yat {
37namespace utility {
38
39  /**
40     \brief Handle a number of jobs and send them to threads
41
42     Scheduler starts a (user defined) number of threads and handles a
43     series of Jobs. The Jobs can have dependencies, such that Job X
44     must finish before Job Y can run, and the Scheduler takes care of
45     these dependencies. Here is a small code example in which two
46     threads are used to process the four jobs, \c MyJob. The jobs
47     have a dependency that job4 can not run until the three first
48     jobs have completed.
49
50     \code
51
52     Scheduler scheduler(2);
53     boost::shared_ptr<MyJob> job1(new MyJob("Hello"));
54     boost::shared_ptr<MyJob> job2(new MyJob(" "));
55     boost::shared_ptr<MyJob> job3(new MyJob("World"));
56     boost::shared_ptr<MyJob> job4(new MyJob("\n"));
57     scheduler.add_dependency(job4, job1);
58     scheduler.add_dependency(job4, job2);
59     scheduler.add_dependency(job4, job3);
60     scheduler.submit(job4);
61     scheduler.wait();
62
63     \endcode
64
65     The Scheduler sends jobs to the workers taking into account
66     dependencies, priorities and the order in which the Scheduler has
67     seen the Jobs. Of the Jobs that are ready to run, i.e., all jobs
68     it depends on have completed, the Scheduler chose the Job with
69     highest priority. If two Jobs have the same priority, the
70     Scheduler sends them in the order of appearance, i.e., if the
71     Scheduler saw Job X before Job Y, Job X is run before Job X.
72
73     \note In the current implementation all submitted jobs have
74     to be completed before the Scheduler goes out of scope, which can
75     be accomplished by calling wait() or interrupt().
76
77     \note Currently, the Scheduler cannot be recycled, i.e.,
78     wait(void) or interruped(void) have been called, the behaviour of
79     submit() is undefined.
80
81     \since New in yat 0.13
82   */
83  class Scheduler
84  {
85  public:
86    /**
87       Base class that defines the interface for a Job
88     */
89    class Job
90    {
91    public:
92      /**
93         \brief constructor
94       */
95      explicit Job(unsigned int prio=0);
96
97      /**
98         \brief destructor
99       */
100      virtual ~Job(void);
101
102      /**
103         Jobs with greater priority are run before Jobs with less priority.
104       */
105      unsigned int priority(void) const;
106
107      /**
108         This function defines the work done in thread.
109       */
110      virtual void operator()(void)=0;
111    private:
112      friend class Scheduler;
113      // set of jobs that have to finish before this can run
114      std::set<boost::shared_ptr<Job> > prerequisite_;
115      // jobs that have *this as prerequisite
116      std::vector<boost::shared_ptr<Job> > observers_;
117      enum status { pristine, prepared, running, completed};
118      status status_;
119      unsigned priority_;
120      unsigned id_;
121      boost::exception_ptr error_;
122    }; // end class Job
123
124    /**
125       \brief constructor
126
127       \param threads number of threads that are used
128     */
129    Scheduler(unsigned int threads);
130
131    /**
132       \brief add a dependency rule
133
134       Add a dependency that Job \a prerequisite has to complete
135       before Job \a job is run.
136     */
137    void add_dependency(boost::shared_ptr<Job> job,
138                        boost::shared_ptr<Job> prerequisite);
139    /**
140       \brief interrrupt all jobs
141     */
142    void interrupt(void);
143
144    /**
145       \brief submit a \a job to Scheduler
146
147       If \a job depends on other jobs, they are also submitted to the
148       Scheduler.
149
150       \note If wait() or interrupt() have been called, the behaviour of
151       submit() is undefined.
152     */
153    void submit(boost::shared_ptr<Job> job);
154
155    /**
156       \brief wait for all jobs to finish
157     */
158    void wait(void);
159
160  private:
161    typedef boost::shared_ptr<Scheduler::Job> JobPtr;
162
163    struct LowerPriority
164    {
165      bool operator()(const JobPtr& lhs, const JobPtr& rhs) const;
166    };
167
168    // some typedefs for convenience
169    typedef PriorityQueue<JobPtr, LowerPriority> JobQueue;
170
171    // \internal class that does the job
172    //
173    // It processes any job that is pushed to the \a queue until a
174    // NULL Job shows up which signals that the work is done. When
175    // NULL is observed the NULL Job is pushed to the queue so
176    // co-workers are notified too.
177    class Worker
178    {
179    public:
180      Worker(JobQueue& queue, Queue<JobPtr>& completed);
181      void operator()(void);
182    private:
183      JobQueue& queue_;
184      Queue<JobPtr>& completed_;
185    }; // end class Worker
186
187    // function called when job has finished and returned from
188    // worker. If there are any jobs that depend on \a job, those jobs
189    // are notified and if it makes them ready to be processed they
190    // are sent to queue.
191    void post_process(boost::shared_ptr<Job> job);
192
193    // If \a job has parent jobs, which need to finish first, update
194    // map children_ to reflect that. If all parents have finished,
195    // send job to queue.
196    void process(boost::shared_ptr<Job> job);
197
198    // send job to queue
199    void queue(boost::shared_ptr<Job> job);
200
201    int running_jobs_;
202
203    JobQueue queue_;
204    Queue<JobPtr> completed_;
205    boost::thread_group workers_;
206    unsigned long int job_counter_;
207  }; // end class Scheduler
208
209}}}
210#endif
Note: See TracBrowser for help on using the repository browser.