source: trunk/yat/utility/Scheduler.h @ 3823

Last change on this file since 3823 was 3823, checked in by Peter, 3 years ago

Merge release 0.16 into trunk

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 8.2 KB
Line 
1#ifndef theplu_yat_utility_scheduler
2#define theplu_yat_utility_scheduler
3
4// $Id: Scheduler.h 3823 2019-07-16 02:11:48Z peter $
5
6/*
7  Copyright (C) 2014, 2015, 2016, 2017, 2019 Peter Johansson
8
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23*/
24
25#include "PriorityQueue.h"
26#include "Queue.h"
27
28#include <boost/exception_ptr.hpp>
29#include <boost/thread.hpp>
30#include <boost/shared_ptr.hpp>
31
32#include <set>
33#include <deque>
34
35namespace theplu {
36namespace yat {
37namespace utility {
38
39  /**
40     \brief Handle a number of jobs and send them to threads
41
42     Scheduler starts a (user defined) number of threads and handles a
43     series of Jobs. The Jobs can have dependencies, such that Job X
44     must finish before Job Y can run, and the Scheduler takes care of
45     these dependencies. Here is a small code example in which two
46     threads are used to process the four jobs, \c MyJob. The jobs
47     have a dependency that job4 can not run until the three first
48     jobs have completed.
49
50     \code
51
52     Scheduler scheduler(2);
53     boost::shared_ptr<MyJob> job1(new MyJob("Hello"));
54     boost::shared_ptr<MyJob> job2(new MyJob(" "));
55     boost::shared_ptr<MyJob> job3(new MyJob("World"));
56     boost::shared_ptr<MyJob> job4(new MyJob("\n"));
57     scheduler.add_dependency(job4, job1);
58     scheduler.add_dependency(job4, job2);
59     scheduler.add_dependency(job4, job3);
60     scheduler.submit(job4);
61     scheduler.wait();
62
63     \endcode
64
65     The Scheduler sends jobs to the workers taking into account
66     dependencies, priorities and the order in which the Scheduler has
67     seen the Jobs. If the Jobs that are ready to run, i.e., all jobs
68     it depends on have completed, the Scheduler choose the Job with
69     highest priority. If two Jobs have the same priority, the
70     Scheduler sends them in the order of appearance, i.e., if the
71     Scheduler saw Job X before Job Y, Job X is run before Job X.
72
73     \note In the current implementation all submitted jobs have
74     to be completed before the Scheduler goes out of scope, which can
75     be accomplished by calling wait() or interrupt().
76
77     \note Currently, the Scheduler cannot be recycled, i.e.,
78     if wait(void) or interruped(void) have been called, the behaviour of
79     submit() is undefined.
80
81     \since New in yat 0.13
82   */
83  class Scheduler
84  {
85  public:
86    /**
87       Base class that defines the interface for a Job
88     */
89    class Job
90    {
91    public:
92      /**
93         \brief constructor
94       */
95      explicit Job(unsigned int prio=0);
96
97      /**
98         \brief destructor
99       */
100      virtual ~Job(void);
101
102      /**
103         Jobs with greater priority are run before Jobs with less priority.
104       */
105      unsigned int priority(void) const;
106
107      /**
108         This function defines the work done in thread.
109       */
110      virtual void operator()(void)=0;
111    private:
112      friend class Scheduler;
113      friend class JobHandler;
114      mutable boost::mutex mutex_;
115      void add_prerequisite(const boost::shared_ptr<Job>&);
116      // \brief remove job from list of prerequisite
117      // \return number of prerequisite (after removal)
118      size_t remove_prerequisite(const boost::shared_ptr<Job>&);
119      void add_observer(const boost::shared_ptr<Job>&);
120
121      // set of jobs that have to finish before this can run
122      std::set<boost::shared_ptr<Job> > prerequisite_;
123      // jobs that have *this as prerequisite
124      std::vector<boost::shared_ptr<Job> > observers_;
125      // - pristine is what it says
126      // - prepared - job has been either submitted directly a job that
127      // depends on job has been submitted et.c.
128      // - running - job has been sent to queue for workers to chew on
129      // - completed - job has returned
130      enum status { pristine, prepared, running, completed};
131      status status_;
132      unsigned priority_;
133      unsigned id_;
134      boost::exception_ptr error_;
135    }; // end class Job
136
137    /**
138       \brief constructor
139
140       \param threads number of threads that are used
141     */
142    Scheduler(unsigned int threads);
143
144    /**
145       \brief add a dependency rule
146
147       Add a dependency that Job \a prerequisite has to complete
148       before Job \a job is run.
149
150       Note, job cannot have been submitted, neither directly or
151       by being prerequisite of a submitted job.
152     */
153    void add_dependency(boost::shared_ptr<Job> job,
154                        boost::shared_ptr<Job> prerequisite);
155    /**
156       \brief interrrupt all jobs
157     */
158    void interrupt(void);
159
160    /**
161       \return Number of jobs that are either running or queued, i.e.,
162       jobs that are waiting for a dependency are not counted.
163
164       \since New in yat 0.15
165     */
166    size_t jobs(void) const;
167
168    /**
169       \brief submit a \a job to Scheduler
170
171       If \a job depends on other jobs, they are also submitted to the
172       Scheduler.
173
174       \note If wait() or interrupt() have been called, the behaviour of
175       submit() is undefined.
176     */
177    void submit(const boost::shared_ptr<Job>& job);
178
179    /**
180       \brief wait for all jobs to finish
181     */
182    void wait(void);
183
184  private:
185    typedef boost::shared_ptr<Scheduler::Job> JobPtr;
186
187    struct LowerPriority
188    {
189      bool operator()(const JobPtr& lhs, const JobPtr& rhs) const;
190    };
191
192    // some typedefs for convenience
193    typedef PriorityQueue<JobPtr, LowerPriority> JobQueue;
194
195    // \internal class that does the job
196    //
197    // It processes any job that is pushed to the \a queue until a
198    // NULL Job shows up which signals that the work is done. When
199    // NULL is observed the NULL Job is pushed to the queue so
200    // co-workers are notified too.
201    class Worker
202    {
203    public:
204      Worker(JobQueue& queue, Queue<JobPtr>& completed);
205      void operator()(void);
206    private:
207      JobQueue& queue_;
208      Queue<JobPtr>& completed_;
209    }; // end class Worker
210
211
212    class JobHandlerData
213    {
214    public:
215      /// thread-safe class around int
216      class Count
217      {
218      public:
219        /// Constructor
220        explicit Count(int x=0);
221        /// increase value with 1
222        void decrement(void);
223        /// return value
224        int get(void) const;
225        /// decrease value with 1
226        void increment(void);
227        /// modify value
228        void set(int x);
229      private:
230        mutable boost::mutex mutex_;
231        int x_;
232      };
233
234      JobHandlerData(unsigned int threads);
235      Queue<boost::exception_ptr>& error(void) const;
236
237      const Queue<JobPtr>& jobs(void) const;
238      Queue<JobPtr>& jobs(void);
239      const JobQueue& queue(void) const;
240      JobQueue& queue(void);
241
242      const Count& job_count(void) const;
243      Count& job_count(void);
244
245      const Count& running_jobs(void) const;
246      Count& running_jobs(void);
247
248      const Count& threads(void) const;
249      Count& threads(void);
250    private:
251      mutable Queue<boost::exception_ptr> error_;
252      Queue<JobPtr> jobs_;
253      JobQueue queue_;
254
255      Count job_count_;
256      Count running_jobs_;
257      Count threads_;
258    };
259
260
261    // \internal Class that handles job
262    class JobHandler
263    {
264    public:
265      JobHandler(JobHandlerData& data);
266
267      void operator()(void);
268    private:
269      void process(JobPtr& job);
270
271      // If job is ready to be submitted i.e. all prerequisite have
272      // completed, then submit job to queue for workers to chew on.
273      void prepare(JobPtr job);
274      // handle jobs returned from worker
275      void post_process(JobPtr job);
276
277      void send2queue(JobPtr& job);
278
279      JobHandlerData* data_;
280    };
281
282
283    // function called when job has finished and returned from
284    // worker. If there are any jobs that depend on \a job, those jobs
285    // are notified and if it makes them ready to be processed they
286    // are sent to queue.
287    void post_process(boost::shared_ptr<Job> job);
288
289    // If \a job has parent jobs, which need to finish first, update
290    // map children_ to reflect that. If all parents have finished,
291    // send job to queue.
292    void process(boost::shared_ptr<Job> job);
293
294    // send job to queue
295    void queue(boost::shared_ptr<Job> job);
296
297    // If an error has been detected (and stored in error_), rethrow it.
298    void throw_if_error(void) const;
299
300    JobHandlerData data_;
301    boost::thread job_handler_;
302  }; // end class Scheduler
303
304}}}
305#endif
Note: See TracBrowser for help on using the repository browser.