source: branches/0.16-stable/yat/utility/Scheduler.cc @ 3808

Last change on this file since 3808 was 3808, checked in by Peter, 3 years ago

Move the data members of the JobHandler into their own data class,
JobHandlerData, which is thread safe. This class is now owned by the
Scheduler, and the JobHandler has access via a pointer. This fixes #924,
as the class is thread safe; and since the sharing between Scheduler
and the JobHandler class is now more complete, it also allows future
additions to the interface such as those mentioned in tickets:

refs #899, #910, #914 and #916.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 10.0 KB
Line 
1// $Id: Scheduler.cc 3808 2019-07-04 23:46:34Z peter $
2
3/*
4  Copyright (C) 2014, 2015, 2017 Peter Johansson
5
6  This file is part of the yat library, http://dev.thep.lu.se/yat
7
8  The yat library is free software; you can redistribute it and/or
9  modify it under the terms of the GNU General Public License as
10  published by the Free Software Foundation; either version 3 of the
11  License, or (at your option) any later version.
12
13  The yat library is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  General Public License for more details.
17
18  You should have received a copy of the GNU General Public License
19  along with yat. If not, see <http://www.gnu.org/licenses/>.
20*/
21
22#include <config.h>
23
24#include "Scheduler.h"
25
26#include <boost/exception/all.hpp>
27
28#include <cassert>
29
30namespace theplu {
31namespace yat {
32namespace utility {
33
34  Scheduler::Scheduler(unsigned int threads)
35    : data_(threads), job_handler_(JobHandler(data_))
36  {
37    assert(threads);
38  }
39
40
41  void Scheduler::add_dependency(boost::shared_ptr<Job> job,
42                                 boost::shared_ptr<Job> prerequisite)
43  {
44    assert(job->status_ == Job::pristine);
45    if (prerequisite->status_ != Job::completed) {
46      job->add_prerequisite(prerequisite);
47      prerequisite->add_observer(job);
48    }
49    throw_if_error();
50  }
51
52
53  void Scheduler::interrupt(void)
54  {
55    // first signal to JobHandler that Scheduler is waiting
56    boost::shared_ptr<Job> end;
57    data_.jobs().push(end);
58
59    job_handler_.interrupt();
60    throw_if_error();
61  }
62
63
64  size_t Scheduler::jobs(void) const
65  {
66    throw_if_error();
67    return data_.running_jobs().get() + data_.queue().size();
68  }
69
70
71  void Scheduler::submit(const JobPtr& job)
72  {
73    throw_if_error();
74    data_.jobs().push(job);
75  }
76
77
78  void Scheduler::throw_if_error(void) const
79  {
80    boost::exception_ptr error;
81    if (data_.error().try_pop(error))
82      boost::rethrow_exception(error);
83  }
84
85
86  // This function (or interrupt) has to be called before Scheduler
87  // goes out of scope, which triggers the idea to let destructor call
88  // ::wait(). However, wait() might very well throw, and according to
89  // this article
90  // (http://bin-login.name/ftp/pub/docs/programming_languages/cpp/cffective_cpp/MAGAZINE/SU_FRAME.HTM#destruct)
91  // from Herb Sutter basically saying that destructors that throw are
92  // evil.
93  void Scheduler::wait(void)
94  {
95    // first signal to JobHandler that Scheduler is waiting
96    boost::shared_ptr<Job> end;
97    data_.jobs().push(end);
98
99    // wait for job handler to finish
100    job_handler_.join();
101    throw_if_error();
102  }
103
104
105  // Scheduler::Job
106
107  Scheduler::Job::Job(unsigned int prio)
108    : status_(pristine), priority_(prio), id_(0) {}
109
110
111  Scheduler::Job::~Job(void)
112  {}
113
114
115  unsigned int Scheduler::Job::priority(void) const
116  {
117    return priority_;
118  }
119
120
121  bool Scheduler::LowerPriority::operator()(const JobPtr& lhs,
122                                            const JobPtr& rhs) const
123  {
124    if (rhs.get() == NULL)
125      return false;
126    if (lhs.get() == NULL)
127      return true;
128
129    if (lhs->priority() != rhs->priority())
130      return lhs->priority() < rhs->priority();
131
132    return lhs->id_ > rhs->id_;
133  }
134
135
136  void Scheduler::Job::add_prerequisite(const JobPtr& prereq)
137  {
138    boost::unique_lock<boost::mutex> lock(mutex_);
139    prerequisite_.insert(prereq);
140  }
141
142
143  size_t Scheduler::Job::remove_prerequisite(const JobPtr& prereq)
144  {
145    boost::unique_lock<boost::mutex> lock(mutex_);
146    prerequisite_.erase(prereq);
147    return prerequisite_.size();
148  }
149
150
151  void Scheduler::Job::add_observer(const JobPtr& job)
152  {
153    boost::unique_lock<boost::mutex> lock(mutex_);
154    observers_.push_back(job);
155  }
156
157
158  // Scheduler::Worker
159
160  Scheduler::Worker::Worker(JobQueue& q, Queue<JobPtr>& c)
161    : queue_(q), completed_(c)
162  {
163  }
164
165
166  void Scheduler::Worker::operator()(void)
167  {
168    while (true) {
169      boost::shared_ptr<Job> job;
170      // get next job
171      queue_.pop(job);
172      // NULL job indicates poison pill
173      if (job.get()==NULL) {
174        // kill other workers too
175        queue_.push(job);
176        break;
177      }
178      // action
179      try {
180        (*job)();
181      }
182      catch (...) {
183        job->error_ = boost::current_exception();
184        // return job to scheduler so it can act on the error
185        completed_.push(job);
186        // exit work and go home
187        return;
188      }
189      // return job to scheduler
190      completed_.push(job);
191
192      // Make sure we can be interrupted
193      boost::this_thread::interruption_point();
194    }
195  }
196
197
198  // Scheduler::JobHandlerData
199  Scheduler::JobHandlerData::JobHandlerData(unsigned int threads)
200    : job_count_(0), running_jobs_(0), threads_(threads)
201  {}
202
203
204  Queue<boost::exception_ptr>&
205  Scheduler::JobHandlerData::error(void) const
206  {
207    return error_;
208  }
209
210
211  const Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void) const
212  {
213    return jobs_;
214  }
215
216
217  Queue<Scheduler::JobPtr>& Scheduler::JobHandlerData::jobs(void)
218  {
219    return jobs_;
220  }
221
222
223  const Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void) const
224  {
225    return queue_;
226  }
227
228
229  Scheduler::JobQueue& Scheduler::JobHandlerData::queue(void)
230  {
231    return queue_;
232  }
233
234
235  const Scheduler::JobHandlerData::Count&
236  Scheduler::JobHandlerData::job_count(void) const
237  {
238    return job_count_;
239  }
240
241
242  Scheduler::JobHandlerData::Count&
243  Scheduler::JobHandlerData::job_count(void)
244  {
245    return job_count_;
246  }
247
248
249  const Scheduler::JobHandlerData::Count&
250  Scheduler::JobHandlerData::running_jobs(void) const
251  {
252    return running_jobs_;
253  }
254
255
256  Scheduler::JobHandlerData::Count&
257  Scheduler::JobHandlerData::running_jobs(void)
258  {
259    return running_jobs_;
260  }
261
262
263  const Scheduler::JobHandlerData::Count&
264  Scheduler::JobHandlerData::threads(void) const
265  {
266    return threads_;
267  }
268
269
270  Scheduler::JobHandlerData::Count&
271  Scheduler::JobHandlerData::threads(void)
272  {
273    return threads_;
274  }
275
276
277  // Scheduler::JobHandlerData::Count
278  Scheduler::JobHandlerData::Count::Count(int x)
279    : x_(x)
280  {
281  }
282
283
284  void Scheduler::JobHandlerData::Count::decrement(void)
285  {
286    boost::unique_lock<boost::mutex> lock(mutex_);
287    --x_;
288  }
289
290
291  int Scheduler::JobHandlerData::Count::get(void) const
292  {
293    boost::unique_lock<boost::mutex> lock(mutex_);
294    return x_;
295  }
296
297
298  void Scheduler::JobHandlerData::Count::increment(void)
299  {
300    boost::unique_lock<boost::mutex> lock(mutex_);
301    ++x_;
302  }
303
304
305  void Scheduler::JobHandlerData::Count::set(int x)
306  {
307    boost::unique_lock<boost::mutex> lock(mutex_);
308    x_ = x;
309  }
310
311
312  // Scheduler::JobHandler
313
314  Scheduler::JobHandler::JobHandler(Scheduler::JobHandlerData& data)
315    : data_(&data)
316  {}
317
318
319
320  void Scheduler::JobHandler::post_process(JobPtr job)
321  {
322    assert(job);
323    assert(data_);
324    data_->running_jobs().decrement();
325    assert(data_->running_jobs().get() >= 0);
326    job->status_ = Job::completed;
327
328    if (job->error_) {
329      data_->error().push(job->error_);
330      return;
331    }
332
333    // status of job is now 'completed', so Scheduler::add_dependency
334    // will not interfere with job->observers_
335
336    // for convenience
337    std::vector<JobPtr>& observers = job->observers_;
338    // notify observers, jobs that have 'job' as prerequisite
339    for (size_t i=0; i<observers.size(); ++i)
340      // If 'job' was the last blocking prerequisite and observer has
341      // been prepared (submitted directly or indirectly), then send
342      // the observer to queue.
343      if (!observers[i]->remove_prerequisite(job) &&
344          observers[i]->status_ != Job::pristine) {
345        assert(observers[i]->status_ == Job::prepared);
346        send2queue(observers[i]);
347      }
348  }
349
350
351  void Scheduler::JobHandler::prepare(JobPtr job)
352  {
353    assert(job->status_ == Job::pristine);
354    job->status_ = Job::prepared;
355    job->id_ = data_->job_count().get();
356    data_->job_count().increment();
357
358    // If job have prerequisite that need to be run first, process them
359
360    // job->prerequisite_ should be safe to access here without any
361    // locking. In Scheduler it's only access by add_dependency and
362    // it's illegal to 'add_dependency' for a job that is not
363    // pristine.
364
365    typedef std::set<JobPtr>::iterator iterator;
366    for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) {
367      if ((*j)->status_ == Job::pristine) {
368        prepare(*j);
369        ++j;
370      }
371      else if ((*j)->status_ == Job::completed)
372        job->prerequisite_.erase(j++);
373      else {
374        ++j;
375      }
376    }
377
378    // If all prerequisite are finished, send job to queue
379    if (job->prerequisite_.empty())
380      send2queue(job);
381  }
382
383
384  void Scheduler::JobHandler::send2queue(JobPtr& job)
385  {
386    job->status_ = Job::running;
387    data_->running_jobs().increment();
388    assert(data_->running_jobs().get() > 0);
389    data_->queue().push(job);
390  }
391
392
393  void Scheduler::JobHandler::process(JobPtr& job)
394  {
395    switch (job->status_) {
396    case(Job::pristine):
397      prepare(job);
398      break;
399    case(Job::running):
400      post_process(job);
401      break;
402    default:
403      assert(0 &&
404             "job either pristine (fr. Scheduler) or running (fr. Worker)");
405    }
406  }
407
408
409  void Scheduler::JobHandler::operator()(void)
410  {
411    assert(data_);
412    boost::thread_group workers;
413    for (size_t i=0; i < data_->threads().get(); ++i)
414      workers.create_thread(Worker(data_->queue(), data_->jobs()));
415    // Process jobs (in jobs_) coming both from Scheduler and
416    // completed jobs from Workers until Scheduler is waiting
417    // (signaled by a null Job), jobs_ is empty and there are no
418    // running jobs.
419
420    // true indicates that Scheduler::wait (or ::interrupt) has been called
421    bool scheduler_is_waiting = false;
422    JobPtr job;
423
424    while (!scheduler_is_waiting || data_->running_jobs().get() ||
425           !data_->jobs().empty()) {
426      data_->jobs().pop(job);
427      if (job)
428        process(job);
429      else
430        scheduler_is_waiting = true;
431
432      // Check for error
433      //
434      // Since we are in a background thread, we cannot throw from
435      // here, instead interrupt workers and return early.
436      if (!data_->error().empty()) {
437        // In case queue is empty, workers might be be stuck
438        // waiting for job queue to pop, then send them a poison pill so
439        // they stop.
440        boost::shared_ptr<Job> end;
441        data_->queue().push(end);
442        // For other cases (queue is not empty) send workers a
443        // interrupt signal and wait for them to wrap up.
444        workers.interrupt_all();
445        data_->running_jobs().set(0);
446        data_->queue().clear();
447        return;
448      }
449
450      // Make sure we can be interrupted
451      boost::this_thread::interruption_point();
452    }
453
454    // kill workers
455    boost::shared_ptr<Job> end;
456    data_->queue().push(end);
457    workers.join_all();
458  }
459
460}}}
Note: See TracBrowser for help on using the repository browser.