source: trunk/yat/utility/Scheduler.h @ 3826

Last change on this file since 3826 was 3826, checked in by Peter, 3 years ago

function to access number of threads in Scheduler. closes #914

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 8.3 KB
Line 
1#ifndef theplu_yat_utility_scheduler
2#define theplu_yat_utility_scheduler
3
4// $Id: Scheduler.h 3826 2019-07-18 06:47:27Z peter $
5
6/*
7  Copyright (C) 2014, 2015, 2016, 2017, 2019 Peter Johansson
8
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23*/
24
25#include "PriorityQueue.h"
26#include "Queue.h"
27
28#include <boost/exception_ptr.hpp>
29#include <boost/thread.hpp>
30#include <boost/shared_ptr.hpp>
31
32#include <set>
33#include <deque>
34
35namespace theplu {
36namespace yat {
37namespace utility {
38
39  /**
40     \brief Handle a number of jobs and send them to threads
41
42     Scheduler starts a (user defined) number of threads and handles a
43     series of Jobs. The Jobs can have dependencies, such that Job X
44     must finish before Job Y can run, and the Scheduler takes care of
45     these dependencies. Here is a small code example in which two
46     threads are used to process the four jobs, \c MyJob. The jobs
47     have a dependency that job4 can not run until the three first
48     jobs have completed.
49
50     \code
51
52     Scheduler scheduler(2);
53     boost::shared_ptr<MyJob> job1(new MyJob("Hello"));
54     boost::shared_ptr<MyJob> job2(new MyJob(" "));
55     boost::shared_ptr<MyJob> job3(new MyJob("World"));
56     boost::shared_ptr<MyJob> job4(new MyJob("\n"));
57     scheduler.add_dependency(job4, job1);
58     scheduler.add_dependency(job4, job2);
59     scheduler.add_dependency(job4, job3);
60     scheduler.submit(job4);
61     scheduler.wait();
62
63     \endcode
64
65     The Scheduler sends jobs to the workers taking into account
66     dependencies, priorities and the order in which the Scheduler has
67     seen the Jobs. If the Jobs that are ready to run, i.e., all jobs
68     it depends on have completed, the Scheduler choose the Job with
69     highest priority. If two Jobs have the same priority, the
70     Scheduler sends them in the order of appearance, i.e., if the
71     Scheduler saw Job X before Job Y, Job X is run before Job X.
72
73     \note In the current implementation all submitted jobs have
74     to be completed before the Scheduler goes out of scope, which can
75     be accomplished by calling wait() or interrupt().
76
77     \note Currently, the Scheduler cannot be recycled, i.e.,
78     if wait(void) or interruped(void) have been called, the behaviour of
79     submit() is undefined.
80
81     \since New in yat 0.13
82   */
83  class Scheduler
84  {
85  public:
86    /**
87       Base class that defines the interface for a Job
88     */
89    class Job
90    {
91    public:
92      /**
93         \brief constructor
94       */
95      explicit Job(unsigned int prio=0);
96
97      /**
98         \brief destructor
99       */
100      virtual ~Job(void);
101
102      /**
103         Jobs with greater priority are run before Jobs with less priority.
104       */
105      unsigned int priority(void) const;
106
107      /**
108         This function defines the work done in thread.
109       */
110      virtual void operator()(void)=0;
111    private:
112      friend class Scheduler;
113      friend class JobHandler;
114      mutable boost::mutex mutex_;
115      void add_prerequisite(const boost::shared_ptr<Job>&);
116      // \brief remove job from list of prerequisite
117      // \return number of prerequisite (after removal)
118      size_t remove_prerequisite(const boost::shared_ptr<Job>&);
119      void add_observer(const boost::shared_ptr<Job>&);
120
121      // set of jobs that have to finish before this can run
122      std::set<boost::shared_ptr<Job> > prerequisite_;
123      // jobs that have *this as prerequisite
124      std::vector<boost::shared_ptr<Job> > observers_;
125      // - pristine is what it says
126      // - prepared - job has been either submitted directly a job that
127      // depends on job has been submitted et.c.
128      // - running - job has been sent to queue for workers to chew on
129      // - completed - job has returned
130      enum status { pristine, prepared, running, completed};
131      status status_;
132      unsigned priority_;
133      unsigned id_;
134      boost::exception_ptr error_;
135    }; // end class Job
136
137    /**
138       \brief constructor
139
140       \param threads number of threads that are used
141     */
142    Scheduler(unsigned int threads);
143
144    /**
145       \brief add a dependency rule
146
147       Add a dependency that Job \a prerequisite has to complete
148       before Job \a job is run.
149
150       Note, job cannot have been submitted, neither directly or
151       by being prerequisite of a submitted job.
152     */
153    void add_dependency(boost::shared_ptr<Job> job,
154                        boost::shared_ptr<Job> prerequisite);
155    /**
156       \brief interrrupt all jobs
157     */
158    void interrupt(void);
159
160    /**
161       \return Number of jobs that are either running or queued, i.e.,
162       jobs that are waiting for a dependency are not counted.
163
164       \since New in yat 0.15
165     */
166    size_t jobs(void) const;
167
168    /**
169       \brief submit a \a job to Scheduler
170
171       If \a job depends on other jobs, they are also submitted to the
172       Scheduler.
173
174       \note If wait() or interrupt() have been called, the behaviour of
175       submit() is undefined.
176     */
177    void submit(const boost::shared_ptr<Job>& job);
178
179    /**
180       \return number of threads used
181
182       \since new in yat 0.17
183     */
184    int threads(void) const;
185
186    /**
187       \brief wait for all jobs to finish
188     */
189    void wait(void);
190
191  private:
192    typedef boost::shared_ptr<Scheduler::Job> JobPtr;
193
194    struct LowerPriority
195    {
196      bool operator()(const JobPtr& lhs, const JobPtr& rhs) const;
197    };
198
199    // some typedefs for convenience
200    typedef PriorityQueue<JobPtr, LowerPriority> JobQueue;
201
202    // \internal class that does the job
203    //
204    // It processes any job that is pushed to the \a queue until a
205    // NULL Job shows up which signals that the work is done. When
206    // NULL is observed the NULL Job is pushed to the queue so
207    // co-workers are notified too.
208    class Worker
209    {
210    public:
211      Worker(JobQueue& queue, Queue<JobPtr>& completed);
212      void operator()(void);
213    private:
214      JobQueue& queue_;
215      Queue<JobPtr>& completed_;
216    }; // end class Worker
217
218
219    class JobHandlerData
220    {
221    public:
222      /// thread-safe class around int
223      class Count
224      {
225      public:
226        /// Constructor
227        explicit Count(int x=0);
228        /// increase value with 1
229        void decrement(void);
230        /// return value
231        int get(void) const;
232        /// decrease value with 1
233        void increment(void);
234        /// modify value
235        void set(int x);
236      private:
237        mutable boost::mutex mutex_;
238        int x_;
239      };
240
241      JobHandlerData(unsigned int threads);
242      Queue<boost::exception_ptr>& error(void) const;
243
244      const Queue<JobPtr>& jobs(void) const;
245      Queue<JobPtr>& jobs(void);
246      const JobQueue& queue(void) const;
247      JobQueue& queue(void);
248
249      const Count& job_count(void) const;
250      Count& job_count(void);
251
252      const Count& running_jobs(void) const;
253      Count& running_jobs(void);
254
255      const Count& threads(void) const;
256      Count& threads(void);
257    private:
258      mutable Queue<boost::exception_ptr> error_;
259      Queue<JobPtr> jobs_;
260      JobQueue queue_;
261
262      Count job_count_;
263      Count running_jobs_;
264      Count threads_;
265    };
266
267
268    // \internal Class that handles job
269    class JobHandler
270    {
271    public:
272      JobHandler(JobHandlerData& data);
273
274      void operator()(void);
275    private:
276      void process(JobPtr& job);
277
278      // If job is ready to be submitted i.e. all prerequisite have
279      // completed, then submit job to queue for workers to chew on.
280      void prepare(JobPtr job);
281      // handle jobs returned from worker
282      void post_process(JobPtr job);
283
284      void send2queue(JobPtr& job);
285
286      JobHandlerData* data_;
287    };
288
289
290    // function called when job has finished and returned from
291    // worker. If there are any jobs that depend on \a job, those jobs
292    // are notified and if it makes them ready to be processed they
293    // are sent to queue.
294    void post_process(boost::shared_ptr<Job> job);
295
296    // If \a job has parent jobs, which need to finish first, update
297    // map children_ to reflect that. If all parents have finished,
298    // send job to queue.
299    void process(boost::shared_ptr<Job> job);
300
301    // send job to queue
302    void queue(boost::shared_ptr<Job> job);
303
304    // If an error has been detected (and stored in error_), rethrow it.
305    void throw_if_error(void) const;
306
307    JobHandlerData data_;
308    boost::thread job_handler_;
309  }; // end class Scheduler
310
311}}}
312#endif
Note: See TracBrowser for help on using the repository browser.