Changeset 4167


Ignore:
Timestamp:
Mar 23, 2022, 2:20:07 AM (4 months ago)
Author:
Peter
Message:

When a second exception was thrown, the JobHandler? would handle it
exactly like the first one, i.e., clear the job queue and kill the
workers. The problem was that the job queue could contain poison pills
sent by the JobHandler? handling the first exception in order to kill
the workers and the second call to kill_workers would do nothing since
the counter was already down to zero, which would leave the Workers in
limbo sitting and waiting for more jobs and the JobHandler? waiting for
them to finish - until end of time.

This changeset fixes that.

Location:
branches/0.19-stable
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • branches/0.19-stable/test/scheduler3.cc

    r3855 r4167  
    3232
    3333#include <boost/shared_ptr.hpp>
    34 #include <boost/exception/all.hpp>
    3534
     35#include <chrono>
    3636#include <iostream>
    3737
     
    4747    if (time_==3)
    4848      throw utility::runtime_error("some message");
    49     sleep(time_);
     49    std::this_thread::sleep_for(std::chrono::seconds(time_));
    5050    std::cerr << "sleeping " << time_ << " seconds\n";
    5151  }
     
    5555
    5656
    57 void run(void)
     57void run(test::Suite& suite)
    5858{
    5959  Scheduler scheduler(2);
    6060  std::vector<boost::shared_ptr<Sleeper> > task;
    61   task.push_back(boost::shared_ptr<Sleeper>(new Sleeper(1)));
     61  task.push_back(boost::shared_ptr<Sleeper>(new Sleeper(4)));
    6262  task.push_back(boost::shared_ptr<Sleeper>(new Sleeper(2)));
    6363  task.push_back(boost::shared_ptr<Sleeper>(new Sleeper(3)));
    64   task.push_back(boost::shared_ptr<Sleeper>(new Sleeper(1)));
    65   task.push_back(boost::shared_ptr<Sleeper>(new Sleeper(1)));
     64  for (size_t i=0; i<100; ++i)
     65    task.push_back(boost::shared_ptr<Sleeper>(new Sleeper(1)));
     66
    6667  for (size_t i=0; i<task.size(); ++i)
    6768    scheduler.submit(task[i]);
     69  while (scheduler.jobs()) {
     70    // churn to trigger ::throw_if_error
     71    scheduler.threads(2);
     72  }
     73
    6874  scheduler.wait();
    6975}
     
    7581
    7682  try {
    77     run();
     83    run(suite);
    7884    suite.err() << "error: no exception thrown\n";
    7985    suite.add(false);
  • branches/0.19-stable/yat/utility/Scheduler.cc

    r4161 r4167  
    9696  void Scheduler::threads(unsigned int n)
    9797  {
     98    throw_if_error();
    9899    data_.n_threads().set(n);
    99100    if (job_handler_.joinable())
     
    499500  void Scheduler::JobHandler::interrupt_workers(void)
    500501  {
     502    // if # of workers is zero, kill_workers have already been called
     503    // (either directly or via interrupt_workers) and the jobs in the
     504    // queue below are most likely poison pills and stealing them
     505    // would prevent workers from dying.
     506    if (n_target_workers_ == 0)
     507      return;
    501508    JobPtr tmp;
    502509    // We would like to clear the queue and reduce running_jobs with
    503510    // the reduced size, but ATM there is no way to lock the queue to
    504     // ensure an Worker is not modifying it between we assess the size
     511    // ensure a Worker is not modifying it between we assess the size
    505512    // and clear it.
    506513    while (data_->queue().try_pop(tmp))
     
    508515
    509516    kill_workers();
     517    assert(n_target_workers_ == 0);
    510518  }
    511519
     
    554562                                      const Scheduler::Dependency::Lock& lock)
    555563  {
     564    if (!n_target_workers_)
     565      return;
     566    assert(n_target_workers_);
    556567    assert(job->status(lock) == Job::pristine);
    557568    job->status(lock, Job::prepared);
     
    589600                                    const Scheduler::Dependency::Lock& lock)
    590601  {
     602    assert(n_target_workers_);
    591603    job->status(lock, Job::running);
    592604    data_->running_jobs().increment();
Note: See TracChangeset for help on using the changeset viewer.