Changeset 3401
- Timestamp:
- Mar 31, 2015, 2:56:34 AM (8 years ago)
- Location:
- trunk
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/test/scheduler.cc
r3348 r3401 62 62 scheduler.submit(sleeper[1]); 63 63 sleeper.push_back(boost::make_shared<Sleeper>(2)); 64 s leeper.back()->add_dependency(sleeper[0]);64 scheduler.add_dependency(sleeper[2], sleeper[0]); 65 65 sleeper.push_back(boost::make_shared<Sleeper>(3)); 66 66 67 67 // not strictly needed since sleeper2 depends on sleeper0, 68 68 // but... this is a test 69 s leeper.back()->add_dependency(sleeper[0]);70 s leeper.back()->add_dependency(sleeper[1]);71 s leeper.back()->add_dependency(sleeper[2]);69 scheduler.add_dependency(sleeper[3], sleeper[0]); 70 scheduler.add_dependency(sleeper[3], sleeper[1]); 71 scheduler.add_dependency(sleeper[3], sleeper[2]); 72 72 73 73 scheduler.submit(sleeper[3]); -
trunk/yat/utility/Scheduler.cc
r3349 r3401 39 39 40 40 41 void Scheduler::add_dependency(boost::shared_ptr<Job> job, 42 boost::shared_ptr<Job> prerequisite) 43 { 44 assert(job->status_ == Job::pristine); 45 job->prerequisite_.insert(prerequisite); 46 prerequisite->observers_.push_back(job); 47 } 48 49 41 50 void Scheduler::post_process(JobPtr job) 42 51 { … … 44 53 assert(running_jobs_>=0); 45 54 job->status_ = Job::completed; 46 // check if there are jobs waiting for job47 std::map<JobPtr, std::vector<JobPtr> >::iterator it = children_.find(job);48 if (it == children_.end())49 return;50 55 // for convenience 51 const std::vector<JobPtr>& vec = it->second; 56 const std::vector<JobPtr>& vec = job->observers_; 57 // notify obervers, jobs that have 'job' as prerequisite 52 58 for (size_t i=0; i<vec.size(); ++i) { 53 vec[i]->p arents_.erase(job);54 if (vec[i]->p arents_.empty())59 vec[i]->prerequisite_.erase(job); 60 if (vec[i]->prerequisite_.empty()) 55 61 queue(vec[i]); 56 62 } 57 58 // entry no longer needed59 children_.erase(it);60 63 } 61 64 … … 67 70 job->status_ = Job::prepared; 68 71 69 // If we have p arents that need to be run first, wellprocess them72 // If we have prerequisite that need to be run first, process them 70 73 71 74 typedef std::set<JobPtr>::iterator iterator; 72 for (iterator j = job->parents_.begin(); j!=job->parents_.end();) {75 for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) { 73 76 if ((*j)->status_ == Job::completed) 74 job->p arents_.erase(j++);77 job->prerequisite_.erase(j++); 75 78 else { 76 79 process(*j); 77 // record that job is a child of j, so we can notify job when78 // j has completed79 children_[*j].push_back(job);80 80 ++j; 81 81 } 82 82 } 83 83 84 // If job has no parents (or all parents are completed)send job to queue85 if (job->p arents_.empty()) {84 // If all prerequisite are finished, send job to queue 85 if (job->prerequisite_.empty()) { 86 86 queue(job); 87 87 } … … 136 136 137 137 138 void Scheduler::Job::add_dependency(boost::shared_ptr<Job> job)139 {140 assert(status_==pristine);141 parents_.insert(job);142 }143 144 145 138 // Scheduler::Worker 146 139 -
trunk/yat/utility/Scheduler.h
r3349 r3401 50 50 Scheduler scheduler(2); 51 51 boost::shared_ptr<MyJob> job1(new MyJob("Hello")); 52 boost::shared_ptr<MyJob> job 1(new MyJob(" "));52 boost::shared_ptr<MyJob> job2(new MyJob(" ")); 53 53 boost::shared_ptr<MyJob> job3(new MyJob("World")); 54 54 boost::shared_ptr<MyJob> job4(new MyJob("\n")); 55 job4.add_dependency(job1);56 job4.add_dependency(job2);57 job4.add_dependency(job3);55 scheduler.add_dependency(job4, job1); 56 scheduler.add_dependency(job4, job2); 57 scheduler.add_dependency(job4, job3); 58 58 scheduler.submit(job4); 59 59 scheduler.launch(); … … 81 81 82 82 /** 83 \brief add a dependency84 85 This job will not be processed until \a job has finished.86 */87 void add_dependency(boost::shared_ptr<Job> job);88 89 /**90 83 This function defines the work done in thread. 91 84 */ … … 94 87 friend class Scheduler; 95 88 // set of jobs that have to finish before this can run 96 std::set<boost::shared_ptr<Job> > parents_; 89 std::set<boost::shared_ptr<Job> > prerequisite_; 90 // jobs that have *this as prerequisite 91 std::vector<boost::shared_ptr<Job> > observers_; 97 92 enum status { pristine, prepared, running, completed}; 98 93 status status_; … … 105 100 */ 106 101 Scheduler(unsigned int threads); 102 103 /** 104 \brief add a dependency rule 105 106 Add a dependency that Job \a prerequisite has to complete 107 before Job \a job is run. 108 */ 109 void add_dependency(boost::shared_ptr<Job> job, 110 boost::shared_ptr<Job> prerequisite); 107 111 108 112 /** … … 138 142 139 143 // function called when job has finished and returned from 140 // worker. If there are any jobs that depend s on \a jobthose jobs144 // worker. If there are any jobs that depend on \a job, those jobs 141 145 // are notified and if it makes them ready to be processed they 142 146 // are sent to queue. … … 154 158 int running_jobs_; 155 159 156 // value lists all jobs that depend on key, i.e., key has to157 // finish before jobs in value can start158 std::map<JobPtr, std::vector<JobPtr> > children_;159 160 160 Queue<boost::shared_ptr<Job> > queue_; 161 161 Queue<boost::shared_ptr<Job> > completed_;
Note: See TracChangeset
for help on using the changeset viewer.