pion-net  4.0.9
PionLockedQueue.hpp
1 // -----------------------------------------------------------------------
2 // pion-common: a collection of common libraries used by the Pion Platform
3 // -----------------------------------------------------------------------
4 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #ifndef __PION_PIONLOCKEDQUEUE_HEADER__
11 #define __PION_PIONLOCKEDQUEUE_HEADER__
12 
13 #include <new>
14 #include <boost/cstdint.hpp>
15 #include <boost/noncopyable.hpp>
16 #include <boost/thread/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/detail/atomic_count.hpp>
20 #include <pion/PionConfig.hpp>
21 #include <pion/PionException.hpp>
22 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
23  #include <boost/lockfree/detail/freelist.hpp>
24 #endif
25 
26 
27 // NOTE: the data structures contained in this file are based upon algorithms
28 // published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking
29 // Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott,
30 // Department of Computer Science, University of Rochester).
31 // See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
32 
33 
34 namespace pion { // begin namespace pion
35 
36 
40 template <typename T,
41  boost::uint32_t MaxSize = 250000,
42  boost::uint32_t SleepMilliSec = 10 >
44  private boost::noncopyable
45 {
46 protected:
47 
49  struct QueueNode {
50  T data; //< data wrapped by the node item
51  QueueNode * next; //< points to the next node in the queue
52  boost::uint32_t version; //< the node item's version number
53  };
54 
56  inline QueueNode *createNode(void) {
57 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
58  return new (m_free_list.allocate()) QueueNode();
59 #else
60  return new QueueNode();
61 #endif
62  }
63 
65  inline void destroyNode(QueueNode *node_ptr) {
66 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
67  node_ptr->~QueueNode();
68  m_free_list.deallocate(node_ptr);
69 #else
70  delete node_ptr;
71 #endif
72  }
73 
75  inline void initialize(void) {
76  // initialize with a dummy node since m_head_ptr is always
77  // pointing to the item before the head of the list
78  m_head_ptr = m_tail_ptr = createNode();
79  m_head_ptr->next = NULL;
80  m_head_ptr->version = 0;
81  }
82 
91  inline bool dequeue(T& t, boost::uint32_t& version) {
92  // just return if the list is empty
93  boost::mutex::scoped_lock head_lock(m_head_mutex);
94  QueueNode *new_head_ptr = m_head_ptr->next;
95  if (! new_head_ptr) {
96  version = m_head_ptr->version;
97  return false;
98  }
99 
100  // get a copy of the item at the head of the list
101  version = new_head_ptr->version;
102  t = new_head_ptr->data;
103 
104  // update the pointer to the head of the list
105  QueueNode *old_head_ptr = m_head_ptr;
106  m_head_ptr = new_head_ptr;
107  head_lock.unlock();
108 
109  // free the QueueNode for the old head of the list
110  destroyNode(old_head_ptr);
111 
112  // decrement size
113  --m_size;
114 
115  // item successfully dequeued
116  return true;
117  }
118 
119 
120 public:
121 
124  public:
125 
130  ConsumerThread(void) : m_is_running(true), m_next_ptr(NULL),
131  m_wakeup_time(boost::posix_time::not_a_date_time) {}
132 
139  template <typename DurationType>
140  ConsumerThread(const DurationType& d)
141  : m_is_running(true), m_next_ptr(NULL), m_wakeup_time(d)
142  {}
143 
145  inline bool isRunning(void) const { return m_is_running; }
146 
148  inline void stop(void) { m_is_running = false; m_wakeup_event.notify_one(); }
149 
151  inline void reset(void) { m_is_running = true; m_next_ptr = NULL; }
152 
154  inline bool hasWakeupTimer(void) const { return !m_wakeup_time.is_not_a_date_time(); }
155 
157  inline const boost::posix_time::time_duration& getWakeupTimer(void) const {
158  return m_wakeup_time;
159  }
160 
161  private:
162 
164  friend class PionLockedQueue;
165 
166  volatile bool m_is_running; //< true while the thread is running/active
167  ConsumerThread * m_next_ptr; //< pointer to the next idle thread
168  boost::condition m_wakeup_event; //< triggered when a new item is available
169  boost::posix_time::time_duration m_wakeup_time; //< inactivity wakeup timer duration
170  };
171 
172 
175  : m_head_ptr(NULL), m_tail_ptr(NULL), m_idle_ptr(NULL),
176  m_next_version(1), m_size(0)
177  {
178  initialize();
179  }
180 
182  virtual ~PionLockedQueue() {
183  clear();
184  destroyNode(m_tail_ptr);
185  }
186 
188  inline bool empty(void) const { return (m_head_ptr->next == NULL); }
189 
191  std::size_t size(void) const {
192  return m_size;
193  }
194 
196  void clear(void) {
197  boost::mutex::scoped_lock tail_lock(m_tail_mutex);
198  boost::mutex::scoped_lock head_lock(m_head_mutex);
199  // also delete dummy node and reinitialize it to clear old value
200  while (m_head_ptr) {
201  m_tail_ptr = m_head_ptr;
202  m_head_ptr = m_head_ptr->next;
203  destroyNode(m_tail_ptr);
204  if (m_head_ptr)
205  --m_size;
206  }
207  initialize();
208  }
209 
215  void push(const T& t) {
216  // sleep while MaxSize is exceeded
217  if (MaxSize > 0) {
218  boost::system_time wakeup_time;
219  while (size() >= MaxSize) {
220  wakeup_time = boost::get_system_time()
221  + boost::posix_time::millisec(SleepMilliSec);
222  boost::thread::sleep(wakeup_time);
223  }
224  }
225 
226  // create a new list node for the queue item
227  QueueNode *node_ptr = createNode();
228  node_ptr->data = t;
229  node_ptr->next = NULL;
230  node_ptr->version = 0;
231 
232  // append node to the end of the list
233  boost::mutex::scoped_lock tail_lock(m_tail_mutex);
234  node_ptr->version = (m_next_version += 2);
235  m_tail_ptr->next = node_ptr;
236 
237  // update the tail pointer for the new node
238  m_tail_ptr = node_ptr;
239 
240  // increment size
241  ++m_size;
242 
243  // wake up an idle thread (if any)
244  if (m_idle_ptr) {
245  ConsumerThread *idle_ptr = m_idle_ptr;
246  m_idle_ptr = m_idle_ptr->m_next_ptr;
247  idle_ptr->m_wakeup_event.notify_one();
248  }
249  }
250 
261  bool pop(T& t, ConsumerThread& thread_info) {
262  boost::uint32_t last_known_version;
263  while (thread_info.isRunning()) {
264  // try to get the next value
265  if ( dequeue(t, last_known_version) )
266  return true; // got an item
267 
268  // queue is empty
269  boost::mutex::scoped_lock tail_lock(m_tail_mutex);
270  if (m_tail_ptr->version == last_known_version) {
271  // still empty after acquiring lock
272  thread_info.m_next_ptr = m_idle_ptr;
273  m_idle_ptr = & thread_info;
274  // get wakeup time (if any)
275  if (thread_info.hasWakeupTimer()) {
276  // wait for an item to become available
277  const boost::posix_time::ptime wakeup_time(boost::get_system_time() + thread_info.getWakeupTimer());
278  if (!thread_info.m_wakeup_event.timed_wait(tail_lock, wakeup_time))
279  return false; // timer expired if timed_wait() returns false
280  } else {
281  // wait for an item to become available
282  thread_info.m_wakeup_event.wait(tail_lock);
283  }
284  }
285  }
286  return false;
287  }
288 
296  inline bool pop(T& t) { boost::uint32_t version; return dequeue(t, version); }
297 
298 
299 private:
300 
301 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
304 #endif
305 
307  boost::mutex m_head_mutex;
308 
310  boost::mutex m_tail_mutex;
311 
313  QueueNode * m_head_ptr;
314 
316  QueueNode * m_tail_ptr;
317 
319  ConsumerThread * m_idle_ptr;
320 
322  boost::uint32_t m_next_version;
323 
325  boost::detail::atomic_count m_size;
326 };
327 
328 
329 } // end namespace pion
330 
331 #endif
std::size_t size(void) const
returns the number of items that are currently in the queue
void initialize(void)
initializes head and tail pointers for empty queue
bool empty(void) const
returns true if the queue is empty; false if it is not
bool hasWakeupTimer(void) const
returns true if an inactivity wakeup timer is set for the thread
bool isRunning(void) const
returns true while the consumer thread is active/running
data structure used to manage idle consumer threads waiting for items
void reset(void)
stops the thread – if waiting on pop() will return immediately
data structure used to wrap each item in the queue
const boost::posix_time::time_duration & getWakeupTimer(void) const
returns absolute wakeup time based on current time
void stop(void)
stops the thread – if waiting on pop() will return immediately
void clear(void)
clears the list by removing all remaining items
bool dequeue(T &t, boost::uint32_t &version)
the following enables use of the lock-free cache
PionLockedQueue(void)
constructs a new PionLockedQueue
QueueNode * createNode(void)
returns a new queue node item for use in the queue
bool pop(T &t, ConsumerThread &thread_info)
void destroyNode(QueueNode *node_ptr)
frees memory for an existing queue node item
virtual ~PionLockedQueue()
virtual destructor