Changeset 4043


Ignore:
Timestamp:
Mar 1, 2021, 5:52:24 AM (8 months ago)
Author:
Peter
Message:

Add capacity to Queue and PriorityQueue?, which is a feature to limit
the size of the queue, i.e., a push is waiting until size < capacity
before completing the push.

Location:
trunk
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/test/Makefile.am

    r4036 r4043  
    55# Copyright (C) 2005 Jari Häkkinen, Peter Johansson
    66# Copyright (C) 2006, 2007, 2008 Jari Häkkinen, Peter Johansson, Markus Ringnér
    7 # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Peter Johansson
     7# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021 Peter Johansson
    88#
    99# This file is part of the yat library, http://dev.thep.lu.se/yat
     
    102102  test/poisson.test \
    103103  test/queue.test test/queue2.test \
     104  test/queue3.test \
    104105  test/range.test test/regression.test test/rnd.test \
    105106  test/rng-mt.test \
  • trunk/yat/utility/BasicQueue.h

    r3999 r4043  
    44// $Id$
    55//
    6 // Copyright (C) 2017, 2018, 2020 Peter Johansson
     6// Copyright (C) 2017, 2018, 2020, 2021 Peter Johansson
    77//
    88// This program is free software; you can redistribute it and/or modify
     
    2222
    2323#include <condition_variable>
     24#include <limits>
    2425#include <mutex>
    2526
     
    4748       Default constructor
    4849     */
    49     BasicQueue(void) {}
     50    BasicQueue(void)
     51      : capacity_(std::numeric_limits<size_t>::max())
     52    {}
    5053
    5154    /**
     
    5659      std::unique_lock<std::mutex> lock(other.mutex_);
    5760      q_ = other.q_;
     61      capacity_ = other.capacity_;
    5862    } // lock is released here
    5963
     
    6165       Construct queue from underlying Container
    6266     */
    63     explicit BasicQueue(const Container& container) : q_(container) {}
     67    explicit BasicQueue(const Container& container)
     68      : q_(container), capacity_(std::numeric_limits<size_t>::max())
     69    {}
     70
     71
     72    /**
     73       \return maximal number of element stored in container
     74
     75       \since New in yat 0.19
     76     */
     77    size_t capacity(void)
     78    {
     79      std::unique_lock<std::mutex> lock(mutex_);
     80      return capacity_;
     81    }
     82
     83
     84    /**
     85       \brief change maximal number of element stored in container
     86
     87       \since New in yat 0.19
     88    */
     89    void capacity(size_t c)
     90    {
     91      std::unique_lock<std::mutex> lock(mutex_);
     92      capacity_ = c;
     93
     94      lock.unlock();
     95      // Notify pushers that there is potentially space
     96      push_condition_.notify_all();
     97    }
    6498
    6599    /**
     
    71105    {
    72106      std::unique_lock<std::mutex> lock(mutex_);
    73       return q_.clear();
     107      q_.clear();
     108      lock.unlock(); // unlock the mutex
     109
     110      // Notify pushers that there is space to push
     111      push_condition_.notify_all();
    74112    }
    75113
     
    96134      std::unique_lock<std::mutex> lock(mutex_);
    97135      while (q_.empty())
    98         condition_.wait(lock);
     136        pop_condition_.wait(lock);
    99137      // The obvious choice would be to create a temp copy of front,
    100138      // pop the queue and then return by-value. This is, however,
     
    103141      // pass via passed reference.
    104142      static_cast<Derived*>(this)->pop_impl(value, lock);
     143      lock.unlock(); // unlock the mutex
     144      push_condition_.notify_one();
    105145    } // lock is released here
    106146
     
    108148    /**
    109149       \brief insert an element into container
     150
     151       If size of queue is equal (or greater) to its capacity, the
     152       function is waiting until this is not the case.
    110153     */
    111154    void push(const T& t)
    112155    {
    113156      std::unique_lock<std::mutex> lock(mutex_);
     157      while (q_.size() >= capacity_)
     158        push_condition_.wait(lock);
    114159      static_cast<Derived*>(this)->push_impl(t, lock);
    115160      lock.unlock(); // unlock the mutex
    116161
    117162      // Notify others that data is ready after we have unlocked
    118       condition_.notify_one();
     163      pop_condition_.notify_one();
    119164    }
    120165
     
    123168       \brief insert an element into container
    124169
    125        \note only available if configured and built with cxx11 support
    126 
    127170       \since New in yat 0.15
    128171     */
     
    130173    {
    131174      std::unique_lock<std::mutex> lock(mutex_);
     175      while (q_.size() >= capacity_)
     176        push_condition_.wait(lock);
    132177      static_cast<Derived*>(this)->push_impl(std::move(t), lock);
    133178      lock.unlock(); // unlock the mutex
    134179
    135180      // Notify others that data is ready after we have unlocked
    136       condition_.notify_one();
     181      pop_condition_.notify_one();
    137182    }
    138183
     
    158203        return false;
    159204      static_cast<Derived*>(this)->pop_impl(value, lock);
     205      lock.unlock(); // unlock the mutex
     206
     207      // Notify others that data is ready after we have unlocked
     208      push_condition_.notify_one();
     209      return true;
     210    } // lock is released here
     211
     212
     213    /**
     214       If Queue size is less than capacity push \a value and return \c
     215       true; otherwise return \c false
     216
     217       \since New in yat 0.19
     218     */
     219    bool try_push(T& value)
     220    {
     221      std::unique_lock<std::mutex> lock(mutex_);
     222      if (q_.size() >= capacity_)
     223        return false;
     224      static_cast<Derived*>(this)->push_impl(value, lock);
     225      lock.unlock(); // unlock the mutex
     226
     227      // Notify others that data is ready after we have unlocked
     228      pop_condition_.notify_one();
     229      return true;
     230    } // lock is released here
     231
     232
     233    /**
     234       If Queue size is less than capacity push \a value and return \c
     235       true; otherwise return \c false
     236
     237       \since New in yat 0.19
     238     */
     239    bool try_push(T&& value)
     240    {
     241      std::unique_lock<std::mutex> lock(mutex_);
     242      if (q_.size() >= capacity_)
     243        return false;
     244      static_cast<Derived*>(this)->push_impl(std::move(value), lock);
     245      lock.unlock(); // unlock the mutex
     246
     247      // Notify others that data is ready after we have unlocked
     248      pop_condition_.notify_one();
    160249      return true;
    161250    } // lock is released here
     
    177266                                                std::adopt_lock_t());
    178267        q_ = other.q_;
     268        capacity_ = other.capacity_;
    179269      }
    180270    }
     
    184274  private:
    185275    mutable std::mutex mutex_;
    186     std::condition_variable condition_;
     276    std::condition_variable pop_condition_;
     277    std::condition_variable push_condition_;
     278    size_t capacity_;
    187279  };
    188280
Note: See TracChangeset for help on using the changeset viewer.