Changeset 3401


Ignore:
Timestamp:
Mar 31, 2015, 2:56:34 AM (8 years ago)
Author:
Peter
Message:

refs #800

Avoid using parents and children terminologoy to describe Job
relationships as it was confusing. Move add_dependency function from
class Scheduler::Job to class Scheduler. Likewise move observers to
class Job rather than holding an extra map<Job, observers> in
Scheduler.

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/test/scheduler.cc

    r3348 r3401  
    6262  scheduler.submit(sleeper[1]);
    6363  sleeper.push_back(boost::make_shared<Sleeper>(2));
    64   sleeper.back()->add_dependency(sleeper[0]);
     64  scheduler.add_dependency(sleeper[2], sleeper[0]);
    6565  sleeper.push_back(boost::make_shared<Sleeper>(3));
    6666
    6767  // not strictly needed since sleeper2 depends on sleeper0,
    6868  // but... this is a test
    69   sleeper.back()->add_dependency(sleeper[0]);
    70   sleeper.back()->add_dependency(sleeper[1]);
    71   sleeper.back()->add_dependency(sleeper[2]);
     69  scheduler.add_dependency(sleeper[3], sleeper[0]);
     70  scheduler.add_dependency(sleeper[3], sleeper[1]);
     71  scheduler.add_dependency(sleeper[3], sleeper[2]);
    7272
    7373  scheduler.submit(sleeper[3]);
  • trunk/yat/utility/Scheduler.cc

    r3349 r3401  
    3939
    4040
     41  void Scheduler::add_dependency(boost::shared_ptr<Job> job,
     42                                 boost::shared_ptr<Job> prerequisite)
     43  {
     44    assert(job->status_ == Job::pristine);
     45    job->prerequisite_.insert(prerequisite);
     46    prerequisite->observers_.push_back(job);
     47  }
     48
     49
    4150  void Scheduler::post_process(JobPtr job)
    4251  {
     
    4453    assert(running_jobs_>=0);
    4554    job->status_ = Job::completed;
    46     // check if there are jobs waiting for job
    47     std::map<JobPtr, std::vector<JobPtr> >::iterator it = children_.find(job);
    48     if (it == children_.end())
    49       return;
    5055    // for convenience
    51     const std::vector<JobPtr>& vec = it->second;
     56    const std::vector<JobPtr>& vec = job->observers_;
     57    // notify obervers, jobs that have 'job' as prerequisite
    5258    for (size_t i=0; i<vec.size(); ++i) {
    53       vec[i]->parents_.erase(job);
    54       if (vec[i]->parents_.empty())
     59      vec[i]->prerequisite_.erase(job);
     60      if (vec[i]->prerequisite_.empty())
    5561        queue(vec[i]);
    5662    }
    57 
    58     // entry no longer needed
    59     children_.erase(it);
    6063  }
    6164
     
    6770    job->status_ = Job::prepared;
    6871
    69     // If we have parents that need to be run first, well process them
     72    // If we have prerequisite that need to be run first, process them
    7073
    7174    typedef std::set<JobPtr>::iterator iterator;
    72     for (iterator j = job->parents_.begin(); j!=job->parents_.end(); ) {
     75    for (iterator j=job->prerequisite_.begin(); j!=job->prerequisite_.end();) {
    7376      if ((*j)->status_ == Job::completed)
    74         job->parents_.erase(j++);
     77        job->prerequisite_.erase(j++);
    7578      else {
    7679        process(*j);
    77         // record that job is a child of j, so we can notify job when
    78         // j has completed
    79         children_[*j].push_back(job);
    8080        ++j;
    8181      }
    8282    }
    8383
    84     // If job has no parents (or all parents are completed) send job to queue
    85     if (job->parents_.empty()) {
     84    // If all prerequisite are finished, send job to queue
     85    if (job->prerequisite_.empty()) {
    8686      queue(job);
    8787    }
     
    136136
    137137
    138   void Scheduler::Job::add_dependency(boost::shared_ptr<Job> job)
    139   {
    140     assert(status_==pristine);
    141     parents_.insert(job);
    142   }
    143 
    144 
    145138  // Scheduler::Worker
    146139
  • trunk/yat/utility/Scheduler.h

    r3349 r3401  
    5050     Scheduler scheduler(2);
    5151     boost::shared_ptr<MyJob> job1(new MyJob("Hello"));
    52      boost::shared_ptr<MyJob> job1(new MyJob(" "));
     52     boost::shared_ptr<MyJob> job2(new MyJob(" "));
    5353     boost::shared_ptr<MyJob> job3(new MyJob("World"));
    5454     boost::shared_ptr<MyJob> job4(new MyJob("\n"));
    55      job4.add_dependency(job1);
    56      job4.add_dependency(job2);
    57      job4.add_dependency(job3);
     55     scheduler.add_dependency(job4, job1);
     56     scheduler.add_dependency(job4, job2);
     57     scheduler.add_dependency(job4, job3);
    5858     scheduler.submit(job4);
    5959     scheduler.launch();
     
    8181
    8282      /**
    83          \brief add a dependency
    84 
    85          This job will not be processed until \a job has finished.
    86        */
    87       void add_dependency(boost::shared_ptr<Job> job);
    88 
    89       /**
    9083         This function defines the work done in thread.
    9184       */
     
    9487      friend class Scheduler;
    9588      // set of jobs that have to finish before this can run
    96       std::set<boost::shared_ptr<Job> > parents_;
     89      std::set<boost::shared_ptr<Job> > prerequisite_;
     90      // jobs that have *this as prerequisite
     91      std::vector<boost::shared_ptr<Job> > observers_;
    9792      enum status { pristine, prepared, running, completed};
    9893      status status_;
     
    105100     */
    106101    Scheduler(unsigned int threads);
     102
     103    /**
     104       \brief add a dependency rule
     105
     106       Add a dependency that Job \a prerequisite has to complete
     107       before Job \a job is run.
     108     */
     109    void add_dependency(boost::shared_ptr<Job> job,
     110                        boost::shared_ptr<Job> prerequisite);
    107111
    108112    /**
     
    138142
    139143    // function called when job has finished and returned from
    140     // worker. If there are any jobs that depends on \a job those jobs
     144    // worker. If there are any jobs that depend on \a job, those jobs
    141145    // are notified and if it makes them ready to be processed they
    142146    // are sent to queue.
     
    154158    int running_jobs_;
    155159
    156     // value lists all jobs that depend on key, i.e., key has to
    157     // finish before jobs in value can start
    158     std::map<JobPtr, std::vector<JobPtr> > children_;
    159 
    160160    Queue<boost::shared_ptr<Job> > queue_;
    161161    Queue<boost::shared_ptr<Job> > completed_;
Note: See TracChangeset for help on using the changeset viewer.