Mixe for Privacy and Anonymity in the Internet
Classes | Public Member Functions | Public Attributes | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
ReaderWriterQueue Class Reference

#include <readerwriterqueue.h>

Collaboration diagram for ReaderWriterQueue:

Classes

struct  Block
 

Public Member Functions

AE_NO_TSAN ReaderWriterQueue (size_t maxSize=15)
 
AE_NO_TSAN ~ReaderWriterQueue ()
 
AE_FORCEINLINE bool enqueue (UINT8 *const &element) AE_NO_TSAN
 
bool try_dequeue (UINT8 *&result) AE_NO_TSAN
 
UINT8 * peek () AE_NO_TSAN
 
bool pop () AE_NO_TSAN
 
size_t size_approx () const AE_NO_TSAN
 

Public Attributes

UINT8 * value_type
 

Private Types

enum  AllocationMode { CanAlloc , CannotAlloc }
 

Private Member Functions

bool inner_enqueue (AllocationMode canAlloc, UINT8 *const &element) AE_NO_TSAN
 

Static Private Member Functions

static AE_FORCEINLINE size_t ceilToPow2 (size_t x)
 
template<typename U >
static AE_FORCEINLINE UINT8 * align_for (UINT8 *ptr) AE_NO_TSAN
 
static Block * make_block (size_t capacity) AE_NO_TSAN
 

Private Attributes

weak_atomic< Block * > frontBlock
 
char cachelineFiller [MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic< Block * >)]
 
weak_atomic< Block * > tailBlock
 
size_t largestBlockSize
 

Detailed Description

Definition at line 52 of file readerwriterqueue.h.

Member Enumeration Documentation

◆ AllocationMode

Enumerator
CanAlloc 
CannotAlloc 

Definition at line 379 of file readerwriterqueue.h.

Constructor & Destructor Documentation

◆ ReaderWriterQueue()

AE_NO_TSAN ReaderWriterQueue::ReaderWriterQueue ( size_t  maxSize = 15)
inline explicit

Definition at line 81 of file readerwriterqueue.h.

82  {
83  assert(maxSize > 0);
84  assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
85  assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
86 
87  Block* firstBlock = nullptr;
88 
89  largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
90  if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
91  // We need a spare block in case the producer is writing to a different block the consumer is reading from, and
92  // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the ambiguity
93  // between front == tail meaning "empty" and "full".
94  // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the
95  // number of blocks - 1. Solving for maxSize and applying a ceiling to the division gives us (after simplifying):
96  size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
98  Block* lastBlock = nullptr;
99  for (size_t i = 0; i != initialBlockCount; ++i) {
100  Block* block = make_block(largestBlockSize);
101  if (block == nullptr) {
102  abort();
103  }
104  if (firstBlock == nullptr) {
105  firstBlock = block;
106  }
107  else {
108  lastBlock->next = block;
109  }
110  lastBlock = block;
111  block->next = firstBlock;
112  }
113  }
114  else {
115  firstBlock = make_block(largestBlockSize);
116  if (firstBlock == nullptr) {
117  abort();
118  }
119  firstBlock->next = firstBlock;
120  }
121  frontBlock = firstBlock;
122  tailBlock = firstBlock;
123 
124  // Make sure the reader/writer threads will have the initialized memory setup above:
125  fence(memory_order_sync);
126  }
@ memory_order_sync
Definition: atomicops.h:91
AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN
Definition: atomicops.h:203
weak_atomic< Block * > tailBlock
static Block * make_block(size_t capacity) AE_NO_TSAN
static AE_FORCEINLINE size_t ceilToPow2(size_t x)
weak_atomic< Block * > frontBlock
#define MAX_BLOCK_SIZE

References ceilToPow2(), fence(), frontBlock, largestBlockSize, make_block(), MAX_BLOCK_SIZE, memory_order_sync, ReaderWriterQueue::Block::next, and tailBlock.

Here is the call graph for this function:

