source: trunk/yat/utility/BasicQueue.h @ 4043

Last change on this file since 4043 was 4043, checked in by Peter, 9 months ago

Add a capacity to Queue and PriorityQueue. The capacity limits the
size of the queue: a push waits until size < capacity before it
completes.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 6.8 KB
Line 
1#ifndef theplu_yat_utility_basic_queue
2#define theplu_yat_utility_basic_queue
3
4// $Id: BasicQueue.h 4043 2021-03-01 04:52:24Z peter $
5//
6// Copyright (C) 2017, 2018, 2020, 2021 Peter Johansson
7//
8// This program is free software; you can redistribute it and/or modify
9// it under the terms of the GNU General Public License as published by
10// the Free Software Foundation; either version 3 of the License, or
11// (at your option) any later version.
12//
13// This program is distributed in the hope that it will be useful, but
14// WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16// General Public License for more details.
17//
18// You should have received a copy of the GNU General Public License
19// along with yat. If not, see <http://www.gnu.org/licenses/>.
20
21#include "config_public.h"
22
23#include <condition_variable>
24#include <limits>
25#include <mutex>
26
27namespace theplu {
28namespace yat {
29namespace utility {
30namespace detail {
31
/**
   \internal Base class for Queue and PriorityQueue

   Provides the locking, waiting and notification around an underlying
   \a Container. The class uses the curiously recurring template
   pattern: \a Derived must provide

   - pop_impl(T&, std::unique_lock<std::mutex>&)
   - push_impl(const T&, std::unique_lock<std::mutex>&)
   - push_impl(T&&, std::unique_lock<std::mutex>&)

   which are called while the internal mutex is held and are expected
   to do the actual removal from / insertion into \c q_.
 */
template<class Derived, typename T, class Container>
class BasicQueue
{
public:
  /// Type of object stored
  typedef typename Container::value_type value_type;

  /**
     An unsigned integral type. \see size(void)
  */
  typedef typename Container::size_type size_type;

  /**
     \brief Default constructor

     Creates an empty queue whose capacity is the largest value
     representable by size_t, i.e., in practice unlimited.
   */
  BasicQueue(void)
    : capacity_(std::numeric_limits<size_t>::max())
  {}

  /**
     \brief Copy constructor

     Copies elements and capacity from \a other while holding the
     mutex of \a other.
   */
  BasicQueue(const BasicQueue& other)
  {
    std::lock_guard<std::mutex> guard(other.mutex_);
    q_ = other.q_;
    capacity_ = other.capacity_;
  }

  /**
     \brief Construct queue from underlying Container

     The elements of \a container are copied; capacity is set to the
     largest value representable by size_t.
   */
  explicit BasicQueue(const Container& container)
    : q_(container), capacity_(std::numeric_limits<size_t>::max())
  {}


  /**
     \return maximal number of elements stored in container

     \since New in yat 0.19
   */
  size_t capacity(void) const
  {
    std::lock_guard<std::mutex> guard(mutex_);
    return capacity_;
  }


  /**
     \brief change maximal number of elements stored in container

     All threads waiting in push() are woken up so they can
     re-evaluate whether there is room under the new capacity \a c.

     \since New in yat 0.19
  */
  void capacity(size_t c)
  {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      capacity_ = c;
    }
    // Notify pushers, after the mutex is released, that there is
    // potentially space
    push_condition_.notify_all();
  }

  /**
     \brief clear queue

     Removes all elements and wakes up all threads waiting in push().

     \since new in yat 0.15
   */
  void clear(void)
  {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      q_.clear();
    }
    // Notify pushers, after the mutex is released, that there is
    // space to push
    push_condition_.notify_all();
  }


  /**
     \return \c true if container's size is zero
   */
  bool empty(void) const
  {
    std::lock_guard<std::mutex> guard(mutex_);
    return q_.empty();
  }


  /**
     \brief access next element in queue

     Access the next element in the queue. If the container is empty,
     the calling thread waits until another thread has inserted an
     element into the container.

     \param value object into which the popped element is passed
   */
  void pop(T& value)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    // The predicate is re-checked after every wake-up, which
    // protects against spurious wake-ups.
    pop_condition_.wait(lock, [this]{ return !q_.empty(); });
    // The obvious choice would be to create a temp copy of front,
    // pop the queue and then return by-value. This is, however,
    // dangerous because if the copy constructor throws, the queue
    // has been popped and the element is lost. Instead we choose to
    // pass via passed reference.
    static_cast<Derived*>(this)->pop_impl(value, lock);
    lock.unlock();

    // Notify a pusher, after we have unlocked, that there is space
    push_condition_.notify_one();
  }


  /**
     \brief insert an element into container

     If size of queue is equal (or greater) to its capacity, the
     function is waiting until this is not the case.
   */
  void push(const T& t)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    push_condition_.wait(lock, [this]{ return q_.size() < capacity_; });
    static_cast<Derived*>(this)->push_impl(t, lock);
    lock.unlock();

    // Notify others that data is ready after we have unlocked
    pop_condition_.notify_one();
  }


  /**
     \brief insert an element into container

     Same as push(const T&) except that \a t is forwarded as an
     rvalue to the derived class.

     \since New in yat 0.15
   */
  void push(T&& t)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    push_condition_.wait(lock, [this]{ return q_.size() < capacity_; });
    static_cast<Derived*>(this)->push_impl(std::move(t), lock);
    lock.unlock();

    // Notify others that data is ready after we have unlocked
    pop_condition_.notify_one();
  }


  /**
     \return Number of elements stored in container
   */
  size_type size(void) const
  {
    std::lock_guard<std::mutex> guard(mutex_);
    return q_.size();
  }


  /**
     If Queue is empty() do nothing and return \c false, else pop
     the element into \a value and return \c true
   */
  bool try_pop(T& value)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (q_.empty())
      return false; // mutex is released by destructor of lock
    static_cast<Derived*>(this)->pop_impl(value, lock);
    lock.unlock();

    // Notify a pusher, after we have unlocked, that there is space
    push_condition_.notify_one();
    return true;
  }


  /**
     If Queue size is less than capacity push \a value and return \c
     true; otherwise return \c false

     \note \a value is taken by const reference so that const
     objects can be pushed as well, in line with push(const T&).

     \since New in yat 0.19
   */
  bool try_push(const T& value)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (q_.size() >= capacity_)
      return false; // mutex is released by destructor of lock
    static_cast<Derived*>(this)->push_impl(value, lock);
    lock.unlock();

    // Notify others that data is ready after we have unlocked
    pop_condition_.notify_one();
    return true;
  }


  /**
     If Queue size is less than capacity push \a value and return \c
     true; otherwise return \c false

     \since New in yat 0.19
   */
  bool try_push(T&& value)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (q_.size() >= capacity_)
      return false; // mutex is released by destructor of lock
    static_cast<Derived*>(this)->push_impl(std::move(value), lock);
    lock.unlock();

    // Notify others that data is ready after we have unlocked
    pop_condition_.notify_one();
    return true;
  }

