source: trunk/yat/utility/Scheduler.cc @ 3823

Last change on this file since 3823 was 3823, checked in by Peter, 3 years ago

Merge release 0.16 into trunk

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 10.0 KB
Line 
1// $Id: Scheduler.cc 3823 2019-07-16 02:11:48Z peter $
2
3/*
4  Copyright (C) 2014, 2015, 2017, 2019 Peter Johansson
5
6  This file is part of the yat library, http://dev.thep.lu.se/yat
7
8  The yat library is free software; you can redistribute it and/or
9  modify it under the terms of the GNU General Public License as
10  published by the Free Software Foundation; either version 3 of the
11  License, or (at your option) any later version.
12
13  The yat library is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  General Public License for more details.
17
18  You should have received a copy of the GNU General Public License
19  along with yat. If not, see <http://www.gnu.org/licenses/>.
20*/
21
22#include <config.h>
23
24#include "Scheduler.h"
25
26#include <boost/exception/all.hpp>
27
28#include <cassert>
29
30namespace theplu {
31namespace yat {
32namespace utility {
33
34  Scheduler::Scheduler(unsigned int threads)
35    : data_(threads), job_handler_(JobHandler(data_))
36  {
37    assert(threads);
38  }
39
40
41  void Scheduler::add_dependency(boost::shared_ptr<Job> job,
42                                 boost::shared_ptr<Job> prerequisite)
43  {
44    assert(job->status_ == Job::pristine);
45    if (prerequisite->status_ != Job::completed) {
46      job->add_prerequisite(prerequisite);
47      prerequisite->add_observer(job);
48    }
49    throw_if_error();
50  }
51
52
53  void Scheduler::interrupt(void)
54  {
55    // first signal to JobHandler that Scheduler is waiting
56    boost::shared_ptr<Job> end;
57    data_.jobs().push(end);
58
59    job_handler_.interrupt();
60    throw_if_error();
61  }
62
63
64  size_t Scheduler::jobs(void) const
65  {
66    throw_if_error();
67    return data_.running_jobs().get() + data_.queue().size();
68  }
69
70
71  void Scheduler::submit(const JobPtr& job)
72  {
73    throw_if_error();
74    data_.jobs().push(job);
75  }
76
77
78  void Scheduler::throw_if_error(void) const
79  {
80    boost::exception_ptr error;
81    if (data_.error().try_pop(error))
82      boost::rethrow_exception(error);
83  }
84
85
86  // This function (or interrupt) has to be called before Scheduler
87  // goes out of scope, which triggers the idea to let destructor call
88  // ::wait(). However, wait() might very well throw, and according to
89  // this article
90  // (http://bin-login.name/ftp/pub/docs/programming_languages/cpp/cffective_cpp/MAGAZINE/SU_FRAME.HTM#destruct)
91  // from Herb Sutter basically saying that destructors that throw are
92  // evil.
93  void Scheduler::wait(void)
94  {
95    // first signal to JobHandler that Scheduler is waiting
96    boost::shared_ptr<Job> end;
97    data_.jobs().push(end);
98
99    // wait for job handler to finish
100    job_handler_.join();
101    throw_if_error();
102  }
103
104
105  // Scheduler::Job
106
107  Scheduler::Job::Job(unsigned int prio)
108    : status_(pristine), priority_(prio), id_(0) {}
109
110
111  Scheduler::Job::~Job(void)
112  {}
113
114
115  unsigned int Scheduler::Job::priority(void) const
116  {
117    return priority_;
118  }
119
120
121  bool Scheduler::LowerPriority::operator()(const JobPtr& lhs,
122                                            const JobPtr& rhs) const
123  {
124    if (rhs.get() == NULL)
125      return false;
126    if (lhs.get() == NULL)
127      return true;
128
129    if (lhs->priority() != rhs->priority())
130      return lhs->priority() < rhs->priority();
131
132    return lhs->id_ > rhs->id_;
133  }
134
135
136  void Scheduler::Job::add_prerequisite(const JobPtr& prereq)
137  {
138    boost::unique_lock<boost::mutex> lock(mutex_);
139    prerequisite_.insert(prereq);
140  }
141
142
143  size_t Scheduler::Job::remove_prerequisite(const JobPtr& prereq)
144  {
145    boost::unique_lock<boost::mutex> lock(mutex_);
146    prerequisite_.erase(prereq);
147    return prerequisite_.size();
148  }
149
150
151  void Scheduler::Job::add_observer(const JobPtr& job)
152  {
153    boost::unique_lock<boost::mutex> lock(mutex_);
154    observers_.push_back(job);
155  }
156
157
158  // Scheduler::Worker
159
160  Scheduler::Worker::Worker(JobQueue& q, Queue<JobPtr>& c)
161    : queue_(q), completed_(c)
162  {
163  }
164
165
166  void Scheduler::Worker::operator()(void)
167  {
168    while (true) {
169      boost::shared_ptr<Job> job;
170      // get next job
171      queue_.pop(job);
172      // NULL job indicates poison pill
173      if (job.get()==NULL) {
174        // kill other workers too
175        queue_.push(job);
176        break;
177      }
178      // action
179      try {
180        (*job)();
181      }
182      catch (...) {
183        job->error_ = boost::current_exception();
184        // return job to scheduler so it can act on the error
185        completed_.push(job);
186        // exit work and go home
187        return;
188      }
189      // return job to scheduler
190      completed_.push(job);
191
192      // Make sure we can be interrupted
193      boost::this_thread::interruption_point();
194    }
195  }
196
197
198  // Scheduler::JobHandlerData
199  Scheduler::JobHandlerData::JobHandlerData(unsigned int threads)
200    : job_count_(0), running_jobs_(0), threads_(threads)
201  {}
202
203
204  Queue<boost::exception_ptr>&
205  Scheduler::JobHandlerData::error(void) const
206  {
207    return error_;
208  }
209
210
211  const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const
212  {
213    return jobs_;
214  }
215
216
217  Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void)
218  {
219    return jobs_;
220  }
221
222
223  const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const
224  {
225    return queue_;
226  }
227
228
229  Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void)
230  {
231    return queue_;
232  }
233
234
235  const Scheduler::JobHandlerData::Count&
236  Scheduler::JobHandlerData::job_count(void) const
237  {
238    return job_count_;
239  }
240
241
242  Scheduler::JobHandlerData::Count&
243  Scheduler::JobHandlerData::job_count(void)
244  {
245    return job_count_;
246  }
247
248
249  const Scheduler::JobHandlerData::Count&
250  Scheduler::JobHandlerData::running_jobs(void) const
251  {
252    return running_jobs_;
253  }
254
255
256  Scheduler::JobHandlerData::Count&
257  Scheduler::JobHandlerData::running_jobs(void)
258  {
259    return running_jobs_;
260  }
261
262
263  const Scheduler::JobHandlerData::Count&
264  Scheduler::JobHandlerData::threads(void) const
265  {
266    return threads_;
267  }
268
269
270  Scheduler::JobHandlerData::Count&
271  Scheduler::JobHandlerData::threads(void)
272  {
273    return threads_;
274  }
275
276
277  // Scheduler::JobHandlerData::Count
278  Scheduler::JobHandlerData::Count::Count(int x)
279    : x_(x)
280  {
281  }
282
283
284  void Scheduler::JobHandlerData::Count::decrement(void)
285  {
286    boost::unique_lock<boost::mutex> lock(mutex_);
287    --x_;
288  }
289
290
291  int Scheduler::JobHandlerData::Count::get(void) const
292  {
293    boost::unique_lock<boost::mutex> lock(mutex_);
294    return x_;
295  }
296
297
298  void Scheduler::JobHandlerData::Count::increment(void)
299  {
300    boost::unique_lock<boost::mutex> lock(mutex_);
301    ++x_;
302  }
303
304
305  void Scheduler::JobHandlerData::Count::set(int x)
306  {
307    boost::unique_lock<boost::mutex> lock(mutex_);
308    x_ = x;
309  }
310
311
312  // Scheduler::JobHandler
313
314  Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data)
315    : data_(&data)
316  {}
317
318
319
320  void Scheduler::JobHandler::post_process(JobPtr job)
321  {
322    assert(job);
323    assert(data_);
324    data_->running_jobs().decrement();
325    assert(data_->running_jobs().get() >= 0);
326    job->status_ = Job::completed;
327
328    if (job->error_) {
329      data_->error().push(job->error_);
330      return;
331    }
332
333    // status of job is now 'completed', so Scheduler::add_dependency
334    // will not interfere with job->observers_
335
336    // for convenience
337    std::vector<JobPtr>& observers = job->observers_;
338    // notify observers, jobs that have 'job' as prerequisite
339    for (size_t i=0; i<observers.size(); ++i)
340      // If 'job' was the last blocking prerequisite and observer has
341      // been prepared (submitted directly or indirectly), then send
342      // the observer to queue.
343      if (!observers[i]->remove_prerequisite(job) &&
344          observers[i]->status_ != Job::pristine) {
345        assert(observers[i]->status_ == Job::prepared);
346        send2queue(observers[i]);
347      }
348  }
349
350
351  void Scheduler::JobHandler::prepare(JobPtr job)
352  {
353    assert(job->status_ == Job::pristine);
354    job->status_ = Job::prepared;
355    job->id_ = data_->job_count().get();
356    data_->job_count().increment();
357
358    // If job have prerequisite that need to be run first, process them
359
360    // job->prerequisite_ should be safe to access here without any
361    // locking. In Scheduler it's only access by add_dependency and
362    // it's illegal to 'add_dependency' for a job that is not
363    // pristine.
364
365    typedef std::set<JobPtr>::iterator iterator;
366    for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) {
367      if ((*j)->status_ == Job::pristine) {
368        prepare(*j);
369        ++j;
370      }
371      else if ((*j)->status_ == Job::completed)
372        job->prerequisite_.erase(j++);
373      else {
374        ++j;
375      }
376    }
377
378    // If all prerequisite are finished, send job to queue
379    if (job->prerequisite_.empty())
380      send2queue(job);
381  }
382
383
384  void Scheduler::JobHandler::send2queue(JobPtr& job)
385  {
386    job->status_ = Job::running;
387    data_->running_jobs().increment();
388    assert(data_->running_jobs().get() > 0);
389    data_->queue().push(job);
390  }
391
392
393  void Scheduler::JobHandler::process(JobPtr& job)
394  {
395    switch (job->status_) {
396    case(Job::pristine):
397      prepare(job);
398      break;
399    case(Job::running):
400      post_process(job);
401      break;
402    default:
403      assert(0 &&
404             "job either pristine (fr. Scheduler) or running (fr. Worker)");
405    }
406  }
407
408
409  void Scheduler::JobHandler::operator()(void)
410  {
411    assert(data_);
412    boost::thread_group workers;
413    for (int i=0; i < data_->threads().get(); ++i)
414      workers.create_thread(Worker(data_->queue(), data_->jobs()));
415    // Process jobs (in jobs_) coming both from Scheduler and
416    // completed jobs from Workers until Scheduler is waiting
417    // (signaled by a null Job), jobs_ is empty and there are no
418    // running jobs.
419
420    // true indicates that Scheduler::wait (or ::interrupt) has been called
421    bool scheduler_is_waiting = false;
422    JobPtr job;
423
424    while (!scheduler_is_waiting || data_->running_jobs().get() ||
425           !data_->jobs().empty()) {
426      data_->jobs().pop(job);
427      if (job)
428        process(job);
429      else
430        scheduler_is_waiting = true;
431
432      // Check for error
433      //
434      // Since we are in a background thread, we cannot throw from
435      // here, instead interrupt workers and return early.
436      if (!data_->error().empty()) {
437        // In case queue is empty, workers might be be stuck
438        // waiting for job queue to pop, then send them a poison pill so
439        // they stop.
440        boost::shared_ptr<Job> end;
441        data_->queue().push(end);
442        // For other cases (queue is not empty) send workers a
443        // interrupt signal and wait for them to wrap up.
444        workers.interrupt_all();
445        data_->running_jobs().set(0);
446        data_->queue().clear();
447        return;
448      }
449
450      // Make sure we can be interrupted
451      boost::this_thread::interruption_point();
452    }
453
454    // kill workers
455    boost::shared_ptr<Job> end;
456    data_->queue().push(end);
457    workers.join_all();
458  }
459
460}}}
Note: See TracBrowser for help on using the repository browser.