◆ ~ReaderWriterQueue()

AE_NO_TSAN ReaderWriterQueue::~ReaderWriterQueue ( )
inline

Definition at line 132 of file readerwriterqueue.h.

133  {
134  // Make sure we get the latest version of all variables from other CPUs:
135  fence(memory_order_sync);
136 
137  // Destroy any remaining objects in queue and free memory
138  Block* frontBlock_ = frontBlock;
139  Block* block = frontBlock_;
140  do {
141  Block* nextBlock = block->next;
142  UINT8* rawBlock = block->rawThis;
143  block->~Block();
144  std::free(rawBlock);
145  block = nextBlock;
146  } while (block != frontBlock_);
147  }
unsigned char UINT8
Definition: basetypedefs.h:135

References fence(), frontBlock, memory_order_sync, ReaderWriterQueue::Block::next, and ReaderWriterQueue::Block::rawThis.

Here is the call graph for this function:

Member Function Documentation

◆ align_for()

template<typename U >
static AE_FORCEINLINE UINT8* ReaderWriterQueue::align_for ( UINT8 *  ptr)
inline static private

Definition at line 490 of file readerwriterqueue.h.

491  {
492  const std::size_t alignment = std::alignment_of<U>::value;
493  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
494  }

◆ ceilToPow2()

static AE_FORCEINLINE size_t ReaderWriterQueue::ceilToPow2 ( size_t  x)
inline static private

Definition at line 475 of file readerwriterqueue.h.

476  {
477  // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
478  --x;
479  x |= x >> 1;
480  x |= x >> 2;
481  x |= x >> 4;
482  for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
483  x |= x >> (i << 3);
484  }
485  ++x;
486  return x;
487  }

Referenced by ReaderWriterQueue().

Here is the caller graph for this function:

◆ enqueue()

AE_FORCEINLINE bool ReaderWriterQueue::enqueue ( UINT8 *const &  element)
inline

Definition at line 156 of file readerwriterqueue.h.

157  {
158  return inner_enqueue(CanAlloc,element);
159  }
bool inner_enqueue(AllocationMode canAlloc, UINT8 *const &element) AE_NO_TSAN

References CanAlloc, and inner_enqueue().

Referenced by BlockingReaderWriterQueue::enqueue().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ inner_enqueue()

bool ReaderWriterQueue::inner_enqueue ( AllocationMode  canAlloc,
UINT8 *const &  element 
)
inline private

Definition at line 381 of file readerwriterqueue.h.

382  {
383 
384  // High-level pseudocode (assuming we're allowed to alloc a new block):
385  // If room in tail block, add to tail
386  // Else check next block
387  // If next block is not the head block, enqueue on next block
388  // Else create a new block and enqueue there
389  // Advance tail to the block we just enqueued to
390 
391  Block* tailBlock_ = tailBlock.load();
392  size_t blockFront = tailBlock_->localFront;
393  size_t blockTail = tailBlock_->tail.load();
394 
395  size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
396  if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
398  // This block has room for at least one more element
399  tailBlock_->data[blockTail]= element;
400 
401 
403  tailBlock_->tail = nextBlockTail;
404  }
405  else {
407  if (tailBlock_->next.load() != frontBlock) {
408  // Note that the reason we can't advance to the frontBlock and start adding new entries there
409  // is because if we did, then dequeue would stay in that block, eventually reading the new values,
410  // instead of advancing to the next full block (whose values were enqueued first and so should be
411  // consumed first).
412 
413  fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock
414 
415  // tailBlock is full, but there's a free block ahead, use it
416  Block* tailBlockNext = tailBlock_->next.load();
417  size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
418  nextBlockTail = tailBlockNext->tail.load();
420 
421  // This block must be empty since it's not the head block and we
422  // go through the blocks in a circle
423  assert(nextBlockFront == nextBlockTail);
424  tailBlockNext->localFront = nextBlockFront;
425 
426  tailBlockNext->data[nextBlockTail] = element;
427 
428  tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
429 
431  tailBlock = tailBlockNext;
432  }
433  else if (canAlloc == CanAlloc) {
434  // tailBlock is full and there's no free block ahead; create a new block
435  auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
436  auto newBlock = make_block(newBlockSize);
437  if (newBlock == nullptr) {
438  // Could not allocate a block!
439  return false;
440  }
441  largestBlockSize = newBlockSize;
442 
443 
444  newBlock->data[0] = element;
445  assert(newBlock->front == 0);
446  newBlock->tail = newBlock->localTail = 1;
447 
448  newBlock->next = tailBlock_->next.load();
449  tailBlock_->next = newBlock;
450 
451  // Might be possible for the dequeue thread to see the new tailBlock->next
452  // *without* seeing the new tailBlock value, but this is OK since it can't
453  // advance to the next block until tailBlock is set anyway (because the only
454  // case where it could try to read the next is if it's already at the tailBlock,
455  // and it won't advance past tailBlock in any circumstance).
456 
458  tailBlock = newBlock;
459  }
460  else if (canAlloc == CannotAlloc) {
461  // Would have had to allocate a new block to enqueue, but not allowed
462  return false;
463  }
464  else {
465  assert(false && "Should be unreachable code");
466  return false;
467  }
468  }
469 
470  return true;
471  }
@ memory_order_release
Definition: atomicops.h:85
@ memory_order_acquire
Definition: atomicops.h:84