protected:
  /**
     \brief assign other to this

     Copies elements and capacity from \a other while holding the
     mutexes of both queues. Self-assignment is a no-op.
   */
  void assign(const BasicQueue& other)
  {
    if (this == &other)
      return;
    // std::lock guarantees that the two mutexes are locked in
    // the same order regardless of passed order and thereby
    // avoiding deadlock when two threads are calling
    // lhs.assign(rhs) and rhs.assign(lhs) simultaneously.
    std::lock(mutex_, other.mutex_);
    // Adopt the already locked mutexes so they are released on exit
    std::lock_guard<std::mutex> guard(mutex_, std::adopt_lock);
    std::lock_guard<std::mutex> other_guard(other.mutex_, std::adopt_lock);
    q_ = other.q_;
    capacity_ = other.capacity_;
  }

  /// data
  Container q_;
private:
  /// protects q_ and capacity_; mutable so const observers can lock
  mutable std::mutex mutex_;
  /// waited on in pop(); notified when an element has been pushed
  std::condition_variable pop_condition_;
  /// waited on in push(); notified when space may have become available
  std::condition_variable push_condition_;
  /// maximal number of elements allowed in q_
  size_t capacity_;
};
280
281}}}}
282#endif
Note: See TracBrowser for help on using the repository browser.