Mixes for Privacy and Anonymity in the Internet
readerwriterqueue.h
// ©2013-2016 Cameron Desrochers.
// Distributed under the simplified BSD license (see the license file that
// should have come with this header).

#pragma once

#include "atomicops.h"
#include <type_traits>
#include <utility>
#include <cassert>
#include <stdexcept>
#include <new>
#include <cstdint>
#include <cstdlib> // For malloc/free/abort & size_t
#include <memory>
#if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
#include <chrono>
#endif


// A lock-free queue for a single-consumer, single-producer architecture.
// The queue is also wait-free in the common path (except if more memory
// needs to be allocated, in which case malloc is called).
// Allocates memory sparingly (O(lg(n)) times, amortized), and only once if
// the original maximum size estimate is never exceeded.
// Tested on x86/x64 processors, but semantics should be correct for all
// architectures (given the right implementations in atomicops.h), provided
// that aligned integer and pointer accesses are naturally atomic.
// Note that there should only be one consumer thread and one producer thread;
// switching roles of the threads, or using multiple consecutive threads for
// one role, is not safe unless properly synchronized.
// Using the queue exclusively from one thread is fine, though a bit silly.
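//
// Usage sketch (an illustrative aside, not part of the original header);
// assumes one producer thread and one consumer thread sharing the queue,
// where getNextMessage() and process() are hypothetical helpers:
//
//   ReaderWriterQueue q(100);       // room for ~100 elements without reallocation
//
//   // Producer thread:
//   UINT8* msg = getNextMessage();
//   q.enqueue(msg);
//
//   // Consumer thread:
//   UINT8* received;
//   while (q.try_dequeue(received)) {
//       process(received);
//   }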

#ifndef MOODYCAMEL_CACHE_LINE_SIZE
#define MOODYCAMEL_CACHE_LINE_SIZE 64
#endif

#ifndef MOODYCAMEL_HAS_EMPLACE
#if !defined(_MSC_VER) || _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
#define MOODYCAMEL_HAS_EMPLACE 1
#endif
#endif

#ifdef AE_VCPP
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#pragma warning(disable: 4820) // padding was added
#pragma warning(disable: 4127) // conditional expression is constant
#endif

#define MAX_BLOCK_SIZE 512

class ReaderWriterQueue
{
    // Design: Based on a queue-of-queues. The low-level queues are just
    // circular buffers with front and tail indices indicating where the
    // next element to dequeue is and where the next element can be enqueued,
    // respectively. Each low-level queue is called a "block". Each block
    // wastes exactly one element's worth of space to keep the design simple
    // (if front == tail then the queue is empty, and can't be full).
    // The high-level queue is a circular linked list of blocks; again there
    // is a front and tail, but this time they are pointers to the blocks.
    // The front block is where the next element to be dequeued is, provided
    // the block is not empty. The back block is where elements are to be
    // enqueued, provided the block is not full.
    // The producer thread owns all the tail indices/pointers. The consumer
    // thread owns all the front indices/pointers. Both threads read each
    // other's variables, but only the owning thread updates them. E.g., after
    // the consumer reads the producer's tail, the tail may change before the
    // consumer is done dequeuing an object, but the consumer knows the tail
    // will never go backwards, only forwards.
    // If there is no room to enqueue an object, an additional block (of
    // equal size to the last block) is added. Blocks are never removed.
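    //
    // To make the wasted-slot rule concrete (an illustrative aside, not in
    // the original source), for a block of size 8 (sizeMask == 7):
    //   empty:  front == tail
    //   full:   ((tail + 1) & sizeMask) == front   // at most 7 usable slots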

public:

    // Constructs a queue that can hold maxSize elements without further
    // allocations. If more than MAX_BLOCK_SIZE elements are requested,
    // then several blocks of MAX_BLOCK_SIZE each are reserved (including
    // at least one extra buffer block).
    AE_NO_TSAN explicit ReaderWriterQueue(size_t maxSize = 15)
    {
        assert(maxSize > 0);
        assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
        assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");

        Block* firstBlock = nullptr;

        largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
        if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
            // We need a spare block in case the producer is writing to a different block
            // than the one the consumer is reading from, and wants to enqueue the maximum
            // number of elements. We also need a spare element in each block to avoid the
            // ambiguity between front == tail meaning "empty" and "full".
            // So the effective number of slots that are guaranteed to be usable at any time
            // is the block size - 1 times the number of blocks - 1. Solving for maxSize and
            // applying a ceiling to the division gives us (after simplifying):
            size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
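            // Worked example (an illustrative aside, not in the original source):
            // with maxSize = 2000 and MAX_BLOCK_SIZE = 512, this gives
            // (2000 + 1024 - 3) / 511 = 3021 / 511 = 5 blocks, and
            // (512 - 1) * (5 - 1) = 2044 >= 2000 usable slots, as required.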
            largestBlockSize = MAX_BLOCK_SIZE;
            Block* lastBlock = nullptr;
            for (size_t i = 0; i != initialBlockCount; ++i) {
                auto block = make_block(largestBlockSize);
                if (block == nullptr) {
                    abort();
                }
                if (firstBlock == nullptr) {
                    firstBlock = block;
                }
                else {
                    lastBlock->next = block;
                }
                lastBlock = block;
                block->next = firstBlock;
            }
        }
        else {
            firstBlock = make_block(largestBlockSize);
            if (firstBlock == nullptr) {
                abort();
            }
            firstBlock->next = firstBlock;
        }
        frontBlock = firstBlock;
        tailBlock = firstBlock;

        // Make sure the reader/writer threads will have the initialized memory setup above:
        fence(memory_order_sync);
    }


    // Note: The queue should not be accessed concurrently while it's
    // being deleted. It's up to the user to synchronize this.
    AE_NO_TSAN ~ReaderWriterQueue()
    {
        // Make sure we get the latest version of all variables from other CPUs:
        fence(memory_order_sync);

        // Destroy any remaining objects in queue and free memory
        Block* frontBlock_ = frontBlock;
        Block* block = frontBlock_;
        do {
            Block* nextBlock = block->next;
            UINT8* rawBlock = block->rawThis;
            block->~Block();
            std::free(rawBlock);
            block = nextBlock;
        } while (block != frontBlock_);
    }




    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(UINT8* const& element) AE_NO_TSAN
    {
        return inner_enqueue(CanAlloc, element);
    }


    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    bool try_dequeue(UINT8*& result) AE_NO_TSAN
    {

        // High-level pseudocode:
        // Remember where the tail block is
        // If the front block has an element in it, dequeue it
        // Else
        //     If front block was the tail block when we entered the function, return false
        //     Else advance to next block and dequeue the item there

        // Note that we have to use the value of the tail block from before we check if the front
        // block is full or not, in case the front block is empty and then, before we check if the
        // tail block is at the front block or not, the producer fills up the front block *and
        // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
        // reproducible in practice.
        // In order to avoid overhead in the common case, though, we do a double-checked pattern
        // where we have the fast path if the front block is not empty, then read the tail block,
        // then re-read the front block and check if it's not empty again, then check if the tail
        // block has advanced.

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);

        non_empty_front_block:
            // Front block not empty, dequeue from here
            result = frontBlock_->data[blockFront];
            // (Elements are raw UINT8 pointers, so there is no destructor to invoke here.)

            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);

            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                // Oh look, the front block isn't empty after all
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;
            // Don't need an acquire fence here since next can only ever be set on the tailBlock,
            // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
            // ensures next is up-to-date on this CPU in case we recently were at tailBlock.

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            // Since the tailBlock is only ever advanced after being written to,
            // we know there's for sure an element to dequeue on it
            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            // We're done with this block, let the producer use it if it needs
            fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release); // Not strictly needed

            result = frontBlock_->data[nextBlockFront];

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }


    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    UINT8* peek() AE_NO_TSAN
    {
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);
        non_empty_front_block:
            // data is a UINT8** array, so data + blockFront already addresses the right slot
            return reinterpret_cast<UINT8*>(frontBlock_->data + blockFront);
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                goto non_empty_front_block;
            }

            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlock->tail.load());
            return reinterpret_cast<UINT8*>(nextBlock->data + nextBlockFront);
        }

        return nullptr;
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    bool pop() AE_NO_TSAN
    {
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);

        non_empty_front_block:
            // (No per-element destructor to run; elements are raw UINT8 pointers.)
            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            fence(memory_order_release);
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release); // Not strictly needed

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    inline size_t size_approx() const AE_NO_TSAN
    {
        size_t result = 0;
        Block* frontBlock_ = frontBlock.load();
        Block* block = frontBlock_;
        do {
            fence(memory_order_acquire);
            size_t blockFront = block->front.load();
            size_t blockTail = block->tail.load();
            result += (blockTail - blockFront) & block->sizeMask;
            block = block->next.load();
        } while (block != frontBlock_);
        return result;
    }


private:
    enum AllocationMode { CanAlloc, CannotAlloc };

    bool inner_enqueue(AllocationMode canAlloc, UINT8* const& element) AE_NO_TSAN
    {

        // High-level pseudocode (assuming we're allowed to alloc a new block):
        // If room in tail block, add to tail
        // Else check next block
        //     If next block is not the head block, enqueue on next block
        //     Else create a new block and enqueue there
        //     Advance tail to the block we just enqueued to

        Block* tailBlock_ = tailBlock.load();
        size_t blockFront = tailBlock_->localFront;
        size_t blockTail = tailBlock_->tail.load();

        size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
        if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
            fence(memory_order_acquire);
            // This block has room for at least one more element
            tailBlock_->data[blockTail] = element;

            fence(memory_order_release);
            tailBlock_->tail = nextBlockTail;
        }
        else {
            fence(memory_order_acquire);
            if (tailBlock_->next.load() != frontBlock) {
                // Note that the reason we can't advance to the frontBlock and start adding new entries there
                // is because if we did, then dequeue would stay in that block, eventually reading the new values,
                // instead of advancing to the next full block (whose values were enqueued first and so should be
                // consumed first).

                fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock

                // tailBlock is full, but there's a free block ahead, use it
                Block* tailBlockNext = tailBlock_->next.load();
                size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
                nextBlockTail = tailBlockNext->tail.load();
                fence(memory_order_acquire);

                // This block must be empty since it's not the head block and we
                // go through the blocks in a circle
                assert(nextBlockFront == nextBlockTail);
                tailBlockNext->localFront = nextBlockFront;

                tailBlockNext->data[nextBlockTail] = element;

                tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;

                fence(memory_order_release);
                tailBlock = tailBlockNext;
            }
            else if (canAlloc == CanAlloc) {
                // tailBlock is full and there's no free block ahead; create a new block
                auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
                auto newBlock = make_block(newBlockSize);
                if (newBlock == nullptr) {
                    // Could not allocate a block!
                    return false;
                }
                largestBlockSize = newBlockSize;

                newBlock->data[0] = element;
                assert(newBlock->front == 0);
                newBlock->tail = newBlock->localTail = 1;

                newBlock->next = tailBlock_->next.load();
                tailBlock_->next = newBlock;

                // Might be possible for the dequeue thread to see the new tailBlock->next
                // *without* seeing the new tailBlock value, but this is OK since it can't
                // advance to the next block until tailBlock is set anyway (because the only
                // case where it could try to read the next is if it's already at the tailBlock,
                // and it won't advance past tailBlock in any circumstance).

                fence(memory_order_release);
                tailBlock = newBlock;
            }
            else if (canAlloc == CannotAlloc) {
                // Would have had to allocate a new block to enqueue, but not allowed
                return false;
            }
            else {
                assert(false && "Should be unreachable code");
                return false;
            }
        }

        return true;
    }



    AE_FORCEINLINE static size_t ceilToPow2(size_t x)
    {
        // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }
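    // Example (an illustrative aside, not in the original source):
    // ceilToPow2(100) == 128 and ceilToPow2(512) == 512. The shifts smear the
    // highest set bit of x - 1 into all lower positions, turning x - 1 into
    // 2^k - 1, so the final ++x yields the next power of two 2^k.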

    template<typename U>
    static AE_FORCEINLINE UINT8* align_for(UINT8* ptr) AE_NO_TSAN
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }
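    // Example (an illustrative aside, not in the original source): if ptr is
    // 0x1003 and alignof(U) is 8, then (8 - (0x1003 % 8)) % 8 == 5, so the
    // returned pointer is 0x1008, the next 8-byte boundary. Already-aligned
    // pointers are returned unchanged because the outer % alignment maps the
    // offset of a multiple back to 0.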

private:


    struct Block
    {
        // Avoid false-sharing by putting highly contended variables on their own cache lines
        weak_atomic<size_t> front;  // (Atomic) Elements are read from here
        size_t localTail;           // An uncontended shadow copy of tail, owned by the consumer

        char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
        weak_atomic<size_t> tail;   // (Atomic) Elements are enqueued here
        size_t localFront;          // An uncontended shadow copy of front, owned by the producer

        char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is)
        weak_atomic<Block*> next;   // (Atomic)

        UINT8** data;               // Contents (on heap) are aligned to UINT8*'s alignment

        const size_t sizeMask;


        // size must be a power of two (and greater than 0)
        AE_NO_TSAN Block(size_t const& _size, UINT8* _rawThis, UINT8* _data)
            : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data((UINT8**)_data), sizeMask(_size - 1), rawThis(_rawThis)
        {
        }

    private:
        // C4512 - Assignment operator could not be generated
        Block& operator=(Block const&);

    public:
        UINT8* rawThis;
    };


    static Block* make_block(size_t capacity) AE_NO_TSAN
    {
        // Allocate enough memory for the block itself, as well as all the elements it will contain
        size_t size = sizeof(Block) + std::alignment_of<Block>::value - 1;
        size += sizeof(UINT8*) * capacity + std::alignment_of<UINT8*>::value - 1;
        UINT8* newBlockRaw = static_cast<UINT8*>(std::malloc(size));
        if (newBlockRaw == nullptr) {
            return nullptr;
        }

        UINT8* newBlockAligned = align_for<Block>(newBlockRaw);
        UINT8* newBlockData = align_for<UINT8*>(newBlockAligned + sizeof(Block));
        return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
    }
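    // Memory layout produced by make_block (an illustrative aside, not in the
    // original source):
    //
    //   newBlockRaw
    //   | padding | Block header | padding | capacity * sizeof(UINT8*) data |
    //             ^ newBlockAligned        ^ newBlockData
    //
    // rawThis keeps the unaligned pointer so ~ReaderWriterQueue() can pass the
    // exact malloc'd address to std::free().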

private:
    weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block

    char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
    weak_atomic<Block*> tailBlock;  // (Atomic) Elements are enqueued to this block

    size_t largestBlockSize;

};

// Like ReaderWriterQueue, but also provides blocking operations
class BlockingReaderWriterQueue
{

public:
    explicit BlockingReaderWriterQueue(size_t maxSize = 15) AE_NO_TSAN
        : inner(maxSize), sema(new spsc_sema::LightweightSemaphore())
    { }

    BlockingReaderWriterQueue(BlockingReaderWriterQueue&& other) AE_NO_TSAN
        : inner(std::move(other.inner)), sema(std::move(other.sema))
    { }

    BlockingReaderWriterQueue& operator=(BlockingReaderWriterQueue&& other) AE_NO_TSAN
    {
        std::swap(sema, other.sema);
        std::swap(inner, other.inner);
        return *this;
    }

    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(UINT8* const& element) AE_NO_TSAN
    {
        if (inner.enqueue(element)) {
            sema->signal();
            return true;
        }
        return false;
    }



    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    bool try_dequeue(UINT8*& result) AE_NO_TSAN
    {
        if (sema->tryWait()) {
            bool success = inner.try_dequeue(result);
            assert(success);
            AE_UNUSED(success);
            return true;
        }
        return false;
    }


    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available, then dequeues it.
    void wait_dequeue(UINT8*& result) AE_NO_TSAN
    {
        sema->wait();
        bool success = inner.try_dequeue(result);
        AE_UNUSED(result);
        assert(success);
        AE_UNUSED(success);
    }


    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available up to the specified timeout,
    // then dequeues it and returns true, or returns false if the timeout
    // expires before an element can be dequeued.
    // Using a negative timeout indicates an indefinite timeout,
    // and is thus functionally equivalent to calling wait_dequeue.
    bool wait_dequeue_timed(UINT8*& result, std::int64_t timeout_usecs) AE_NO_TSAN
    {
        if (!sema->wait(timeout_usecs)) {
            return false;
        }
        bool success = inner.try_dequeue(result);
        AE_UNUSED(result);
        assert(success);
        AE_UNUSED(success);
        return true;
    }
636 
637 #if __cplusplus > 199711L || _MSC_VER >= 1700
638  // Attempts to dequeue an element; if the queue is empty,
639  // waits until an element is available up to the specified timeout,
640  // then dequeues it and returns true, or returns false if the timeout
641  // expires before an element can be dequeued.
642  // Using a negative timeout indicates an indefinite timeout,
643  // and is thus functionally equivalent to calling wait_dequeue.
644  template<typename Rep, typename Period>
645  inline bool wait_dequeue_timed(UINT8*& result, std::chrono::duration<Rep, Period> const& timeout) AE_NO_TSAN
646  {
647  return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
648  }
649 #endif
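
    // Usage sketch for the timed overload (an illustrative aside, not in the
    // original source); assumes a queue `q` shared with a producer thread:
    //
    //   UINT8* item;
    //   if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5))) {
    //       // item was dequeued within 5 ms
    //   }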


    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    AE_FORCEINLINE UINT8* peek() AE_NO_TSAN
    {
        return inner.peek();
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    AE_FORCEINLINE bool pop() AE_NO_TSAN
    {
        if (sema->tryWait()) {
            bool result = inner.pop();
            assert(result);
            AE_UNUSED(result);
            return true;
        }
        return false;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    AE_FORCEINLINE size_t size_approx() const AE_NO_TSAN
    {
        return sema->availableApprox();
    }



private:
    ReaderWriterQueue inner;
    std::unique_ptr<spsc_sema::LightweightSemaphore> sema;
};
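
// Usage sketch for the blocking queue (an illustrative aside, not in the
// original source): the consumer sleeps on the semaphore instead of spinning
// when the queue is empty. somePacket is a placeholder UINT8* payload.
//
//   BlockingReaderWriterQueue q;
//
//   // Producer thread:
//   q.enqueue(somePacket);        // wakes the consumer via the semaphore
//
//   // Consumer thread:
//   UINT8* packet;
//   q.wait_dequeue(packet);       // blocks until an element is available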


#ifdef AE_VCPP
#pragma warning(pop)
#endif