source: trunk/yat/utility/multiprocess.h @ 4046

Last change on this file since 4046 was 4046, checked in by Peter, 9 months ago

docs typo

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 6.4 KB
Line 
1#ifndef _theplu_yat_utility_multi_processor_
2#define _theplu_yat_utility_multi_processor_
3
4// $Id: multiprocess.h 4046 2021-03-01 13:24:12Z peter $
5
6/*
7  Copyright (C) 2021 Peter Johansson
8
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23*/
24
#include "Queue.h"
#include "yat_assert.h"

#include <chrono>
#include <cstddef>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
31
32namespace theplu {
33namespace yat {
34namespace utility {
35
36  /**
37     Same as
38     multiprocess(begin, end, out, cap1, n_threads, cap2, function)
39     but use \a compare instead of std::less<T>
40
41     \since New in yat 0.19
42   */
43  template<typename InputIterator, typename OutputIterator,
44           class Function, class Compare>
45  void multiprocess(InputIterator begin, InputIterator end,
46                    OutputIterator out, size_t cap1, size_t n_threads,
47                    size_t cap2, Function function, Compare compare)
48  {
49    typedef typename std::iterator_traits<InputIterator>::value_type T;
50
51    class Reader
52    {
53    public:
54      Reader(InputIterator first, InputIterator last,
55             Queue<std::shared_ptr<T>>& queue)
56        : first_(first), last_(last), queue_(queue)
57      {}
58
59      void operator()(void)
60      {
61        for (; first_!=last_; ++first_)
62          queue_.push(std::make_shared<T>(*first_));
63        // send a kill pill to workers
64        queue_.push(std::shared_ptr<T>());
65      }
66    private:
67      InputIterator first_;
68      InputIterator last_;
69      Queue<std::shared_ptr<T>>& queue_;
70    };
71
72
73    class Worker
74    {
75    public:
76      Worker(Queue<std::shared_ptr<T>>& in, const Function& func,
77             Queue<std::shared_ptr<T>>& out)
78        : in_(in), out_(out), function_(func)
79      {}
80
81
82      void operator()(void)
83      {
84        std::shared_ptr<T> element;
85        while (true) {
86          in_.pop(element);
87          if (element) {
88            // Do the work and if function returns true, push worked
89            // element into out_ queue.
90            if (function_(*element))
91              out_.push(element);
92          }
93          else {
94            // When a Worker receive a kill pill, share it with other
95            // workers by pushing it into in_, and push it to the
96            // Writer to inform that this queue is completed.
97            in_.push(element);
98            out_.push(element);
99            // After that we are done, and can return home.
100            return;
101          }
102        }
103      }
104
105
106    private:
107      Queue<std::shared_ptr<T>>& in_;
108      Queue<std::shared_ptr<T>>& out_;
109      Function function_;
110      size_t cap_;
111    };
112
113
114    class PtrCompare
115    {
116    public:
117      PtrCompare(const Compare& c)
118        : compare_(c)
119      {}
120
121      bool operator()(const std::shared_ptr<T>& lhs,
122                      const std::shared_ptr<T>& rhs) const
123      {
124        YAT_ASSERT(lhs.get());
125        YAT_ASSERT(rhs.get());
126        return compare_(*lhs, *rhs);
127      }
128    private:
129      Compare compare_;
130    };
131
132
133    class Writer
134    {
135    public:
136      Writer(OutputIterator out, std::vector<Queue<std::shared_ptr<T>>>& Qs,
137             const Compare& compare)
138        : out_(out), queue_(Qs), buffer_(PtrCompare(compare))
139      {}
140
141
142      void operator()(void)
143      {
144        for (size_t i=0; i<queue_.size(); ++i)
145          read(i);
146
147        while (!buffer_.empty()) {
148          size_t idx = buffer_.begin()->second;
149          *out_ = *buffer_.begin()->first;
150          ++out_;
151          buffer_.erase(buffer_.begin());
152          // read from the same queue erased element came from
153          read(idx);
154        }
155      }
156    private:
157      void read(size_t idx)
158      {
159        YAT_ASSERT(idx < queue_.size());
160        std::shared_ptr<T> element;
161        queue_[idx].pop(element);
162        // If element was not "null" insert it into buffer. If element
163        // was a "null", size of buffer is efectively decreasing by
164        // one untile we reach zero-size.
165        if (element)
166          buffer_.insert(buffer_.end(), std::make_pair(element, idx));
167      }
168
169      OutputIterator out_;
170      std::vector<Queue<std::shared_ptr<T>>>& queue_;
171      std::multimap<std::shared_ptr<T>, size_t, PtrCompare> buffer_;
172    };
173
174    std::vector<std::thread> threads;
175    // queue used to communicate between reader and workers
176    Queue<std::shared_ptr<T>> queue;
177    queue.capacity(cap1);
178
179    YAT_ASSERT(n_threads);
180    // queues used to communicate between workers and writer; each
181    // worker has its own queue and it's the job of the writer to
182    // merge them into a sorted output.
183    Queue<std::shared_ptr<T>> queue2;
184    queue2.capacity(cap2);
185    std::vector<Queue<std::shared_ptr<T>>> Qs(n_threads, queue2);
186
187    // launch threads
188    threads.push_back(std::thread(Reader(begin, end, queue)));
189    for (size_t i=0; i<n_threads; ++i)
190      threads.push_back(std::thread(Worker(queue, function, Qs[i])));
191    threads.push_back(std::thread(Writer(out, Qs, compare)));
192
193    // wait for threads to complete
194    for (size_t i=0; i<threads.size(); ++i)
195      threads[i].join();
196  }
197
198
199  /**
200     Function reads the sorted range [\c begin, \c end), applies \c
201     function on each element and if \c function returns true, element
202     it copied into \c out and \c out is incremented.
203
204     This is done in n_threads + 2 threads. The reading of element
205     from the sorted range is done in one range, the calling of \c
206     function is done in \c n_threads threads, and the merging of the
207     results from those calls and copying to \c out is done in one
208     thread.
209
210     The communication between the threads are done in buffer
211     containers and to avoid extreme memory usage, the sizes of these
212     containers can be capped with parameter \c cap1 and \c
213     cap2. There's a single container communicating between reader
214     thread and worker threads, and \c cap1 limits the size of the
215     container. Each worker has a container communicating with the
216     writer thread, so there are \c n_threads such containers and the
217     size of each container is limited by \c cap2.
218
219     \since new in yat 0.19
220   */
221  template<typename InputIterator, typename OutputIterator, class Function>
222  void multiprocess(InputIterator begin, InputIterator end,
223                    OutputIterator out, size_t cap1, size_t n_threads,
224                    size_t cap2, Function function)
225  {
226    typedef typename std::iterator_traits<InputIterator>::value_type T;
227    std::less<T> compare;
228    multiprocess(begin, end, out, cap1, n_threads, cap2, function, compare);
229  }
230
231}}} // of namespace utility, yat, and theplu
232
233#endif
Note: See TracBrowser for help on using the repository browser.