References CanAlloc, CannotAlloc, ReaderWriterQueue::Block::data, fence(), ReaderWriterQueue::Block::front, frontBlock, largestBlockSize, weak_atomic< T >::load(), ReaderWriterQueue::Block::localFront, make_block(), MAX_BLOCK_SIZE, memory_order_acquire, memory_order_release, ReaderWriterQueue::Block::next, ReaderWriterQueue::Block::sizeMask, ReaderWriterQueue::Block::tail, and tailBlock.

Referenced by enqueue().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ make_block()

static Block* ReaderWriterQueue::make_block ( size_t  capacity)
inline static private

Definition at line 532 of file readerwriterqueue.h.

533  {
534  // Allocate enough memory for the block itself, as well as all the elements it will contain
535  UINT size = sizeof(Block) + std::alignment_of<Block>::value - 1;
536  size += sizeof(UINT8*) * capacity + std::alignment_of<UINT8*>::value - 1;
537  UINT8* newBlockRaw = static_cast<UINT8*>(std::malloc(size));
538  if (newBlockRaw == nullptr) {
539  return nullptr;
540  }
541 
542  UINT8* newBlockAligned = align_for<Block>(newBlockRaw);
543  UINT8* newBlockData = align_for<UINT8*>(newBlockAligned + sizeof(Block));
544  return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
545  }
unsigned int UINT
Definition: basetypedefs.h:155

Referenced by inner_enqueue(), and ReaderWriterQueue().

Here is the caller graph for this function:

◆ peek()

UINT8* ReaderWriterQueue::peek ( )
inline

Definition at line 259 of file readerwriterqueue.h.

260  {
261  // See try_dequeue() for reasoning
262 
263  Block* frontBlock_ = frontBlock.load();
264  size_t blockTail = frontBlock_->localTail;
265  size_t blockFront = frontBlock_->front.load();
266 
267  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
269  non_empty_front_block:
270  return reinterpret_cast<UINT8*>(frontBlock_->data + blockFront * sizeof(UINT8*));
271  }
272  else if (frontBlock_ != tailBlock.load()) {
274  frontBlock_ = frontBlock.load();
275  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
276  blockFront = frontBlock_->front.load();
278 
279  if (blockFront != blockTail) {
280  goto non_empty_front_block;
281  }
282 
283  Block* nextBlock = frontBlock_->next;
284 
285  size_t nextBlockFront = nextBlock->front.load();
287 
288  assert(nextBlockFront != nextBlock->tail.load());
289  return reinterpret_cast<UINT8*>(nextBlock->data + nextBlockFront * sizeof(UINT8*));
290  }
291 
292  return nullptr;
293  }

