source: trunk/yat/utility/Scheduler.cc @ 3826

Last change on this file since 3826 was 3826, checked in by Peter, 3 years ago

function to access number of threads in Scheduler. closes #914

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 10.1 KB
Line 
1// $Id: Scheduler.cc 3826 2019-07-18 06:47:27Z peter $
2
3/*
4  Copyright (C) 2014, 2015, 2017, 2019 Peter Johansson
5
6  This file is part of the yat library, http://dev.thep.lu.se/yat
7
8  The yat library is free software; you can redistribute it and/or
9  modify it under the terms of the GNU General Public License as
10  published by the Free Software Foundation; either version 3 of the
11  License, or (at your option) any later version.
12
13  The yat library is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  General Public License for more details.
17
18  You should have received a copy of the GNU General Public License
19  along with yat. If not, see <http://www.gnu.org/licenses/>.
20*/
21
22#include <config.h>
23
24#include "Scheduler.h"
25
26#include <boost/exception/all.hpp>
27
28#include <cassert>
29
30namespace theplu {
31namespace yat {
32namespace utility {
33
34  Scheduler::Scheduler(unsigned int threads)
35    : data_(threads), job_handler_(JobHandler(data_))
36  {
37    assert(threads);
38  }
39
40
41  void Scheduler::add_dependency(boost::shared_ptr<Job> job,
42                                 boost::shared_ptr<Job> prerequisite)
43  {
44    assert(job->status_ == Job::pristine);
45    if (prerequisite->status_ != Job::completed) {
46      job->add_prerequisite(prerequisite);
47      prerequisite->add_observer(job);
48    }
49    throw_if_error();
50  }
51
52
53  void Scheduler::interrupt(void)
54  {
55    // first signal to JobHandler that Scheduler is waiting
56    boost::shared_ptr<Job> end;
57    data_.jobs().push(end);
58
59    job_handler_.interrupt();
60    throw_if_error();
61  }
62
63
64  size_t Scheduler::jobs(void) const
65  {
66    throw_if_error();
67    return data_.running_jobs().get() + data_.queue().size();
68  }
69
70
71  void Scheduler::submit(const JobPtr& job)
72  {
73    throw_if_error();
74    data_.jobs().push(job);
75  }
76
77
78  int Scheduler::threads(void) const
79  {
80    return data_.threads().get();
81  }
82
83
84  void Scheduler::throw_if_error(void) const
85  {
86    boost::exception_ptr error;
87    if (data_.error().try_pop(error))
88      boost::rethrow_exception(error);
89  }
90
91
92  // This function (or interrupt) has to be called before Scheduler
93  // goes out of scope, which triggers the idea to let destructor call
94  // ::wait(). However, wait() might very well throw, and according to
95  // this article
96  // (http://bin-login.name/ftp/pub/docs/programming_languages/cpp/cffective_cpp/MAGAZINE/SU_FRAME.HTM#destruct)
97  // from Herb Sutter basically saying that destructors that throw are
98  // evil.
99  void Scheduler::wait(void)
100  {
101    // first signal to JobHandler that Scheduler is waiting
102    boost::shared_ptr<Job> end;
103    data_.jobs().push(end);
104
105    // wait for job handler to finish
106    job_handler_.join();
107    throw_if_error();
108  }
109
110
111  // Scheduler::Job
112
113  Scheduler::Job::Job(unsigned int prio)
114    : status_(pristine), priority_(prio), id_(0) {}
115
116
117  Scheduler::Job::~Job(void)
118  {}
119
120
121  unsigned int Scheduler::Job::priority(void) const
122  {
123    return priority_;
124  }
125
126
127  bool Scheduler::LowerPriority::operator()(const JobPtr& lhs,
128                                            const JobPtr& rhs) const
129  {
130    if (rhs.get() == NULL)
131      return false;
132    if (lhs.get() == NULL)
133      return true;
134
135    if (lhs->priority() != rhs->priority())
136      return lhs->priority() < rhs->priority();
137
138    return lhs->id_ > rhs->id_;
139  }
140
141
142  void Scheduler::Job::add_prerequisite(const JobPtr& prereq)
143  {
144    boost::unique_lock<boost::mutex> lock(mutex_);
145    prerequisite_.insert(prereq);
146  }
147
148
149  size_t Scheduler::Job::remove_prerequisite(const JobPtr& prereq)
150  {
151    boost::unique_lock<boost::mutex> lock(mutex_);
152    prerequisite_.erase(prereq);
153    return prerequisite_.size();
154  }
155
156
157  void Scheduler::Job::add_observer(const JobPtr& job)
158  {
159    boost::unique_lock<boost::mutex> lock(mutex_);
160    observers_.push_back(job);
161  }
162
163
164  // Scheduler::Worker
165
166  Scheduler::Worker::Worker(JobQueue& q, Queue<JobPtr>& c)
167    : queue_(q), completed_(c)
168  {
169  }
170
171
172  void Scheduler::Worker::operator()(void)
173  {
174    while (true) {
175      boost::shared_ptr<Job> job;
176      // get next job
177      queue_.pop(job);
178      // NULL job indicates poison pill
179      if (job.get()==NULL) {
180        // kill other workers too
181        queue_.push(job);
182        break;
183      }
184      // action
185      try {
186        (*job)();
187      }
188      catch (...) {
189        job->error_ = boost::current_exception();
190        // return job to scheduler so it can act on the error
191        completed_.push(job);
192        // exit work and go home
193        return;
194      }
195      // return job to scheduler
196      completed_.push(job);
197
198      // Make sure we can be interrupted
199      boost::this_thread::interruption_point();
200    }
201  }
202
203
204  // Scheduler::JobHandlerData
205  Scheduler::JobHandlerData::JobHandlerData(unsigned int threads)
206    : job_count_(0), running_jobs_(0), threads_(threads)
207  {}
208
209
210  Queue<boost::exception_ptr>&
211  Scheduler::JobHandlerData::error(void) const
212  {
213    return error_;
214  }
215
216
217  const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const
218  {
219    return jobs_;
220  }
221
222
223  Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void)
224  {
225    return jobs_;
226  }
227
228
229  const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const
230  {
231    return queue_;
232  }
233
234
235  Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void)
236  {
237    return queue_;
238  }
239
240
241  const Scheduler::JobHandlerData::Count&
242  Scheduler::JobHandlerData::job_count(void) const
243  {
244    return job_count_;
245  }
246
247
248  Scheduler::JobHandlerData::Count&
249  Scheduler::JobHandlerData::job_count(void)
250  {
251    return job_count_;
252  }
253
254
255  const Scheduler::JobHandlerData::Count&
256  Scheduler::JobHandlerData::running_jobs(void) const
257  {
258    return running_jobs_;
259  }
260
261
262  Scheduler::JobHandlerData::Count&
263  Scheduler::JobHandlerData::running_jobs(void)
264  {
265    return running_jobs_;
266  }
267
268
269  const Scheduler::JobHandlerData::Count&
270  Scheduler::JobHandlerData::threads(void) const
271  {
272    return threads_;
273  }
274
275
276  Scheduler::JobHandlerData::Count&
277  Scheduler::JobHandlerData::threads(void)
278  {
279    return threads_;
280  }
281
282
283  // Scheduler::JobHandlerData::Count
284  Scheduler::JobHandlerData::Count::Count(int x)
285    : x_(x)
286  {
287  }
288
289
290  void Scheduler::JobHandlerData::Count::decrement(void)
291  {
292    boost::unique_lock<boost::mutex> lock(mutex_);
293    --x_;
294  }
295
296
297  int Scheduler::JobHandlerData::Count::get(void) const
298  {
299    boost::unique_lock<boost::mutex> lock(mutex_);
300    return x_;
301  }
302
303
304  void Scheduler::JobHandlerData::Count::increment(void)
305  {
306    boost::unique_lock<boost::mutex> lock(mutex_);
307    ++x_;
308  }
309
310
311  void Scheduler::JobHandlerData::Count::set(int x)
312  {
313    boost::unique_lock<boost::mutex> lock(mutex_);
314    x_ = x;
315  }
316
317
318  // Scheduler::JobHandler
319
320  Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data)
321    : data_(&data)
322  {}
323
324
325
326  void Scheduler::JobHandler::post_process(JobPtr job)
327  {
328    assert(job);
329    assert(data_);
330    data_->running_jobs().decrement();
331    assert(data_->running_jobs().get() >= 0);
332    job->status_ = Job::completed;
333
334    if (job->error_) {
335      data_->error().push(job->error_);
336      return;
337    }
338
339    // status of job is now 'completed', so Scheduler::add_dependency
340    // will not interfere with job->observers_
341
342    // for convenience
343    std::vector<JobPtr>& observers = job->observers_;
344    // notify observers, jobs that have 'job' as prerequisite
345    for (size_t i=0; i<observers.size(); ++i)
346      // If 'job' was the last blocking prerequisite and observer has
347      // been prepared (submitted directly or indirectly), then send
348      // the observer to queue.
349      if (!observers[i]->remove_prerequisite(job) &&
350          observers[i]->status_ != Job::pristine) {
351        assert(observers[i]->status_ == Job::prepared);
352        send2queue(observers[i]);
353      }
354  }
355
356
357  void Scheduler::JobHandler::prepare(JobPtr job)
358  {
359    assert(job->status_ == Job::pristine);
360    job->status_ = Job::prepared;
361    job->id_ = data_->job_count().get();
362    data_->job_count().increment();
363
364    // If job have prerequisite that need to be run first, process them
365
366    // job->prerequisite_ should be safe to access here without any
367    // locking. In Scheduler it's only access by add_dependency and
368    // it's illegal to 'add_dependency' for a job that is not
369    // pristine.
370
371    typedef std::set<JobPtr>::iterator iterator;
372    for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) {
373      if ((*j)->status_ == Job::pristine) {
374        prepare(*j);
375        ++j;
376      }
377      else if ((*j)->status_ == Job::completed)
378        job->prerequisite_.erase(j++);
379      else {
380        ++j;
381      }
382    }
383
384    // If all prerequisite are finished, send job to queue
385    if (job->prerequisite_.empty())
386      send2queue(job);
387  }
388
389
390  void Scheduler::JobHandler::send2queue(JobPtr& job)
391  {
392    job->status_ = Job::running;
393    data_->running_jobs().increment();
394    assert(data_->running_jobs().get() > 0);
395    data_->queue().push(job);
396  }
397
398
399  void Scheduler::JobHandler::process(JobPtr& job)
400  {
401    switch (job->status_) {
402    case(Job::pristine):
403      prepare(job);
404      break;
405    case(Job::running):
406      post_process(job);
407      break;
408    default:
409      assert(0 &&
410             "job either pristine (fr. Scheduler) or running (fr. Worker)");
411    }
412  }
413
414
415  void Scheduler::JobHandler::operator()(void)
416  {
417    assert(data_);
418    boost::thread_group workers;
419    for (int i=0; i < data_->threads().get(); ++i)
420      workers.create_thread(Worker(data_->queue(), data_->jobs()));
421    // Process jobs (in jobs_) coming both from Scheduler and
422    // completed jobs from Workers until Scheduler is waiting
423    // (signaled by a null Job), jobs_ is empty and there are no
424    // running jobs.
425
426    // true indicates that Scheduler::wait (or ::interrupt) has been called
427    bool scheduler_is_waiting = false;
428    JobPtr job;
429
430    while (!scheduler_is_waiting || data_->running_jobs().get() ||
431           !data_->jobs().empty()) {
432      data_->jobs().pop(job);
433      if (job)
434        process(job);
435      else
436        scheduler_is_waiting = true;
437
438      // Check for error
439      //
440      // Since we are in a background thread, we cannot throw from
441      // here, instead interrupt workers and return early.
442      if (!data_->error().empty()) {
443        // In case queue is empty, workers might be be stuck
444        // waiting for job queue to pop, then send them a poison pill so
445        // they stop.
446        boost::shared_ptr<Job> end;
447        data_->queue().push(end);
448        // For other cases (queue is not empty) send workers a
449        // interrupt signal and wait for them to wrap up.
450        workers.interrupt_all();
451        data_->running_jobs().set(0);
452        data_->queue().clear();
453        return;
454      }
455
456      // Make sure we can be interrupted
457      boost::this_thread::interruption_point();
458    }
459
460    // kill workers
461    boost::shared_ptr<Job> end;
462    data_->queue().push(end);
463    workers.join_all();
464  }
465
466}}}
Note: See TracBrowser for help on using the repository browser.