pion-net  4.0.9
PionLockFreeQueue.hpp
1 // -----------------------------------------------------------------------
2 // pion-common: a collection of common libraries used by the Pion Platform
3 // -----------------------------------------------------------------------
4 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #ifndef __PION_PIONLOCKFREEQUEUE_HEADER__
11 #define __PION_PIONLOCKFREEQUEUE_HEADER__
12 
13 #ifndef PION_HAVE_LOCKFREE
14  #error "PionLockFreeQueue requires the boost::lockfree library!"
15 #endif
16 #ifdef _MSC_VER
17  #include <iso646.h>
18  #pragma warning(push)
19  #pragma warning(disable: 4800) // forcing value to bool 'true' or 'false' (performance warning)
20 #endif
21 #include <boost/lockfree/detail/tagged_ptr.hpp>
22 #ifdef _MSC_VER
23  #pragma warning(pop)
24 #endif
25 #include <boost/lockfree/detail/cas.hpp>
26 #include <boost/lockfree/detail/freelist.hpp>
27 #include <boost/lockfree/detail/branch_hints.hpp>
28 #include <boost/detail/atomic_count.hpp>
29 #include <boost/noncopyable.hpp>
30 #include <boost/thread/thread.hpp>
31 #include <pion/PionConfig.hpp>
32 //#include <boost/array.hpp>
33 //#include <boost/cstdint.hpp>
34 //#include <boost/static_assert.hpp>
35 
36 
37 // NOTE: the data structures contained in this file are based upon algorithms
38 // published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking
39 // Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott,
40 // Department of Computer Science, University of Rochester).
41 // See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
42 
43 
44 namespace pion { // begin namespace pion
45 
46 
50 template <typename T>
52  private boost::noncopyable
53 {
54 protected:
55 
57  struct QueueNode {
59  QueueNode(void) : next(NULL) {}
60 
62  QueueNode(const T& d) : next(NULL), data(d) {}
63 
66 
68  T data;
69  };
70 
73 
75  inline QueueNode *createNode(void) {
76  QueueNode *node_ptr = m_free_list.allocate();
77  new(node_ptr) QueueNode();
78  return node_ptr;
79  }
80 
82  inline void destroyNode(QueueNode *node_ptr) {
83  node_ptr->~QueueNode();
84  m_free_list.deallocate(node_ptr);
85  }
86 
87 
88 public:
89 
91  PionLockFreeQueue(void) : m_size(0) {
92  // initialize with a dummy node since m_head_ptr is always
93  // pointing to the item before the head of the list
94  QueueNode *dummy_ptr = createNode();
95  m_head_ptr.set_ptr(dummy_ptr);
96  m_tail_ptr.set_ptr(dummy_ptr);
97  }
98 
100  virtual ~PionLockFreeQueue() {
101  clear();
102  destroyNode(m_head_ptr.get_ptr());
103  }
104 
106  inline bool empty(void) const {
107  return (m_head_ptr.get_ptr() == m_tail_ptr.get_ptr());
108  }
109 
111  inline std::size_t size(void) const {
112  return m_size;
113  }
114 
117  volatile void clear(void) {
118  while (! empty()) {
119  QueueNodePtr node_ptr(m_head_ptr);
120  m_head_ptr = m_head_ptr->next;
121  destroyNode(node_ptr.get_ptr());
122  --m_size;
123  }
124  }
125 
131  inline void push(const T& t) {
132  // create a new list node for the queue item
133  QueueNode *node_ptr = createNode();
134  node_ptr->data = t;
135 
136  while (true) {
137  // get copy of tail pointer
138  QueueNodePtr tail_ptr(m_tail_ptr);
139  //boost::lockfree::memory_barrier();
140 
141  // get copy of tail's next pointer
142  QueueNodePtr next_ptr(tail_ptr->next);
143  boost::lockfree::memory_barrier();
144 
145  // make sure that the tail pointer has not changed since reading next
146  if (boost::lockfree::likely(tail_ptr == m_tail_ptr)) {
147  // check if tail was pointing to the last node
148  if (next_ptr.get_ptr() == NULL) {
149  // try to link the new node at the end of the list
150  if (tail_ptr->next.cas(next_ptr, node_ptr)) {
151  // done with enqueue; try to swing tail to the inserted node
152  m_tail_ptr.cas(tail_ptr, node_ptr);
153  break;
154  }
155  } else {
156  // try to swing tail to the next node
157  m_tail_ptr.cas(tail_ptr, next_ptr.get_ptr());
158  }
159  }
160  }
161 
162  // increment size
163  ++m_size;
164  }
165 
173  inline bool pop(T& t) {
174  while (true) {
175  // get copy of head pointer
176  QueueNodePtr head_ptr(m_head_ptr);
177  //boost::lockfree::memory_barrier();
178 
179  // get copy of tail pointer
180  QueueNodePtr tail_ptr(m_tail_ptr);
181  QueueNode *next_ptr = head_ptr->next.get_ptr();
182  boost::lockfree::memory_barrier();
183 
184  // check consistency of head pointer
185  if (boost::lockfree::likely(head_ptr == m_head_ptr)) {
186 
187  // check if queue is empty, or tail is falling behind
188  if (head_ptr.get_ptr() == tail_ptr.get_ptr()) {
189  // is queue empty?
190  if (next_ptr == NULL)
191  return false; // queue is empty
192 
193  // not empty; try to advance tail to catch it up
194  m_tail_ptr.cas(tail_ptr, next_ptr);
195 
196  } else {
197  // tail is OK
198  // read value before CAS, otherwise another dequeue
199  // might free the next node
200  t = next_ptr->data;
201 
202  // try to swing head to the next node
203  if (m_head_ptr.cas(head_ptr, next_ptr)) {
204  // success -> nuke the old head item
205  destroyNode(head_ptr.get_ptr());
206  break; // exit loop
207  }
208  }
209  }
210  }
211 
212  // decrement size
213  --m_size;
214 
215  // item successfully retrieved
216  return true;
217  }
218 
219 
220 private:
221 
224 
225 
227  boost::detail::atomic_count m_size;
228 
230  NodeFreeList m_free_list;
231 
233  QueueNodePtr m_head_ptr;
234 
236 #ifdef _MSC_VER
237  #pragma pack(8) /* force head_ and tail_ to different cache lines! */
238  QueueNodePtr m_tail_ptr;
239 #else
240  QueueNodePtr m_tail_ptr __attribute__((aligned(64))); /* force head_ and tail_ to different cache lines! */
241 #endif
242 };
243 
244 
245 
246 
247 #if 0
// Alternative, fixed-capacity PionLockFreeQueue that keeps its nodes in a
// preallocated array and links them by 16-bit index instead of by pointer.
// The whole class sits inside the enclosing "#if 0" and is therefore not
// compiled; the boost/array.hpp, boost/cstdint.hpp and boost/static_assert.hpp
// includes it relies on are likewise commented out at the top of this file.
//
// Template parameters:
//   T             - type of the items held in the queue
//   MaxSize       - number of slots in the stack of free node indexes
//   SleepMilliSec - how long acquireNode() sleeps between retries while no
//                   free node index is available
248 template <typename T,
253  boost::uint16_t MaxSize = 50000,
254  boost::uint32_t SleepMilliSec = 10 >
255 class PionLockFreeQueue :
256  private boost::noncopyable
257 {
258 protected:
259 
// the 32-bit "value" view of QueueNodePtr must be able to cover both 16-bit fields
261  BOOST_STATIC_ASSERT(sizeof(boost::uint32_t) >= (sizeof(boost::uint16_t) * 2));
262 
// an index/counter pair that can also be read, written and CAS'd as a single
// 32-bit value
264  union QueueNodePtr {
266  struct {
// position of the node within m_nodes (0 is reserved for NULL)
268  boost::uint16_t index;
// bumped by one in every value that cas() installs
270  boost::uint16_t counter;
271  } data;
// both fields viewed as one integer, used for whole-pointer copies and CAS
273  boost::int32_t value;
274  };
275 
// a queue item together with the index-based link to its successor
277  struct QueueNode {
// default constructor: clears the link (index and counter both zero)
279  QueueNode(void) { m_next.value = 0; }
// data wrapped by the node item
281  T m_data;
// link to the next node in the queue (index 0 means "no next node")
283  volatile QueueNodePtr m_next;
284  };
285 
// returns a reference to the node in m_nodes that node_ptr's index selects
293  inline QueueNode& getQueueNode(QueueNodePtr node_ptr) {
294  return m_nodes[node_ptr.data.index];
295  }
296 
// Compare-and-swap on an index/counter pair: builds a new pair made of
// new_index and old_ptr's counter plus one, and passes the 32-bit views of
// cur_ptr, old_ptr and the new pair to boost::lockfree::cas, whose result is
// returned.
306  static inline bool cas(volatile QueueNodePtr& cur_ptr, QueueNodePtr old_ptr,
307  boost::uint16_t new_index)
308  {
309  QueueNodePtr new_ptr;
310  new_ptr.data.index = new_index;
311  new_ptr.data.counter = old_ptr.data.counter + 1;
312  return boost::lockfree::cas(&(cur_ptr.value), old_ptr.value, new_ptr.value);
313  }
314 
// Takes a node index off the stack of free indexes and returns it.  While
// the stack is empty (m_free_ptr index is 0) the calling thread sleeps for
// SleepMilliSec milliseconds at a time and then looks again.
316  inline boost::uint16_t acquireNode(void) {
317  QueueNodePtr current_free_ptr;
318  boost::uint16_t new_free_index;
319  boost::uint16_t avail_index;
320 
321  while (true) {
322  while (true) {
323  // get current free_ptr value
324  current_free_ptr.value = m_free_ptr.value;
325  // check if current free_ptr value == 0
326  if (current_free_ptr.data.index > 0)
327  break;
328  // sleep while MaxSize is exceeded
329  boost::system_time wakeup_time = boost::get_system_time()
330  + boost::posix_time::millisec(SleepMilliSec);
331  boost::thread::sleep(wakeup_time);
332  }
333 
334  // prepare what will become the new free_ptr index value
335  new_free_index = current_free_ptr.data.index - 1;
336 
337  // optimistically get the next available node index
338  avail_index = m_free_nodes[new_free_index];
339 
340  // use cas operation to update free_ptr value
// (a slot that still reads 0 has been claimed by releaseNode() but not yet
// filled in, so it is skipped and the outer loop retries)
341  if (avail_index != 0
342  && cas(m_free_ptr, current_free_ptr, new_free_index))
343  {
// mark the slot as unused again so releaseNode() may reuse it
344  m_free_nodes[new_free_index] = 0;
345  break; // cas successful - all done!
346  }
347  }
348 
349  return avail_index;
350  }
351 
// Puts node_index back onto the stack of free indexes.  The slot is claimed
// first by advancing m_free_ptr with cas(), and only then filled in.
353  inline void releaseNode(const boost::uint16_t node_index) {
354  QueueNodePtr current_free_ptr;
355  boost::uint16_t new_free_index;
356 
357  while (true) {
358  // get current free_ptr value
359  current_free_ptr.value = m_free_ptr.value;
360 
361  // prepare what will become the new free_ptr index value
362  new_free_index = current_free_ptr.data.index + 1;
363 
364  // use cas operation to update free_ptr value
// (only a slot that currently reads 0, i.e. unused, may be claimed)
365  if (m_free_nodes[current_free_ptr.data.index] == 0
366  && cas(m_free_ptr, current_free_ptr, new_free_index))
367  {
368  // push the available index value into the next free position
369  m_free_nodes[current_free_ptr.data.index] = node_index;
370 
371  // all done!
372  break;
373  }
374  }
375  }
376 
377 
378 public:
379 
// virtual destructor (nothing to release: all storage is held by value)
381  virtual ~PionLockFreeQueue() {}
382 
// Constructs an empty queue: node 1 becomes the initial head/tail node and
// node indexes 2 .. MaxSize+1 are pushed onto the free stack.
// NOTE(review): the loop counters are boost::uint16_t, so "n < MaxSize+2"
// can only become false if MaxSize+2 fits in 16 bits -- confirm that larger
// MaxSize values are never intended.
384  PionLockFreeQueue(void)
385  {
386  // point head and tail to the node at index 1 (0 is reserved for NULL)
387  m_head_ptr.data.index = m_tail_ptr.data.index = 1;
388  m_head_ptr.data.counter = m_tail_ptr.data.counter = 0;
389  // initialize free_ptr to zero
390  m_free_ptr.value = 0;
391  // initialize free_nodes to zero
392  for (boost::uint16_t n = 0; n < MaxSize; ++n)
393  m_free_nodes[n] = 0;
394  // initialize next values to zero
395  for (boost::uint16_t n = 0; n < MaxSize+2; ++n)
396  m_nodes[n].m_next.value = 0;
397  // push everything but the first two nodes into the available stack
398  for (boost::uint16_t n = 2; n < MaxSize+2; ++n)
399  releaseNode(n);
400  }
401 
// returns true when m_free_ptr's index is zero.
// NOTE(review): acquireNode() treats index 0 as "no free node available", so
// this looks like it reports a *full* queue rather than an empty one -- confirm.
403  inline bool empty(void) const { return m_free_ptr.data.index == 0; }
404 
// returns m_free_ptr's index.
// NOTE(review): that index counts entries on the free stack, which looks like
// the number of unused nodes rather than the number of queued items -- confirm.
406  inline boost::uint16_t size(void) const { return m_free_ptr.data.index; }
407 
// Appends a copy of t to the back of the queue.  Obtains a node through
// acquireNode() (which sleeps while none is free), then links it after the
// current last node with cas(), retrying until the link succeeds.
413  inline void push(const T& t) {
414  // retrieve a new list node for the queue item
415  const boost::uint16_t node_index(acquireNode());
416 
417  // prepare it to be added to the list
418  QueueNode& node_ref = m_nodes[node_index];
419  node_ref.m_data = t;
// only the index half of the link is reset; its counter is left as it was
420  node_ref.m_next.data.index = 0;
421 
422  // append node to the end of the list
423  QueueNodePtr tail_ptr;
424  QueueNodePtr next_ptr;
425  while (true) {
426  tail_ptr.value = m_tail_ptr.value;
427  next_ptr.value = getQueueNode(tail_ptr).m_next.value;
428  // make sure that the tail pointer has not changed since reading next
429  if (tail_ptr.value == m_tail_ptr.value) {
430  // check if tail was pointing to the last node
431  if (next_ptr.data.index == 0) {
432  // try to link the new node at the end of the list
433  if (cas(getQueueNode(tail_ptr).m_next, next_ptr, node_index))
434  break;
435  } else {
436  // try to swing tail to the next node
437  cas(m_tail_ptr, tail_ptr, next_ptr.data.index);
438  }
439  }
440  }
441 
442  // done with enqueue; try to swing tail to the inserted node
443  cas(m_tail_ptr, tail_ptr, node_index);
444  }
445 
// Removes the item at the front of the queue and copies it into t.
// Returns true if an item was retrieved, or false if the queue was empty.
// t may be overwritten by an attempt whose cas() then fails, so its contents
// are only meaningful when true is returned.
453  inline bool pop(T& t) {
454  QueueNodePtr head_ptr;
455  QueueNodePtr tail_ptr;
456  QueueNodePtr next_ptr;
457 
458  while (true) {
459  // read current pointer values
460  head_ptr.value = m_head_ptr.value;
461  tail_ptr.value = m_tail_ptr.value;
462  next_ptr.value = getQueueNode(head_ptr).m_next.value;
463  // check consistency
464  if (head_ptr.value == m_head_ptr.value) {
465  // check if queue is empty, or tail is falling behind
466  if (head_ptr.data.index == tail_ptr.data.index) {
467  // is queue empty?
468  if (next_ptr.data.index == 0)
469  return false;
470  // not empty; try to advance tail to catch it up
471  cas(m_tail_ptr, tail_ptr, next_ptr.data.index);
472  } else {
473  // tail is OK
474  // read value before CAS, otherwise another dequeue might
475  // free the next node
476  t = getQueueNode(next_ptr).m_data;
477  // try to swing head to the next node
478  if (cas(m_head_ptr, head_ptr, next_ptr.data.index))
479  break; // success -> exit loop
480  }
481  }
482  }
483 
484  // item successfully retrieved
// the old head node is no longer reachable from the queue, so recycle its index
// NOTE(review): head_ptr is a plain local here, so the const_cast looks
// superfluous -- confirm before removing.
485  releaseNode(const_cast<boost::uint16_t&>(head_ptr.data.index));
486  return true;
487  }
488 
489 
490 private:
491 
// storage for every queue node; index 0 is reserved for NULL and index 1 is
// the initial head/tail node
493  boost::array<QueueNode, MaxSize+2> m_nodes;
494 
// stack of node indexes that are free for reuse (0 marks an unused slot)
496  boost::array<volatile boost::uint16_t, MaxSize> m_free_nodes;
497 
// refers to the node that precedes the first item in the queue
499  volatile QueueNodePtr m_head_ptr;
500 
// refers to the tail node (push() and pop() advance it when it lags behind)
502  volatile QueueNodePtr m_tail_ptr;
503 
// index of the next unused slot in m_free_nodes, paired with a counter
505  volatile QueueNodePtr m_free_ptr;
506 };
507 #endif
508 
509 } // end namespace pion
510 
511 #endif
data structure used to wrap each item in the queue
bool cas(tagged_ptr const &oldval, T *newptr)
virtual ~PionLockFreeQueue()
virtual destructor
QueueNode(void)
default constructor
PionLockFreeQueue(void)
constructs a new PionLockFreeQueue
bool empty(void) const
returns true if the queue is empty; false if it is not
boost::lockfree::tagged_ptr< QueueNode > next
points to the next node in the queue
QueueNode * createNode(void)
returns a new queue node item for use in the queue
volatile void clear(void)
boost::lockfree::tagged_ptr< QueueNode > QueueNodePtr
data type for an atomic QueueNode pointer
std::size_t size(void) const
returns the number of items that are currently in the queue
the following enables use of the lock-free cache
T data
data wrapped by the node item
void destroyNode(QueueNode *node_ptr)
frees memory for an existing queue node item
QueueNode(const T &d)
constructs QueueNode with a data value