source: trunk/yat/utility/Scheduler.cc @ 3407

Last change on this file since 3407 was 3407, checked in by Peter, 8 years ago

typo in comment

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 4.4 KB
Line 
1// $Id: Scheduler.cc 3407 2015-04-07 04:04:32Z peter $
2
3/*
4  Copyright (C) 2014, 2015 Peter Johansson
5
6  This file is part of the yat library, http://dev.thep.lu.se/yat
7
8  The yat library is free software; you can redistribute it and/or
9  modify it under the terms of the GNU General Public License as
10  published by the Free Software Foundation; either version 3 of the
11  License, or (at your option) any later version.
12
13  The yat library is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  General Public License for more details.
17
18  You should have received a copy of the GNU General Public License
19  along with yat. If not, see <http://www.gnu.org/licenses/>.
20*/
21
22#include <config.h>
23
24#include "Scheduler.h"
25
26#include <boost/exception.hpp>
27
28#include <cassert>
29
30namespace theplu {
31namespace yat {
32namespace utility {
33
34  Scheduler::Scheduler(unsigned int threads)
35    : running_jobs_(0), job_counter_(0)
36  {
37    assert(threads);
38    for (size_t i=0; i<threads; ++i)
39      workers_.create_thread(Worker(queue_, completed_));
40  }
41
42
43  void Scheduler::add_dependency(boost::shared_ptr<Job> job,
44                                 boost::shared_ptr<Job> prerequisite)
45  {
46    assert(job->status_ == Job::pristine);
47    job->prerequisite_.insert(prerequisite);
48    prerequisite->observers_.push_back(job);
49  }
50
51
52  void Scheduler::interrupt(void)
53  {
54    workers_.interrupt_all();
55  }
56
57
58  void Scheduler::post_process(JobPtr job)
59  {
60    --running_jobs_;
61    assert(running_jobs_>=0);
62    job->status_ = Job::completed;
63
64    // check if an error occurred
65    if (job->error_) {
66      interrupt();
67      boost::rethrow_exception(job->error_);
68    }
69
70    // for convenience
71    const std::vector<JobPtr>& vec = job->observers_;
72    // notify observers, jobs that have 'job' as prerequisite
73    for (size_t i=0; i<vec.size(); ++i) {
74      vec[i]->prerequisite_.erase(job);
75      if (vec[i]->prerequisite_.empty())
76        queue(vec[i]);
77    }
78  }
79
80
81  void Scheduler::process(JobPtr job)
82  {
83    if (job->status_!=Job::pristine)
84      return;
85    job->status_ = Job::prepared;
86    job->id_ = job_counter_;
87    ++job_counter_;
88
89    // If we have prerequisite that need to be run first, process them
90
91    typedef std::set<JobPtr>::iterator iterator;
92    for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) {
93      if ((*j)->status_ == Job::completed)
94        job->prerequisite_.erase(j++);
95      else {
96        process(*j);
97        ++j;
98      }
99    }
100
101    // If all prerequisite are finished, send job to queue
102    if (job->prerequisite_.empty()) {
103      queue(job);
104    }
105  }
106
107
108  void Scheduler::queue(JobPtr job)
109  {
110    job->status_ = Job::running;
111    ++running_jobs_;
112    assert(running_jobs_>0);
113    queue_.push(job);
114  }
115
116
117  void Scheduler::submit(JobPtr job)
118  {
119    // If any job has been completed since last submission,
120    // post-process the completed job
121    JobPtr completed_job;
122    while (completed_.try_pop(completed_job))
123      post_process(completed_job);
124    process(job);
125  }
126
127
128  void Scheduler::wait(void)
129  {
130    while (running_jobs_ || completed_.size()) {
131      JobPtr job;
132      completed_.pop(job);
133      post_process(job);
134    }
135
136    // kill workers
137    boost::shared_ptr<Job> end;
138    queue_.push(end);
139
140    // wait for workers to finish
141    workers_.join_all();
142  }
143
144
145  // Scheduler::Job
146
147  Scheduler::Job::Job(unsigned int prio)
148    : status_(pristine), priority_(prio), id_(0) {}
149
150
151  Scheduler::Job::~Job(void)
152  {}
153
154
155  unsigned int Scheduler::Job::priority(void) const
156  {
157    return priority_;
158  }
159
160
161  bool Scheduler::LowerPriority::operator()(const JobPtr& lhs,
162                                            const JobPtr& rhs) const
163  {
164    if (rhs.get() == NULL)
165      return false;
166    if (lhs.get() == NULL)
167      return true;
168
169    if (lhs->priority() != rhs->priority())
170      return lhs->priority() < rhs->priority();
171
172    return lhs->id_ > rhs->id_;
173  }
174
175
176  // Scheduler::Worker
177
178  Scheduler::Worker::Worker(JobQueue& q, Queue<JobPtr>& c)
179    : queue_(q), completed_(c)
180  {
181  }
182
183
184  void Scheduler::Worker::operator()(void)
185  {
186    while (true) {
187      boost::shared_ptr<Job> job;
188      // get next job
189      queue_.pop(job);
190      // NULL job indicates poison pill
191      if (job.get()==NULL) {
192        // kill other workers too
193        queue_.push(job);
194        break;
195      }
196      // action
197      try {
198        (*job)();
199      }
200      catch (...) {
201        job->error_ = boost::current_exception();
202        // return job to scheduler so it can act on the error
203        completed_.push(job);
204        // exit work and go home
205        return;
206      }
207      // return job to scheduler
208      completed_.push(job);
209
210      // Make sure we can be interrupted
211      boost::this_thread::interruption_point();
212    }
213  }
214
215}}}
Note: See TracBrowser for help on using the repository browser.