References ReaderWriterQueue::Block::data, fence(), ReaderWriterQueue::Block::front, frontBlock, weak_atomic< T >::load(), ReaderWriterQueue::Block::localTail, memory_order_acquire, ReaderWriterQueue::Block::next, ReaderWriterQueue::Block::tail, and tailBlock.

Referenced by BlockingReaderWriterQueue::peek().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ pop()

bool ReaderWriterQueue::pop ( )
inline

Definition at line 298 of file readerwriterqueue.h.

299  {
300  // See try_dequeue() for reasoning
301 
302  Block* frontBlock_ = frontBlock.load();
303  size_t blockTail = frontBlock_->localTail;
304  size_t blockFront = frontBlock_->front.load();
305 
306  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
308 
309  non_empty_front_block:
310  auto element = reinterpret_cast<UINT8*>(frontBlock_->data + blockFront * sizeof(UINT8));
311  //element->~T();
312 
313  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
314 
316  frontBlock_->front = blockFront;
317  }
318  else if (frontBlock_ != tailBlock.load()) {
320  frontBlock_ = frontBlock.load();
321  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
322  blockFront = frontBlock_->front.load();
324 
325  if (blockFront != blockTail) {
326  goto non_empty_front_block;
327  }
328 
329  // Front block is empty but there's another block ahead, advance to it
330  Block* nextBlock = frontBlock_->next;
331 
332  size_t nextBlockFront = nextBlock->front.load();
333  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
335 
336  assert(nextBlockFront != nextBlockTail);
337  AE_UNUSED(nextBlockTail);
338 
340  frontBlock = frontBlock_ = nextBlock;
341 
343 
344  auto element = reinterpret_cast<UINT8*>(frontBlock_->data + nextBlockFront * sizeof(UINT8*));
345 // element->~T();
346 
347  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
348 
350  frontBlock_->front = nextBlockFront;
351  }
352  else {
353  // No elements in current block and no other block to advance to
354  return false;
355  }
356 
357  return true;
358  }
#define AE_UNUSED(x)
Definition: atomicops.h:43
AE_FORCEINLINE void compiler_fence(memory_order order) AE_NO_TSAN
Definition: atomicops.h:191

References AE_UNUSED, compiler_fence(), ReaderWriterQueue::Block::data, fence(), ReaderWriterQueue::Block::front, frontBlock, weak_atomic< T >::load(), ReaderWriterQueue::Block::localTail, memory_order_acquire, memory_order_release, ReaderWriterQueue::Block::next, ReaderWriterQueue::Block::sizeMask, ReaderWriterQueue::Block::tail, and tailBlock.

Referenced by BlockingReaderWriterQueue::pop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ size_approx()

size_t ReaderWriterQueue::size_approx ( ) const
inline

Definition at line 362 of file readerwriterqueue.h.

363  {
364  size_t result = 0;
365  Block* frontBlock_ = frontBlock.load();
366  Block* block = frontBlock_;
367  do {
369  size_t blockFront = block->front.load();
370  size_t blockTail = block->tail.load();
371  result += (blockTail - blockFront) & block->sizeMask;
372  block = block->next.load();
373  } while (block != frontBlock_);
374  return result;
375  }

References fence(), ReaderWriterQueue::Block::front, frontBlock, weak_atomic< T >::load(), memory_order_acquire, ReaderWriterQueue::Block::next, ReaderWriterQueue::Block::sizeMask, and ReaderWriterQueue::Block::tail.

Here is the call graph for this function:

◆ try_dequeue()

bool ReaderWriterQueue::try_dequeue ( UINT8 *&  result)
inline

Definition at line 165 of file readerwriterqueue.h.

