source: trunk/yat/utility/multiprocess.h @ 4047

Last change on this file since 4047 was 4047, checked in by Peter, 9 months ago

corrected includes

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 6.4 KB
Line 
1#ifndef _theplu_yat_utility_multi_processor_
2#define _theplu_yat_utility_multi_processor_
3
4// $Id: multiprocess.h 4047 2021-03-01 14:00:13Z peter $
5
6/*
7  Copyright (C) 2021 Peter Johansson
8
9  This file is part of the yat library, http://dev.thep.lu.se/yat
10
11  The yat library is free software; you can redistribute it and/or
12  modify it under the terms of the GNU General Public License as
13  published by the Free Software Foundation; either version 3 of the
14  License, or (at your option) any later version.
15
16  The yat library is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  General Public License for more details.
20
21  You should have received a copy of the GNU General Public License
22  along with yat. If not, see <http://www.gnu.org/licenses/>.
23*/
24
#include "Queue.h"
#include "yat_assert.h"

#include <cstddef>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
32
33namespace theplu {
34namespace yat {
35namespace utility {
36
37  /**
38     Same as
39     multiprocess(begin, end, out, cap1, n_threads, cap2, function)
40     but use \a compare instead of std::less<T>
41
42     \since New in yat 0.19
43   */
44  template<typename InputIterator, typename OutputIterator,
45           class Function, class Compare>
46  void multiprocess(InputIterator begin, InputIterator end,
47                    OutputIterator out, size_t cap1, size_t n_threads,
48                    size_t cap2, Function function, Compare compare)
49  {
50    typedef typename std::iterator_traits<InputIterator>::value_type T;
51
52    class Reader
53    {
54    public:
55      Reader(InputIterator first, InputIterator last,
56             Queue<std::shared_ptr<T>>& queue)
57        : first_(first), last_(last), queue_(queue)
58      {}
59
60      void operator()(void)
61      {
62        for (; first_!=last_; ++first_)
63          queue_.push(std::make_shared<T>(*first_));
64        // send a kill pill to workers
65        queue_.push(std::shared_ptr<T>());
66      }
67    private:
68      InputIterator first_;
69      InputIterator last_;
70      Queue<std::shared_ptr<T>>& queue_;
71    };
72
73
74    class Worker
75    {
76    public:
77      Worker(Queue<std::shared_ptr<T>>& in, const Function& func,
78             Queue<std::shared_ptr<T>>& out)
79        : in_(in), out_(out), function_(func)
80      {}
81
82
83      void operator()(void)
84      {
85        std::shared_ptr<T> element;
86        while (true) {
87          in_.pop(element);
88          if (element) {
89            // Do the work and if function returns true, push worked
90            // element into out_ queue.
91            if (function_(*element))
92              out_.push(element);
93          }
94          else {
95            // When a Worker receive a kill pill, share it with other
96            // workers by pushing it into in_, and push it to the
97            // Writer to inform that this queue is completed.
98            in_.push(element);
99            out_.push(element);
100            // After that we are done, and can return home.
101            return;
102          }
103        }
104      }
105
106
107    private:
108      Queue<std::shared_ptr<T>>& in_;
109      Queue<std::shared_ptr<T>>& out_;
110      Function function_;
111      size_t cap_;
112    };
113
114
115    class PtrCompare
116    {
117    public:
118      PtrCompare(const Compare& c)
119        : compare_(c)
120      {}
121
122      bool operator()(const std::shared_ptr<T>& lhs,
123                      const std::shared_ptr<T>& rhs) const
124      {
125        YAT_ASSERT(lhs.get());
126        YAT_ASSERT(rhs.get());
127        return compare_(*lhs, *rhs);
128      }
129    private:
130      Compare compare_;
131    };
132
133
134    class Writer
135    {
136    public:
137      Writer(OutputIterator out, std::vector<Queue<std::shared_ptr<T>>>& Qs,
138             const Compare& compare)
139        : out_(out), queue_(Qs), buffer_(PtrCompare(compare))
140      {}
141
142
143      void operator()(void)
144      {
145        for (size_t i=0; i<queue_.size(); ++i)
146          read(i);
147
148        while (!buffer_.empty()) {
149          size_t idx = buffer_.begin()->second;
150          *out_ = *buffer_.begin()->first;
151          ++out_;
152          buffer_.erase(buffer_.begin());
153          // read from the same queue erased element came from
154          read(idx);
155        }
156      }
157    private:
158      void read(size_t idx)
159      {
160        YAT_ASSERT(idx < queue_.size());
161        std::shared_ptr<T> element;
162        queue_[idx].pop(element);
163        // If element was not "null" insert it into buffer. If element
164        // was a "null", size of buffer is efectively decreasing by
165        // one untile we reach zero-size.
166        if (element)
167          buffer_.insert(buffer_.end(), std::make_pair(element, idx));
168      }
169
170      OutputIterator out_;
171      std::vector<Queue<std::shared_ptr<T>>>& queue_;
172      std::multimap<std::shared_ptr<T>, size_t, PtrCompare> buffer_;
173    };
174
175    std::vector<std::thread> threads;
176    // queue used to communicate between reader and workers
177    Queue<std::shared_ptr<T>> queue;
178    queue.capacity(cap1);
179
180    YAT_ASSERT(n_threads);
181    // queues used to communicate between workers and writer; each
182    // worker has its own queue and it's the job of the writer to
183    // merge them into a sorted output.
184    Queue<std::shared_ptr<T>> queue2;
185    queue2.capacity(cap2);
186    std::vector<Queue<std::shared_ptr<T>>> Qs(n_threads, queue2);
187
188    // launch threads
189    threads.push_back(std::thread(Reader(begin, end, queue)));
190    for (size_t i=0; i<n_threads; ++i)
191      threads.push_back(std::thread(Worker(queue, function, Qs[i])));
192    threads.push_back(std::thread(Writer(out, Qs, compare)));
193
194    // wait for threads to complete
195    for (size_t i=0; i<threads.size(); ++i)
196      threads[i].join();
197  }
198
199
200  /**
201     Function reads the sorted range [\c begin, \c end), applies \c
202     function on each element and if \c function returns true, element
203     it copied into \c out and \c out is incremented.
204
205     This is done in n_threads + 2 threads. The reading of element
206     from the sorted range is done in one range, the calling of \c
207     function is done in \c n_threads threads, and the merging of the
208     results from those calls and copying to \c out is done in one
209     thread.
210
211     The communication between the threads are done in buffer
212     containers and to avoid extreme memory usage, the sizes of these
213     containers can be capped with parameter \c cap1 and \c
214     cap2. There's a single container communicating between reader
215     thread and worker threads, and \c cap1 limits the size of the
216     container. Each worker has a container communicating with the
217     writer thread, so there are \c n_threads such containers and the
218     size of each container is limited by \c cap2.
219
220     \since new in yat 0.19
221   */
222  template<typename InputIterator, typename OutputIterator, class Function>
223  void multiprocess(InputIterator begin, InputIterator end,
224                    OutputIterator out, size_t cap1, size_t n_threads,
225                    size_t cap2, Function function)
226  {
227    typedef typename std::iterator_traits<InputIterator>::value_type T;
228    std::less<T> compare;
229    multiprocess(begin, end, out, cap1, n_threads, cap2, function, compare);
230  }
231
232}}} // of namespace utility, yat, and theplu
233
234#endif
Note: See TracBrowser for help on using the repository browser.