source: trunk/yat/utility/Scheduler.cc @ 3346

Last change on this file since 3346 was 3346, checked in by Peter, 8 years ago

first version of Scheduler class. refs #800.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 3.7 KB
Line 
1// $Id: Scheduler.cc 3346 2014-11-06 13:16:34Z peter $
2
3/*
4  Copyright (C) 2014 Peter Johansson
5
6  This file is part of the yat library, http://dev.thep.lu.se/yat
7
8  The yat library is free software; you can redistribute it and/or
9  modify it under the terms of the GNU General Public License as
10  published by the Free Software Foundation; either version 3 of the
11  License, or (at your option) any later version.
12
13  The yat library is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  General Public License for more details.
17
18  You should have received a copy of the GNU General Public License
19  along with yat. If not, see <http://www.gnu.org/licenses/>.
20*/
21
22#include <config.h>
23
24#include "Scheduler.h"
25
26#include <cassert>
27
28namespace theplu {
29namespace yat {
30namespace utility {
31
32  Scheduler::Scheduler(unsigned int threads)
33  {
34    assert(threads);
35    for (size_t i=0; i<threads; ++i)
36      workers_.create_thread(Worker(queue_, completed_));
37  }
38
39
40  void Scheduler::launch(void)
41  {
42    while (!jobs_.empty() || !running_jobs_.empty()) {
43      while (!jobs_.empty()) {
44        process(jobs_.front());
45        jobs_.pop_front();
46      }
47      JobPtr job;
48      completed_.pop(job);
49      post_process(job);
50    }
51
52    // kill workers
53    boost::shared_ptr<Job> end;
54    queue_.push(end);
55
56    // wait for workers to finish
57    workers_.join_all();
58  }
59
60
61  void Scheduler::post_process(JobPtr job)
62  {
63    running_jobs_.erase(job);
64    job->status_ = Job::completed;
65    // Take care of sub-jobs
66    for (size_t i=0; i<job->sub_jobs_.size(); ++i) {
67      process(job->sub_jobs_[i]);
68    }
69    // check if there are jobs waiting for job
70    std::map<JobPtr, std::vector<JobPtr> >::iterator it = children_.find(job);
71    if (it == children_.end())
72      return;
73    // for convenience
74    const std::vector<JobPtr>& vec = it->second;
75    for (size_t i=0; i<vec.size(); ++i) {
76      vec[i]->parents_.erase(job);
77      if (vec[i]->parents_.empty())
78        queue(vec[i]);
79    }
80
81    // entry no longer needed
82    children_.erase(it);
83  }
84
85
86  void Scheduler::process(JobPtr job)
87  {
88    if (job->status_!=Job::pristine)
89      return;
90    job->status_ = Job::prepared;
91
92    // If we have parents that need to be run first, well process them
93
94    typedef std::set<JobPtr>::iterator iterator;
95    for (iterator j = job->parents_.begin(); j!=job->parents_.end(); ) {
96      if ((*j)->status_ == Job::completed)
97        job->parents_.erase(j++);
98      else {
99        process(*j);
100        // record that job is a child of j, so we can notify job when
101        // j has completed
102        children_[*j].push_back(job);
103        ++j;
104      }
105    }
106
107    // If job has no parents (or all parents are completed) send job toi queue
108    if (job->parents_.empty()) {
109      queue(job);
110    }
111  }
112
113
114  void Scheduler::queue(JobPtr job)
115  {
116    job->status_ = Job::running;
117    running_jobs_.insert(job);
118    queue_.push(job);
119  }
120
121
122  void Scheduler::submit(JobPtr job)
123  {
124    jobs_.push_back(job);
125  }
126
127
128  // Scheduler::Job
129
130  Scheduler::Job::Job(void)
131    : status_(pristine) {}
132
133
134  Scheduler::Job::~Job(void)
135  {}
136
137
138  void Scheduler::Job::add_dependency(boost::shared_ptr<Job> job)
139  {
140    parents_.insert(job);
141  }
142
143
144  void Scheduler::Job::submit(boost::shared_ptr<Job> job)
145  {
146    sub_jobs_.push_back(job);
147  }
148
149
150  // Scheduler::Worker
151
152  Scheduler::Worker::Worker(Queue<boost::shared_ptr<Job> >& q,
153                            Queue<boost::shared_ptr<Job> >& c)
154    : queue_(q), completed_(c)
155  {
156  }
157
158
159  void Scheduler::Worker::operator()(void)
160  {
161    while (true) {
162      boost::shared_ptr<Job> job;
163      // get next job
164      queue_.pop(job);
165      // NULL job indicates poison pill
166      if (job.get()==NULL) {
167        // kill other workers too
168        queue_.push(job);
169        break;
170      }
171      // action
172      (*job)();
173      // return job to scheduler
174      completed_.push(job);
175
176      // Make sure we can be interrupted
177      boost::this_thread::interruption_point();
178    }
179  }
180
181}}}
Note: See TracBrowser for help on using the repository browser.