166  {
167 
168  // High-level pseudocode:
169  // Remember where the tail block is
170  // If the front block has an element in it, dequeue it
171  // Else
172  // If front block was the tail block when we entered the function, return false
173  // Else advance to next block and dequeue the item there
174 
175  // Note that we have to use the value of the tail block from before we check if the front
176  // block is full or not, in case the front block is empty and then, before we check if the
177  // tail block is at the front block or not, the producer fills up the front block *and
178  // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
179  // reproducible in practice.
180  // In order to avoid overhead in the common case, though, we do a double-checked pattern
181  // where we have the fast path if the front block is not empty, then read the tail block,
182  // then re-read the front block and check if it's not empty again, then check if the tail
183  // block has advanced.
184 
185  Block* frontBlock_ = frontBlock.load();
186  size_t blockTail = frontBlock_->localTail;
187  size_t blockFront = frontBlock_->front.load();
188 
189  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
191 
192  non_empty_front_block:
193  // Front block not empty, dequeue from here
194  result = frontBlock_->data[blockFront];
195  //element->~T();
196 
197  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
198 
200  frontBlock_->front = blockFront;
201  }
202  else if (frontBlock_ != tailBlock.load()) {
204 
205  frontBlock_ = frontBlock.load();
206  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
207  blockFront = frontBlock_->front.load();
209 
210  if (blockFront != blockTail) {
211  // Oh look, the front block isn't empty after all
212  goto non_empty_front_block;
213  }
214 
215  // Front block is empty but there's another block ahead, advance to it
216  Block* nextBlock = frontBlock_->next;
217  // Don't need an acquire fence here since next can only ever be set on the tailBlock,
218  // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
219  // ensures next is up-to-date on this CPU in case we recently were at tailBlock.
220 
221  size_t nextBlockFront = nextBlock->front.load();
222  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
224 
225  // Since the tailBlock is only ever advanced after being written to,
226  // we know there's for sure an element to dequeue on it
227  assert(nextBlockFront != nextBlockTail);
228  AE_UNUSED(nextBlockTail);
229 
230  // We're done with this block, let the producer use it if it needs
231  fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
232  frontBlock = frontBlock_ = nextBlock;
233 
234  compiler_fence(memory_order_release); // Not strictly needed
235 
236  result = frontBlock_->data [nextBlockFront];
237 
238  //element->~T();
239 
240  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
241 
243  frontBlock_->front = nextBlockFront;
244  }
245  else {
246  // No elements in current block and no other block to advance to
247  return false;
248  }
249 
250  return true;
251  }

References AE_UNUSED, compiler_fence(), ReaderWriterQueue::Block::data, fence(), ReaderWriterQueue::Block::front, frontBlock, weak_atomic< T >::load(), ReaderWriterQueue::Block::localTail, memory_order_acquire, memory_order_release, ReaderWriterQueue::Block::next, ReaderWriterQueue::Block::sizeMask, ReaderWriterQueue::Block::tail, and tailBlock.

Referenced by BlockingReaderWriterQueue::try_dequeue(), BlockingReaderWriterQueue::wait_dequeue(), and BlockingReaderWriterQueue::wait_dequeue_timed().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ cachelineFiller

char ReaderWriterQueue::cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic< Block * >)]
private

Definition at line 550 of file readerwriterqueue.h.

◆ frontBlock

weak_atomic<Block*> ReaderWriterQueue::frontBlock
private

◆ largestBlockSize

size_t ReaderWriterQueue::largestBlockSize
private

Definition at line 553 of file readerwriterqueue.h.

Referenced by inner_enqueue(), and ReaderWriterQueue().

◆ tailBlock

weak_atomic<Block*> ReaderWriterQueue::tailBlock
private

Definition at line 551 of file readerwriterqueue.h.

Referenced by inner_enqueue(), peek(), pop(), ReaderWriterQueue(), and try_dequeue().

◆ value_type

UINT8* ReaderWriterQueue::value_type

Definition at line 75 of file readerwriterqueue.h.


The documentation for this class was generated from the following file: