CARLA
 
ConcurrentQueue.h
Go to the documentation of this file.
1// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
2// An overview, including benchmark results, is provided here:
3// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4// The full design is also described in excruciating detail at:
5// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6
7// Simplified BSD license:
8// Copyright (c) 2013-2016, Cameron Desrochers.
9// All rights reserved.
10//
11// Redistribution and use in source and binary forms, with or without modification,
12// are permitted provided that the following conditions are met:
13//
14// - Redistributions of source code must retain the above copyright notice, this list of
15// conditions and the following disclaimer.
16// - Redistributions in binary form must reproduce the above copyright notice, this list of
17// conditions and the following disclaimer in the documentation and/or other materials
18// provided with the distribution.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
25// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
28// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30// Notice: This file has been slightly adapted for its use by CARLA.
31
32#pragma once
33
34#if defined(__GNUC__)
35// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
37// upon assigning any computed values)
38#pragma GCC diagnostic push
39#pragma GCC diagnostic ignored "-Wconversion"
40
41#ifdef MCDBGQ_USE_RELACY
42#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43#endif
44#endif
45
46#if defined(__APPLE__)
47#include "TargetConditionals.h"
48#endif
49
50#ifdef MCDBGQ_USE_RELACY
51#include "relacy/relacy_std.hpp"
52#include "relacy_shims.h"
53// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
54// We'll override the default trait malloc ourselves without a macro.
55#undef new
56#undef delete
57#undef malloc
58#undef free
59#else
60#include <atomic> // Requires C++11. Sorry VS2010.
61#include <cassert>
62#endif
63#include <cstddef> // for max_align_t
64#include <cstdint>
65#include <cstdlib>
66#include <type_traits>
67#include <algorithm>
68#include <utility>
69#include <limits>
70#include <climits> // for CHAR_BIT
71#include <array>
72#include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
73
74// Platform-specific definitions of a numeric thread ID type and an invalid value
75namespace moodycamel { namespace details {
76 template<typename thread_id_t> struct thread_id_converter {
77 typedef thread_id_t thread_id_numeric_size_t;
78 typedef thread_id_t thread_id_hash_t;
79 static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
80 };
81} }
82#if defined(MCDBGQ_USE_RELACY)
83namespace moodycamel { namespace details {
84 typedef std::uint32_t thread_id_t;
85 static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
86 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
87 static inline thread_id_t thread_id() { return rl::thread_index(); }
88} }
89#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
90// No sense pulling in windows.h in a header, we'll manually declare the function
91// we use and rely on backwards-compatibility for this not to break
92extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
93namespace moodycamel { namespace details {
94 static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
95 typedef std::uint32_t thread_id_t;
96 static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
97 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
98 static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
99} }
100#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
101namespace moodycamel { namespace details {
102 static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
103
104 typedef std::thread::id thread_id_t;
105 static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
106
107 // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
108 // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
109 // be.
110 static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
111
112 template<std::size_t> struct thread_id_size { };
113 template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
114 template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
115
116 template<> struct thread_id_converter<thread_id_t> {
117 typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
118#ifndef __APPLE__
119 typedef std::size_t thread_id_hash_t;
120#else
121 typedef thread_id_numeric_size_t thread_id_hash_t;
122#endif
123
124 static thread_id_hash_t prehash(thread_id_t const& x)
125 {
126#ifndef __APPLE__
127 return std::hash<std::thread::id>()(x);
128#else
129 return *reinterpret_cast<thread_id_hash_t const*>(&x);
130#endif
131 }
132 };
133} }
134#else
135// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
136// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
137// static variable's address as a thread identifier :-)
138#if defined(__GNUC__) || defined(__INTEL_COMPILER)
139#define MOODYCAMEL_THREADLOCAL __thread
140#elif defined(_MSC_VER)
141#define MOODYCAMEL_THREADLOCAL __declspec(thread)
142#else
143// Assume C++11 compliant compiler
144#define MOODYCAMEL_THREADLOCAL thread_local
145#endif
146namespace moodycamel { namespace details {
147 typedef std::uintptr_t thread_id_t;
148 static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
149 static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
150 static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
151} }
152#endif
153
154// Exceptions
155#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
156#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
157#define MOODYCAMEL_EXCEPTIONS_ENABLED
158#endif
159#endif
160
161// ~~~ @begin Modified for CARLA ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
162
163#include <carla/Exception.h>
164
165#if (defined(LIBCARLA_NO_EXCEPTIONS) && defined(MOODYCAMEL_EXCEPTIONS_ENABLED))
166# undef MOODYCAMEL_EXCEPTIONS_ENABLED
167#endif
168
169#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
170#define MOODYCAMEL_TRY try
171#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
172#define MOODYCAMEL_RETHROW throw
173#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
174#else
175#define MOODYCAMEL_TRY if (true)
176#define MOODYCAMEL_CATCH(...) else if (false)
177#define MOODYCAMEL_RETHROW
178#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
179#endif
180
181// ~~~ @end Modified for CARLA ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
182
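// Illustrative sketch of how the macros above are meant to be used: with
// exceptions enabled this expands to an ordinary try/catch, and with
// LIBCARLA_NO_EXCEPTIONS it collapses into plain branches while
// MOODYCAMEL_THROW defers to ::carla::throw_exception. The placement-new of
// `element` into `slot` is a hypothetical stand-in for the queue's internal
// construction step:
//
//   MOODYCAMEL_TRY {
//       new (slot) T(std::forward<U>(element));  // may throw
//   }
//   MOODYCAMEL_CATCH (...) {
//       MOODYCAMEL_RETHROW;
//   }
//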
183#ifndef MOODYCAMEL_NOEXCEPT
184#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
185#define MOODYCAMEL_NOEXCEPT
186#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
187#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
188#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
189// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
190// We have to assume *all* non-trivial constructors may throw on VS2012!
191#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
192#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
193#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
194#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
195#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
196#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
197#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
198#else
199#define MOODYCAMEL_NOEXCEPT noexcept
200#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
201#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
202#endif
203#endif
204
205#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
206#ifdef MCDBGQ_USE_RELACY
207#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
208#else
209// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
210// g++ <=4.7 doesn't support thread_local either.
211// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
212#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
213// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
214//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
215#endif
216#endif
217#endif
218
219// VS2012 doesn't support deleted functions.
220// In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
221#ifndef MOODYCAMEL_DELETE_FUNCTION
222#if defined(_MSC_VER) && _MSC_VER < 1800
223#define MOODYCAMEL_DELETE_FUNCTION
224#else
225#define MOODYCAMEL_DELETE_FUNCTION = delete
226#endif
227#endif
228
229// Compiler-specific likely/unlikely hints
230namespace moodycamel { namespace details {
231#if defined(__GNUC__)
232 static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
233 static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
234#else
235 static inline bool (likely)(bool x) { return x; }
236 static inline bool (unlikely)(bool x) { return x; }
237#endif
238} }
239
240#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
241#include "internal/concurrentqueue_internal_debug.h"
242#endif
243
244namespace moodycamel {
245namespace details {
246 template<typename T>
247 struct const_numeric_max {
248 static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
249 static const T value = std::numeric_limits<T>::is_signed
250 ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
251 : static_cast<T>(-1);
252 };
253
254#if defined(__GLIBCXX__)
255 typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
256#else
257 typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
258#endif
259
260 // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
261 // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
262 typedef union {
263 std_max_align_t x;
264 long long y;
265 void* z;
266 } max_align_t;
267}
268
269// Default traits for the ConcurrentQueue. To change some of the
270// traits without re-implementing all of them, inherit from this
271// struct and shadow the declarations you wish to be different;
272// since the traits are used as a template type parameter, the
273// shadowed declarations will be used where defined, and the defaults
274// otherwise.
275struct ConcurrentQueueDefaultTraits
276{
277 // General-purpose size type. std::size_t is strongly recommended.
278 typedef std::size_t size_t;
279
280 // The type used for the enqueue and dequeue indices. Must be at least as
281 // large as size_t. Should be significantly larger than the number of elements
282 // you expect to hold at once, especially if you have a high turnover rate;
283 // for example, on 32-bit x86, if you expect to have over a hundred million
284 // elements or pump several million elements through your queue in a very
285 // short space of time, using a 32-bit type *may* trigger a race condition.
286 // A 64-bit int type is recommended in that case, and in practice will
287 // prevent a race condition no matter the usage of the queue. Note that
288 // whether the queue is lock-free with a 64-bit type depends on whether
289 // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
290 typedef std::size_t index_t;
291
292 // Internally, all elements are enqueued and dequeued from multi-element
293 // blocks; this is the smallest controllable unit. If you expect few elements
294 // but many producers, a smaller block size should be favoured. For few producers
295 // and/or many elements, a larger block size is preferred. A sane default
296 // is provided. Must be a power of 2.
297 static const size_t BLOCK_SIZE = 32;
298
299 // For explicit producers (i.e. when using a producer token), the block is
300 // checked for being empty by iterating through a list of flags, one per element.
301 // For large block sizes, this is too inefficient, and switching to an atomic
302 // counter-based approach is faster. The switch is made for block sizes strictly
303 // larger than this threshold.
304 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
305
306 // How many full blocks can be expected for a single explicit producer? This should
307 // reflect that number's maximum for optimal performance. Must be a power of 2.
308 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
309
310 // How many full blocks can be expected for a single implicit producer? This should
311 // reflect that number's maximum for optimal performance. Must be a power of 2.
312 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
313
314 // The initial size of the hash table mapping thread IDs to implicit producers.
315 // Note that the hash is resized every time it becomes half full.
316 // Must be a power of two, and either 0 or at least 1. If 0, implicit production
317 // (using the enqueue methods without an explicit producer token) is disabled.
318 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
319
320 // Controls the number of items that an explicit consumer (i.e. one with a token)
321 // must consume before it causes all consumers to rotate and move on to the next
322 // internal queue.
323 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
324
325 // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
326 // Enqueue operations that would cause this limit to be surpassed will fail. Note
327 // that this limit is enforced at the block level (for performance reasons), i.e.
328 // it's rounded up to the nearest block size.
329 static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
330
331
332#ifndef MCDBGQ_USE_RELACY
333 // Memory allocation can be customized if needed.
334 // malloc should return nullptr on failure, and handle alignment like std::malloc.
335#if defined(malloc) || defined(free)
336 // Gah, this is 2015, stop defining macros that break standard code already!
337 // Work around malloc/free being special macros:
338 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
339 static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
340 static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
341 static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
342#else
343 static inline void* malloc(size_t size) { return std::malloc(size); }
344 static inline void free(void* ptr) { return std::free(ptr); }
345#endif
346#else
347 // Debug versions when running under the Relacy race detector (ignore
348 // these in user code)
349 static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
350 static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
351#endif
352};
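
// Illustrative sketch of the customization pattern described above: inherit
// from ConcurrentQueueDefaultTraits, shadow only the members you want to
// change, and pass the struct as the second template argument. The values
// below are arbitrary examples:
//
//   struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
//   {
//       static const size_t BLOCK_SIZE = 256;  // larger blocks for high turnover
//       typedef std::uint64_t index_t;         // 64-bit indices, see note above
//   };
//
//   moodycamel::ConcurrentQueue<int, MyTraits> queue;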
353
354
355// When producing or consuming many elements, the most efficient way is to:
356// 1) Use one of the bulk-operation methods of the queue with a token
357// 2) Failing that, use the bulk-operation methods without a token
358// 3) Failing that, create a token and use that with the single-item methods
359// 4) Failing that, use the single-parameter methods of the queue
360// Having said that, don't create tokens willy-nilly -- ideally there should be
361// a maximum of one token per thread (of each kind).
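//
// Illustrative sketch of recommendation (1) above -- bulk operations with one
// long-lived token per thread (assumes the token constructors declared below):
//
//   moodycamel::ConcurrentQueue<int> q;
//
//   // Producer thread: keep one ProducerToken alive for the thread's lifetime
//   moodycamel::ProducerToken ptok(q);
//   int in[64] = {0};
//   q.enqueue_bulk(ptok, in, 64);
//
//   // Consumer thread: likewise one ConsumerToken per thread
//   moodycamel::ConsumerToken ctok(q);
//   int out[64];
//   std::size_t n = q.try_dequeue_bulk(ctok, out, 64);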
362struct ProducerToken;
363struct ConsumerToken;
364
365template<typename T, typename Traits> class ConcurrentQueue;
366template<typename T, typename Traits> class BlockingConcurrentQueue;
367class ConcurrentQueueTests;
368
369
370namespace details
371{
383
384 template<bool use32> struct _hash_32_or_64 {
385 static inline std::uint32_t hash(std::uint32_t h)
386 {
387 // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
388 // Since the thread ID is already unique, all we really want to do is propagate that
389 // uniqueness evenly across all the bits, so that we can use a subset of the bits while
390 // reducing collisions significantly
391 h ^= h >> 16;
392 h *= 0x85ebca6b;
393 h ^= h >> 13;
394 h *= 0xc2b2ae35;
395 return h ^ (h >> 16);
396 }
397 };
398 template<> struct _hash_32_or_64<1> {
399 static inline std::uint64_t hash(std::uint64_t h)
400 {
401 h ^= h >> 33;
402 h *= 0xff51afd7ed558ccd;
403 h ^= h >> 33;
404 h *= 0xc4ceb9fe1a85ec53;
405 return h ^ (h >> 33);
406 }
407 };
408 template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
409
410 static inline size_t hash_thread_id(thread_id_t id)
411 {
412 static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
413 return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
414 thread_id_converter<thread_id_t>::prehash(id)));
415 }
416
417 template<typename T>
418 static inline bool circular_less_than(T a, T b)
419 {
420#ifdef _MSC_VER
421#pragma warning(push)
422#pragma warning(disable: 4554)
423#endif
424 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
425 return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
426#ifdef _MSC_VER
427#pragma warning(pop)
428#endif
429 }
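 // Worked example: with T = std::uint8_t, circular_less_than<std::uint8_t>(250, 3)
 // is true, since static_cast<std::uint8_t>(250 - 3) == 247 > 128 -- i.e. index 3
 // is "ahead" of index 250 once the 8-bit counter wraps around. Conversely,
 // circular_less_than<std::uint8_t>(3, 250) is false because 3 - 250 wraps to 9.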
430
431 template<typename U>
432 static inline char* align_for(char* ptr)
433 {
434 const std::size_t alignment = std::alignment_of<U>::value;
435 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
436 }
437
438 template<typename T>
439 static inline T ceil_to_pow_2(T x)
440 {
441 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
442
443 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
444 --x;
445 x |= x >> 1;
446 x |= x >> 2;
447 x |= x >> 4;
448 for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
449 x |= x >> (i << 3);
450 }
451 ++x;
452 return x;
453 }
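 // Worked example: ceil_to_pow_2<std::uint32_t>(5) == 8 and
 // ceil_to_pow_2<std::uint32_t>(16) == 16; the shift-or cascade smears the
 // highest set bit into every lower bit before the final increment.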
454
455 template<typename T>
456 static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
457 {
458 T temp = std::move(left.load(std::memory_order_relaxed));
459 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
460 right.store(std::move(temp), std::memory_order_relaxed);
461 }
462
463 template<typename T>
464 static inline T const& nomove(T const& x)
465 {
466 return x;
467 }
468
469 template<bool Enable>
470 struct nomove_if
471 {
472 template<typename T>
473 static inline T const& eval(T const& x)
474 {
475 return x;
476 }
477 };
478
479 template<>
480 struct nomove_if<false>
481 {
482 template<typename U>
483 static inline auto eval(U&& x)
484 -> decltype(std::forward<U>(x))
485 {
486 return std::forward<U>(x);
487 }
488 };
489
490 template<typename It>
491 static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
492 {
493 return *it;
494 }
495
496#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
497 template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
498#else
499 template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
500#endif
501
502#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
503#ifdef MCDBGQ_USE_RELACY
504 typedef RelacyThreadExitListener ThreadExitListener;
505 typedef RelacyThreadExitNotifier ThreadExitNotifier;
506#else
507 struct ThreadExitListener
508 {
509 typedef void (*callback_t)(void*);
510 callback_t callback;
511 void* userData;
512
513 ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
514 };
515
516
517 class ThreadExitNotifier
518 {
519 public:
520 static void subscribe(ThreadExitListener* listener)
521 {
522 auto& tlsInst = instance();
523 listener->next = tlsInst.tail;
524 tlsInst.tail = listener;
525 }
526
527 static void unsubscribe(ThreadExitListener* listener)
528 {
529 auto& tlsInst = instance();
530 ThreadExitListener** prev = &tlsInst.tail;
531 for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
532 if (ptr == listener) {
533 *prev = ptr->next;
534 break;
535 }
536 prev = &ptr->next;
537 }
538 }
539
540 private:
541 ThreadExitNotifier() : tail(nullptr) { }
542 ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
543 ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
544
545 ~ThreadExitNotifier()
546 {
547 // This thread is about to exit, let everyone know!
548 assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
549 for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
550 ptr->callback(ptr->userData);
551 }
552 }
553
554 // Thread-local
555 static inline ThreadExitNotifier& instance()
556 {
557 static thread_local ThreadExitNotifier notifier;
558 return notifier;
559 }
560
561 private:
562 ThreadExitListener* tail;
563 };
564#endif
565#endif
566
567 template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
568 template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
569 template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
570 template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
571 template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
572 template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
573 template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
574 template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
575 template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
576}
577
578
579struct ProducerToken
580{
581 template<typename T, typename Traits>
582 explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
583
584 template<typename T, typename Traits>
585 explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
586
587 ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
588 : producer(other.producer)
589 {
590 other.producer = nullptr;
591 if (producer != nullptr) {
592 producer->token = this;
593 }
594 }
595
596 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
597 {
598 swap(other);
599 return *this;
600 }
601
602 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
603 {
604 std::swap(producer, other.producer);
605 if (producer != nullptr) {
606 producer->token = this;
607 }
608 if (other.producer != nullptr) {
609 other.producer->token = &other;
610 }
611 }
612
613 // A token is always valid unless:
614 // 1) Memory allocation failed during construction
615 // 2) It was moved via the move constructor
616 // (Note: assignment does a swap, leaving both potentially valid)
617 // 3) The associated queue was destroyed
618 // Note that if valid() returns true, that only indicates
619 // that the token is valid for use with a specific queue,
620 // but not which one; that's up to the user to track.
621 inline bool valid() const { return producer != nullptr; }
622
622
623 ~ProducerToken()
624 {
625 if (producer != nullptr) {
626 producer->token = nullptr;
627 producer->inactive.store(true, std::memory_order_release);
628 }
629 }
630
631 // Disable copying and assignment
632 ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
633 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
634
635private:
636 template<typename T, typename Traits> friend class ConcurrentQueue;
638
639protected:
641};
642
643
644struct ConsumerToken
645{
646 template<typename T, typename Traits>
647 explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
648
649 template<typename T, typename Traits>
650 explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
651
652 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
653 : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
654 {
655 }
656
657 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
658 {
659 swap(other);
660 return *this;
661 }
662
663 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
664 {
665 std::swap(initialOffset, other.initialOffset);
666 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
667 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
668 std::swap(currentProducer, other.currentProducer);
669 std::swap(desiredProducer, other.desiredProducer);
670 }
671
672 // Disable copying and assignment
673 ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
674 ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
675
676private:
677 template<typename T, typename Traits> friend class ConcurrentQueue;
679
680private: // but shared with ConcurrentQueue
681 std::uint32_t initialOffset;
682 std::uint32_t lastKnownGlobalOffset;
683 std::uint32_t itemsConsumedFromCurrent;
684 details::ConcurrentQueueProducerTypelessBase* currentProducer;
685 details::ConcurrentQueueProducerTypelessBase* desiredProducer;
686};
687
688// Need to forward-declare this swap because it's in a namespace.
689// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
690template<typename T, typename Traits>
692
693
694template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
695class ConcurrentQueue
696{
697public:
698 typedef ::moodycamel::ProducerToken producer_token_t;
699 typedef ::moodycamel::ConsumerToken consumer_token_t;
700
701 typedef typename Traits::index_t index_t;
702 typedef typename Traits::size_t size_t;
703
704 static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
705 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
706 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
707 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
708 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
709 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
710#ifdef _MSC_VER
711#pragma warning(push)
712#pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
713#pragma warning(disable: 4309) // static_cast: Truncation of constant value
714#endif
715 static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
716#ifdef _MSC_VER
717#pragma warning(pop)
718#endif
719
720 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
721 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
722 static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
723 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
724 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
725 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
726 static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
727 static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
728 static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
729
730public:
731 // Creates a queue with at least `capacity` element slots; note that the
732 // actual number of elements that can be inserted without additional memory
733 // allocation depends on the number of producers and the block size (e.g. if
734 // the block size is equal to `capacity`, only a single block will be allocated
735 // up-front, which means only a single producer will be able to enqueue elements
736 // without an extra allocation -- blocks aren't shared between producers).
737 // This method is not thread safe -- it is up to the user to ensure that the
738 // queue is fully constructed before it starts being used by other threads (this
739 // includes making the memory effects of construction visible, possibly with a
740 // memory barrier).
741 explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
742 : producerListTail(nullptr),
743 producerCount(0),
747 {
748 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
750 populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
751
752#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
753 // Track all the producers using a fully-resolved typed list for
754 // each kind; this makes it possible to debug them starting from
755 // the root queue object (otherwise wacky casts are needed that
756 // don't compile in the debugger's expression evaluator).
757 explicitProducers.store(nullptr, std::memory_order_relaxed);
758 implicitProducers.store(nullptr, std::memory_order_relaxed);
759#endif
760 }
761
762 // Computes the correct amount of pre-allocated blocks for you based
763 // on the minimum number of elements you want available at any given
764 // time, and the maximum concurrent number of each type of producer.
765 ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
766 : producerListTail(nullptr),
767 producerCount(0),
771 {
772 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
774 size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
776
777#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
778 explicitProducers.store(nullptr, std::memory_order_relaxed);
779 implicitProducers.store(nullptr, std::memory_order_relaxed);
780#endif
781 }
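
 // Illustrative sketch of the two constructors above; the figures are
 // arbitrary examples:
 //
 //   // At least 1024 element slots; block count derived from BLOCK_SIZE:
 //   moodycamel::ConcurrentQueue<int> q1(1024);
 //
 //   // At least 1024 slots, pre-sized for up to 4 explicit (token-holding)
 //   // producers and 8 implicit (token-less) producer threads:
 //   moodycamel::ConcurrentQueue<int> q2(1024, 4, 8);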
782
783 // Note: The queue should not be accessed concurrently while it's
784 // being deleted. It's up to the user to synchronize this.
785 // This method is not thread safe.
786 ~ConcurrentQueue()
787 {
788 // Destroy producers
789 auto ptr = producerListTail.load(std::memory_order_relaxed);
790 while (ptr != nullptr) {
791 auto next = ptr->next_prod();
792 if (ptr->token != nullptr) {
793 ptr->token->producer = nullptr;
794 }
795 destroy(ptr);
796 ptr = next;
797 }
798
799 // Destroy implicit producer hash tables
801 auto hash = implicitProducerHash.load(std::memory_order_relaxed);
802 while (hash != nullptr) {
803 auto prev = hash->prev;
804 if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
805 for (size_t i = 0; i != hash->capacity; ++i) {
806 hash->entries[i].~ImplicitProducerKVP();
807 }
808 hash->~ImplicitProducerHash();
809 (Traits::free)(hash);
810 }
811 hash = prev;
812 }
813 }
814
815 // Destroy global free list
816 auto block = freeList.head_unsafe();
817 while (block != nullptr) {
818 auto next = block->freeListNext.load(std::memory_order_relaxed);
819 if (block->dynamicallyAllocated) {
820 destroy(block);
821 }
822 block = next;
823 }
824
825 // Destroy initial free list
827 }
828
829 // Disable copying and copy assignment
832
833 // Moving is supported, but note that it is *not* a thread-safe operation.
834 // Nobody can use the queue while it's being moved, and the memory effects
835 // of that move must be propagated to other threads before they can use it.
836 // Note: When a queue is moved, its tokens are still valid but can only be
837 // used with the destination queue (i.e. semantically they are moved along
838 // with the queue itself).
840 : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
841 producerCount(other.producerCount.load(std::memory_order_relaxed)),
842 initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
843 initialBlockPool(other.initialBlockPool),
844 initialBlockPoolSize(other.initialBlockPoolSize),
845 freeList(std::move(other.freeList)),
846 nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
847 globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
848 {
849 // Move the other one into this, and leave the other one as an empty queue
850 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
853
854 other.producerListTail.store(nullptr, std::memory_order_relaxed);
855 other.producerCount.store(0, std::memory_order_relaxed);
856 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
857 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
858
859#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
860 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
861 other.explicitProducers.store(nullptr, std::memory_order_relaxed);
862 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
863 other.implicitProducers.store(nullptr, std::memory_order_relaxed);
864#endif
865
866 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
867 other.initialBlockPoolSize = 0;
868 other.initialBlockPool = nullptr;
869
871 }
872
874 {
875 return swap_internal(other);
876 }
877
878 // Swaps this queue's state with the other's. Not thread-safe.
879 // Swapping two queues does not invalidate their tokens, however
880 // the tokens that were created for one queue must be used with
881 // only the swapped queue (i.e. the tokens are tied to the
882 // queue's movable state, not the object itself).
884 {
885 swap_internal(other);
886 }
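
 // Illustrative sketch of the move/swap semantics described above. Neither
 // operation is thread-safe, and tokens follow the queue *state* they were
 // created against:
 //
 //   moodycamel::ConcurrentQueue<int> a, b;
 //   moodycamel::ProducerToken tok(a);
 //   b = std::move(a);   // tok must now be used with b only
 //   a.swap(b);          // ...and after the swap, with a again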
887
888private:
890 {
891 if (this == &other) {
892 return *this;
893 }
894
898 std::swap(initialBlockPool, other.initialBlockPool);
900 freeList.swap(other.freeList);
903
905
907 other.reown_producers();
908
909#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
910 details::swap_relaxed(explicitProducers, other.explicitProducers);
911 details::swap_relaxed(implicitProducers, other.implicitProducers);
912#endif
913
914 return *this;
915 }
916
917public:
918 // Enqueues a single item (by copying it).
919 // Allocates memory if required. Only fails if memory allocation fails (or implicit
920 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
921 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
922 // Thread-safe.
923 inline bool enqueue(T const& item)
924 {
925 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
926 return inner_enqueue<CanAlloc>(item);
927 }
928
929 // Enqueues a single item (by moving it, if possible).
930 // Allocates memory if required. Only fails if memory allocation fails (or implicit
931 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
932 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
933 // Thread-safe.
934 inline bool enqueue(T&& item)
935 {
936 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
937 return inner_enqueue<CanAlloc>(std::move(item));
938 }
939
940 // Enqueues a single item (by copying it) using an explicit producer token.
941 // Allocates memory if required. Only fails if memory allocation fails (or
942 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
943 // Thread-safe.
944 inline bool enqueue(producer_token_t const& token, T const& item)
945 {
946 return inner_enqueue<CanAlloc>(token, item);
947 }
948
949 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
950 // Allocates memory if required. Only fails if memory allocation fails (or
951 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
952 // Thread-safe.
953 inline bool enqueue(producer_token_t const& token, T&& item)
954 {
955 return inner_enqueue<CanAlloc>(token, std::move(item));
956 }
957
958 // Enqueues several items.
959 // Allocates memory if required. Only fails if memory allocation fails (or
960 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
961 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
962 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
963 // Thread-safe.
964 template<typename It>
965 bool enqueue_bulk(It itemFirst, size_t count)
966 {
967 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
968 return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
969 }
970
971 // Enqueues several items using an explicit producer token.
972 // Allocates memory if required. Only fails if memory allocation fails
973 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
974 // Note: Use std::make_move_iterator if the elements should be moved
975 // instead of copied.
976 // Thread-safe.
977 template<typename It>
978 bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
979 {
980 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
981 }
982
983 // Enqueues a single item (by copying it).
984 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
985 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
986 // is 0).
987 // Thread-safe.
988 inline bool try_enqueue(T const& item)
989 {
990 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
991 return inner_enqueue<CannotAlloc>(item);
992 }
993
994 // Enqueues a single item (by moving it, if possible).
995 // Does not allocate memory (except for one-time implicit producer).
996 // Fails if not enough room to enqueue (or implicit production is
997 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
998 // Thread-safe.
999 inline bool try_enqueue(T&& item)
1000 {
1001 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1002 return inner_enqueue<CannotAlloc>(std::move(item));
1003 }
1004
1005 // Enqueues a single item (by copying it) using an explicit producer token.
1006 // Does not allocate memory. Fails if not enough room to enqueue.
1007 // Thread-safe.
1008 inline bool try_enqueue(producer_token_t const& token, T const& item)
1009 {
1010 return inner_enqueue<CannotAlloc>(token, item);
1011 }
1012
1013 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1014 // Does not allocate memory. Fails if not enough room to enqueue.
1015 // Thread-safe.
1016 inline bool try_enqueue(producer_token_t const& token, T&& item)
1017 {
1018 return inner_enqueue<CannotAlloc>(token, std::move(item));
1019 }
1020
1021 // Enqueues several items.
1022 // Does not allocate memory (except for one-time implicit producer).
1023 // Fails if not enough room to enqueue (or implicit production is
1024 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1025 // Note: Use std::make_move_iterator if the elements should be moved
1026 // instead of copied.
1027 // Thread-safe.
1028 template<typename It>
1029 bool try_enqueue_bulk(It itemFirst, size_t count)
1030 {
1031 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1032 return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1033 }
1034
1035 // Enqueues several items using an explicit producer token.
1036 // Does not allocate memory. Fails if not enough room to enqueue.
1037 // Note: Use std::make_move_iterator if the elements should be moved
1038 // instead of copied.
1039 // Thread-safe.
1040 template<typename It>
1041 bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1042 {
1043 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1044 }
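
 // Illustrative sketch of the enqueue family above:
 //
 //   moodycamel::ConcurrentQueue<std::string> q;
 //
 //   q.enqueue("copied");                  // copies; may allocate
 //   q.enqueue(std::string("moved"));      // rvalue overload; may allocate
 //   bool ok = q.try_enqueue("no-alloc");  // fails rather than allocate a new block
 //
 //   std::vector<std::string> batch(10, "x");
 //   q.enqueue_bulk(std::make_move_iterator(batch.begin()), batch.size());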
1045
1046
1047
1048 // Attempts to dequeue from the queue.
1049 // Returns false if all producer streams appeared empty at the time they
1050 // were checked (so, the queue is likely but not guaranteed to be empty).
1051 // Never allocates. Thread-safe.
1052 template<typename U>
1053 bool try_dequeue(U& item)
1054 {
1055 // Instead of simply trying each producer in turn (which could cause needless contention on the first
1056 // producer), we score them heuristically.
1057 size_t nonEmptyCount = 0;
1058 ProducerBase* best = nullptr;
1059 size_t bestSize = 0;
1060 for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1061 auto size = ptr->size_approx();
1062 if (size > 0) {
1063 if (size > bestSize) {
1064 bestSize = size;
1065 best = ptr;
1066 }
1067 ++nonEmptyCount;
1068 }
1069 }
1070
1071 // If there was at least one non-empty queue but it appears empty at the time
1072 // we try to dequeue from it, we need to make sure every queue's been tried
1073 if (nonEmptyCount > 0) {
1074 if ((details::likely)(best->dequeue(item))) {
1075 return true;
1076 }
1077 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1078 if (ptr != best && ptr->dequeue(item)) {
1079 return true;
1080 }
1081 }
1082 }
1083 return false;
1084 }
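
 // Illustrative sketch: a typical drain loop. A false return only means every
 // producer stream *looked* empty when it was checked; `process` is a
 // hypothetical consumer function:
 //
 //   int item;
 //   while (q.try_dequeue(item)) {
 //       process(item);
 //   }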
1085
1086 // Attempts to dequeue from the queue.
1087 // Returns false if all producer streams appeared empty at the time they
1088 // were checked (so, the queue is likely but not guaranteed to be empty).
1089 // This differs from the try_dequeue(item) method in that this one does
1090 // not attempt to reduce contention by interleaving the order that producer
1091 // streams are dequeued from. So, using this method can reduce overall throughput
1092 // under contention, but will give more predictable results in single-threaded
1093 // consumer scenarios. This is mostly only useful for internal unit tests.
1094 // Never allocates. Thread-safe.
1095 template<typename U>
1096 bool try_dequeue_non_interleaved(U& item)
1097 {
1098 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1099 if (ptr->dequeue(item)) {
1100 return true;
1101 }
1102 }
1103 return false;
1104 }
1105
1106 // Attempts to dequeue from the queue using an explicit consumer token.
1107 // Returns false if all producer streams appeared empty at the time they
1108 // were checked (so, the queue is likely but not guaranteed to be empty).
1109 // Never allocates. Thread-safe.
1110 template<typename U>
1111 bool try_dequeue(consumer_token_t& token, U& item)
1112 {
1113 // The idea is roughly as follows:
1114 // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1115 // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1116 // If there's no items where you're supposed to be, keep moving until you find a producer with some items
1117 // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
1118
1119 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1121 return false;
1122 }
1123 }
1124
1125 // If there was at least one non-empty queue but it appears empty at the time
1126 // we try to dequeue from it, we need to make sure every queue's been tried
1127 if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1128 if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1129 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1130 }
1131 return true;
1132 }
1133
1134 auto tail = producerListTail.load(std::memory_order_acquire);
1135 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1136 if (ptr == nullptr) {
1137 ptr = tail;
1138 }
1139 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1140 if (ptr->dequeue(item)) {
1141 token.currentProducer = ptr;
1142 token.itemsConsumedFromCurrent = 1;
1143 return true;
1144 }
1145 ptr = ptr->next_prod();
1146 if (ptr == nullptr) {
1147 ptr = tail;
1148 }
1149 }
1150 return false;
1151 }
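
 // Illustrative sketch of the rotation scheme described above: give each
 // consumer thread its own long-lived ConsumerToken so the per-producer quota
 // and the global offset can spread the threads across producer streams
 // (`process` is a hypothetical consumer function):
 //
 //   moodycamel::ConsumerToken ctok(q);   // one per consumer thread
 //   int item;
 //   while (q.try_dequeue(ctok, item)) {
 //       process(item);
 //   }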
1152
1153 // Attempts to dequeue several elements from the queue.
1154 // Returns the number of items actually dequeued.
1155 // Returns 0 if all producer streams appeared empty at the time they
1156 // were checked (so, the queue is likely but not guaranteed to be empty).
1157 // Never allocates. Thread-safe.
1158 template<typename It>
1159 size_t try_dequeue_bulk(It itemFirst, size_t max)
1160 {
1161 size_t count = 0;
1162 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1163 count += ptr->dequeue_bulk(itemFirst, max - count);
1164 if (count == max) {
1165 break;
1166 }
1167 }
1168 return count;
1169 }
1170
1171 // Attempts to dequeue several elements from the queue using an explicit consumer token.
1172 // Returns the number of items actually dequeued.
1173 // Returns 0 if all producer streams appeared empty at the time they
1174 // were checked (so, the queue is likely but not guaranteed to be empty).
1175 // Never allocates. Thread-safe.
1176 template<typename It>
1177 size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1178 {
1179 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1181 return 0;
1182 }
1183 }
1184
1185 size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1186 if (count == max) {
1187 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1188 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1189 }
1190 return max;
1191 }
1192 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1193 max -= count;
1194
1195 auto tail = producerListTail.load(std::memory_order_acquire);
1196 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1197 if (ptr == nullptr) {
1198 ptr = tail;
1199 }
1200 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1201 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1202 count += dequeued;
1203 if (dequeued != 0) {
1204 token.currentProducer = ptr;
1205 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1206 }
1207 if (dequeued == max) {
1208 break;
1209 }
1210 max -= dequeued;
1211 ptr = ptr->next_prod();
1212 if (ptr == nullptr) {
1213 ptr = tail;
1214 }
1215 }
1216 return count;
1217 }
1218
1219
1220
1221 // Attempts to dequeue from a specific producer's inner queue.
1222 // If you happen to know which producer you want to dequeue from, this
1223 // is significantly faster than using the general-case try_dequeue methods.
1224 // Returns false if the producer's queue appeared empty at the time it
1225 // was checked (so, the queue is likely but not guaranteed to be empty).
1226 // Never allocates. Thread-safe.
1227 template<typename U>
1228 inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1229 {
1230 return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1231 }
1232
1233 // Attempts to dequeue several elements from a specific producer's inner queue.
1234 // Returns the number of items actually dequeued.
1235 // If you happen to know which producer you want to dequeue from, this
1236 // is significantly faster than using the general-case try_dequeue methods.
1237 // Returns 0 if the producer's queue appeared empty at the time it
1238 // was checked (so, the queue is likely but not guaranteed to be empty).
1239 // Never allocates. Thread-safe.
1240 template<typename It>
1241 inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1242 {
1243 return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1244 }
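
 // Illustrative sketch: when the consumer can see the producer's token, that
 // producer's stream can be drained directly, skipping the producer search
 // entirely:
 //
 //   moodycamel::ProducerToken ptok(q);
 //   q.enqueue(ptok, 42);
 //
 //   int item;
 //   bool got = q.try_dequeue_from_producer(ptok, item);  // item == 42 if no other consumer raced us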
1245
1246
1247 // Returns an estimate of the total number of elements currently in the queue. This
1248 // estimate is only accurate if the queue has completely stabilized before it is called
1249 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1250 // visible on the calling thread, and no further operations start while this method is
1251 // being called).
1252 // Thread-safe.
1253 size_t size_approx() const
1254 {
1255 size_t size = 0;
1256 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1257 size += ptr->size_approx();
1258 }
1259 return size;
1260 }
1261
1262
1263 // Returns true if the underlying atomic variables used by
1264 // the queue are lock-free (they should be on most platforms).
1265 // Thread-safe.
1276
1277
1278private:
1279 friend struct ProducerToken;
1280 friend struct ConsumerToken;
1281 struct ExplicitProducer;
1282 friend struct ExplicitProducer;
1283 struct ImplicitProducer;
1284 friend struct ImplicitProducer;
1285 friend class ConcurrentQueueTests;
1286
1287 enum AllocationMode { CanAlloc, CannotAlloc };
1288
1289
1290 ///////////////////////////////
1291 // Queue methods
1292 ///////////////////////////////
1293
1294 template<AllocationMode canAlloc, typename U>
1295 inline bool inner_enqueue(producer_token_t const& token, U&& element)
1296 {
1297 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1298 }
1299
1300 template<AllocationMode canAlloc, typename U>
1301 inline bool inner_enqueue(U&& element)
1302 {
1303 auto producer = get_or_add_implicit_producer();
1304 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1305 }
1306
1307 template<AllocationMode canAlloc, typename It>
1308 inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1309 {
1310 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1311 }
1312
1313 template<AllocationMode canAlloc, typename It>
1314 inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1315 {
1316 auto producer = get_or_add_implicit_producer();
1317 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1318 }
1319
1321 {
1322 // Ah, there's been a rotation, figure out where we should be!
1323 auto tail = producerListTail.load(std::memory_order_acquire);
1324 if (token.desiredProducer == nullptr && tail == nullptr) {
1325 return false;
1326 }
1327 auto prodCount = producerCount.load(std::memory_order_relaxed);
1328 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1329 if ((details::unlikely)(token.desiredProducer == nullptr)) {
1330 // Aha, first time we're dequeueing anything.
1331 // Figure out our local position
1332 // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1333 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1334 token.desiredProducer = tail;
1335 for (std::uint32_t i = 0; i != offset; ++i) {
1336 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1337 if (token.desiredProducer == nullptr) {
1338 token.desiredProducer = tail;
1339 }
1340 }
1341 }
1342
1343 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1344 if (delta >= prodCount) {
1345 delta = delta % prodCount;
1346 }
1347 for (std::uint32_t i = 0; i != delta; ++i) {
1348 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1349 if (token.desiredProducer == nullptr) {
1350 token.desiredProducer = tail;
1351 }
1352 }
1353
1354 token.lastKnownGlobalOffset = globalOffset;
1355 token.currentProducer = token.desiredProducer;
1356 token.itemsConsumedFromCurrent = 0;
1357 return true;
1358 }
1359
1360
1361 ///////////////////////////
1362 // Free list
1363 ///////////////////////////
1364
1365 template <typename N>
1366 struct FreeListNode
1367 {
1368 FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
1369
1370 std::atomic<std::uint32_t> freeListRefs;
1371 std::atomic<N*> freeListNext;
1372 };
1373
1374 // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1375 // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1376 // speedy under low contention.
1377 template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1378 struct FreeList
1379 {
1380 FreeList() : freeListHead(nullptr) { }
1381 FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
1383
1386
1387 inline void add(N* node)
1388 {
1389#if MCDBGQ_NOLOCKFREE_FREELIST
1390 debug::DebugLock lock(mutex);
1391#endif
1392 // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1393 // set it using a fetch_add
1394 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1395 // Oh look! We were the last ones referencing this node, and we know
1396 // we want to add it to the free list, so let's do it!
1397 add_knowing_refcount_is_zero(node);
1398 }
1399 }
1400
1401 inline N* try_get()
1402 {
1403#if MCDBGQ_NOLOCKFREE_FREELIST
1404 debug::DebugLock lock(mutex);
1405#endif
1406 auto head = freeListHead.load(std::memory_order_acquire);
1407 while (head != nullptr) {
1408 auto prevHead = head;
1409 auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1410 if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1411 head = freeListHead.load(std::memory_order_acquire);
1412 continue;
1413 }
1414
1415 // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1416 // next and not worry about it changing between now and the time we do the CAS
1417 auto next = head->freeListNext.load(std::memory_order_relaxed);
1418 if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1419 // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1420 // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1421 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1422
1423 // Decrease refcount twice, once for our ref, and once for the list's ref
1424 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1425 return head;
1426 }
1427
1428 // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1429 // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1430 // count decrement happens-after the CAS on the head.
1431 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1432 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1433 add_knowing_refcount_is_zero(prevHead);
1434 }
1435 }
1436
1437 return nullptr;
1438 }
1439
1440 // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1441 N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1442
1443 private:
1444 inline void add_knowing_refcount_is_zero(N* node)
1445 {
1446 // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1447 // only one copy of this method per node at a time, i.e. the single thread case), then we know
1448 // we can safely change the next pointer of the node; however, once the refcount is back above
1449 // zero, then other threads could increase it (happens under heavy contention, when the refcount
1450 // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1451 // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1452 // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1453 // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1454 auto head = freeListHead.load(std::memory_order_relaxed);
1455 while (true) {
1456 node->freeListNext.store(head, std::memory_order_relaxed);
1457 node->freeListRefs.store(1, std::memory_order_release);
1458 if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1459 // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1460 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1461 continue;
1462 }
1463 }
1464 return;
1465 }
1466 }
1467
1468 private:
1469 // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1470 std::atomic<N*> freeListHead;
1471
1472 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1473 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1474
1475#if MCDBGQ_NOLOCKFREE_FREELIST
1476 debug::DebugMutex mutex;
1477#endif
1478 };
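// Illustrative sketch (not part of the queue implementation; names are hypothetical). FreeList is
// an internal detail and cannot be used directly; the simplified, single-threaded model below (no
// atomics, no refcount protection) only illustrates the add()/try_get() contract: nodes are
// recycled LIFO, and try_get() returns nullptr once the list is exhausted.
#if 0
#include <cassert>

struct DemoNode { DemoNode* next = nullptr; };

struct DemoFreeList {
  DemoNode* head = nullptr;
  void add(DemoNode* n) { n->next = head; head = n; }   // push
  DemoNode* try_get() {                                 // pop, or nullptr if empty
    DemoNode* n = head;
    if (n != nullptr) head = n->next;
    return n;
  }
};

int main() {
  DemoNode a, b;
  DemoFreeList list;
  list.add(&a);
  list.add(&b);
  assert(list.try_get() == &b);   // most recently freed node is reused first
  assert(list.try_get() == &a);
  assert(list.try_get() == nullptr);
}
#endif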
1479
1480
1481 ///////////////////////////
1482 // Block
1483 ///////////////////////////
1484
1485 enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1486
1487 struct Block
1488 {
1489 Block()
1490 : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
1491 {
1492#if MCDBGQ_TRACKMEM
1493 owner = nullptr;
1494#endif
1495 }
1496
1497 template<InnerQueueContext context>
1498 inline bool is_empty() const
1499 {
1500 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1501 // Check flags
1502 for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1503 if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1504 return false;
1505 }
1506 }
1507
1508 // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1509 std::atomic_thread_fence(std::memory_order_acquire);
1510 return true;
1511 }
1512 else {
1513 // Check counter
1514 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1515 std::atomic_thread_fence(std::memory_order_acquire);
1516 return true;
1517 }
1518 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1519 return false;
1520 }
1521 }
1522
1523 // Returns true if the block is now empty (does not apply in explicit context)
1524 template<InnerQueueContext context>
1525 inline bool set_empty(index_t i)
1526 {
1527 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1528 // Set flag
1529 assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
1530 emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
1531 return false;
1532 }
1533 else {
1534 // Increment counter
1535 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1536 assert(prevVal < BLOCK_SIZE);
1537 return prevVal == BLOCK_SIZE - 1;
1538 }
1539 }
1540
1541 // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1542 // Returns true if the block is now empty (does not apply in explicit context).
1543 template<InnerQueueContext context>
1544 inline bool set_many_empty(index_t i, size_t count)
1545 {
1546 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1547 // Set flags
1548 std::atomic_thread_fence(std::memory_order_release);
1549 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1550 for (size_t j = 0; j != count; ++j) {
1551 assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1552 emptyFlags[i + j].store(true, std::memory_order_relaxed);
1553 }
1554 return false;
1555 }
1556 else {
1557 // Increment counter
1558 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1559 assert(prevVal + count <= BLOCK_SIZE);
1560 return prevVal + count == BLOCK_SIZE;
1561 }
1562 }
1563
1564 template<InnerQueueContext context>
1565 inline void set_all_empty()
1566 {
1567 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1568 // Set all flags
1569 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1570 emptyFlags[i].store(true, std::memory_order_relaxed);
1571 }
1572 }
1573 else {
1574 // Reset counter
1575 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1576 }
1577 }
1578
1579 template<InnerQueueContext context>
1580 inline void reset_empty()
1581 {
1582 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1583 // Reset flags
1584 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1585 emptyFlags[i].store(false, std::memory_order_relaxed);
1586 }
1587 }
1588 else {
1589 // Reset counter
1590 elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1591 }
1592 }
1593
1594 inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1595 inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1596
1597 private:
1598 // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
1599 // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
1600 // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
1601 // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
1602 // alignment, but this is hard to do in a cross-platform way. Assert for this case:
1603 static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
1604 // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
1605 // otherwise the appropriate padding will not be added at the end of Block in order to make
1606 // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
1607 // this.
1608 union {
1609 char elements[sizeof(T) * BLOCK_SIZE];
1610 details::max_align_t dummy;
1611 };
1612 public:
1613 Block* next;
1614 std::atomic<size_t> elementsCompletelyDequeued;
1615 std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1616 public:
1617 std::atomic<std::uint32_t> freeListRefs;
1618 std::atomic<Block*> freeListNext;
1619 std::atomic<bool> shouldBeOnFreeList;
1620 bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1621
1622#if MCDBGQ_TRACKMEM
1623 void* owner;
1624#endif
1625 };
1626 static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
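// Illustrative sketch (not part of the queue implementation; BLOCK_SIZE value is hypothetical).
// Two index tricks above are easy to miss: operator[] maps a global queue index onto a slot with
// `idx & (BLOCK_SIZE - 1)` (BLOCK_SIZE is a power of two), and the explicit-context empty flags
// are stored reversed, at `BLOCK_SIZE - 1 - slot`. A quick arithmetic check:
#if 0
#include <cassert>
#include <cstddef>

int main() {
  const std::size_t BLOCK_SIZE = 32;          // must be a power of two
  const std::size_t globalIndex = 1000;       // some queue-wide element index

  std::size_t slot = globalIndex & (BLOCK_SIZE - 1);   // 1000 % 32 == 8
  assert(slot == 8);

  std::size_t flagSlot = BLOCK_SIZE - 1 - slot;        // reversed flag position
  assert(flagSlot == 23);
}
#endif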
1627
1628
1629#if MCDBGQ_TRACKMEM
1630public:
1631 struct MemStats;
1632private:
1633#endif
1634
1635 ///////////////////////////
1636 // Producer base
1637 ///////////////////////////
1638
1639 struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
1640 {
1641 ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
1642 tailIndex(0),
1643 headIndex(0),
1644 dequeueOptimisticCount(0),
1645 dequeueOvercommit(0),
1646 tailBlock(nullptr),
1647 isExplicit(isExplicit_),
1648 parent(parent_)
1649 {
1650 }
1651
1652 virtual ~ProducerBase() { }
1653
1654 template<typename U>
1655 inline bool dequeue(U& element)
1656 {
1657 if (isExplicit) {
1658 return static_cast<ExplicitProducer*>(this)->dequeue(element);
1659 }
1660 else {
1661 return static_cast<ImplicitProducer*>(this)->dequeue(element);
1662 }
1663 }
1664
1665 template<typename It>
1666 inline size_t dequeue_bulk(It& itemFirst, size_t max)
1667 {
1668 if (isExplicit) {
1669 return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1670 }
1671 else {
1672 return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1673 }
1674 }
1675
1676 inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1677
1678 inline size_t size_approx() const
1679 {
1680 auto tail = tailIndex.load(std::memory_order_relaxed);
1681 auto head = headIndex.load(std::memory_order_relaxed);
1682 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1683 }
1684
1685 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1686 protected:
1687 std::atomic<index_t> tailIndex; // Where to enqueue to next
1688 std::atomic<index_t> headIndex; // Where to dequeue from next
1689
1690 std::atomic<index_t> dequeueOptimisticCount;
1691 std::atomic<index_t> dequeueOvercommit;
1692
1693 Block* tailBlock;
1694
1695 public:
1696 bool isExplicit;
1697 ConcurrentQueue* parent;
1698
1699 protected:
1700#if MCDBGQ_TRACKMEM
1701 friend struct MemStats;
1702#endif
1703 };
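// Illustrative sketch (not part of the queue implementation; this is not the library's
// circular_less_than itself). size_approx() relies on the fact that, with unsigned indices,
// `tail - head` stays correct even after the counters wrap, as long as a circular "less than"
// is used to reject the transient case where head appears ahead of tail. A minimal model of
// that wrap-around arithmetic:
#if 0
#include <cassert>
#include <cstdint>

// Treats the difference as signed to decide order on a circular number line.
static bool circ_less(std::uint32_t a, std::uint32_t b) {
  return static_cast<std::int32_t>(a - b) < 0;
}

int main() {
  std::uint32_t head = 0xFFFFFFF0u;   // about to wrap
  std::uint32_t tail = 0x00000010u;   // already wrapped past zero
  assert(circ_less(head, tail));                            // head is still "behind" tail
  assert(static_cast<std::uint32_t>(tail - head) == 0x20);  // 32 elements in flight
}
#endif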
1704
1705
1706 ///////////////////////////
1707 // Explicit queue
1708 ///////////////////////////
1709
1710 struct ExplicitProducer : public ProducerBase
1711 {
1712 explicit ExplicitProducer(ConcurrentQueue* parent) :
1713 ProducerBase(parent, true),
1714 blockIndex(nullptr),
1715 pr_blockIndexSlotsUsed(0),
1716 pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
1717 pr_blockIndexFront(0),
1718 pr_blockIndexEntries(nullptr),
1719 pr_blockIndexRaw(nullptr)
1720 {
1721 size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
1722 if (poolBasedIndexSize > pr_blockIndexSize) {
1723 pr_blockIndexSize = poolBasedIndexSize;
1724 }
1725
1726 new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1727 }
1728
1729 ~ExplicitProducer()
1730 {
1731 // Destruct any elements not yet dequeued.
1732 // Since we're in the destructor, we can assume all elements
1733 // are either completely dequeued or completely not (no halfways).
1734 if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1735 // First find the block that's partially dequeued, if any
1736 Block* halfDequeuedBlock = nullptr;
1737 if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1738 // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1739 // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1740 size_t i = 0;
1741 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
1742 i = (i + 1) & (pr_blockIndexSize - 1);
1743 }
1744 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
1745 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1746 }
1747
1748 // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1749 auto block = this->tailBlock;
1750 do {
1751 block = block->next;
1752 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1753 continue;
1754 }
1755
1756 size_t i = 0; // Offset into block
1757 if (block == halfDequeuedBlock) {
1758 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1759 }
1760
1761 // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1762 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1763 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1764 (*block)[i++]->~T();
1765 }
1766 } while (block != this->tailBlock);
1767 }
1768
1769 // Destroy all blocks that we own
1770 if (this->tailBlock != nullptr) {
1771 auto block = this->tailBlock;
1772 do {
1773 auto nextBlock = block->next;
1774 if (block->dynamicallyAllocated) {
1775 destroy(block);
1776 }
1777 else {
1778 this->parent->add_block_to_free_list(block);
1779 }
1780 block = nextBlock;
1781 } while (block != this->tailBlock);
1782 }
1783
1784 // Destroy the block indices
1785 auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
1786 while (header != nullptr) {
1787 auto prev = static_cast<BlockIndexHeader*>(header->prev);
1788 header->~BlockIndexHeader();
1789 (Traits::free)(header);
1790 header = prev;
1791 }
1792 }
1793
1794 template<AllocationMode allocMode, typename U>
1795 inline bool enqueue(U&& element)
1796 {
1797 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1798 index_t newTailIndex = 1 + currentTailIndex;
1799 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1800 // We reached the end of a block, start a new one
1801 auto startBlock = this->tailBlock;
1802 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1803 if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1804 // We can re-use the block ahead of us, it's empty!
1805 this->tailBlock = this->tailBlock->next;
1806 this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1807
1808 // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1809 // last block from it first -- except instead of removing then adding, we can just overwrite).
1810 // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1811 // it would have been re-attempted when adding the first block to the queue; since there is such
1812 // a block, a block index must have been successfully allocated.
1813 }
1814 else {
1815 // Whatever head value we see here is >= the last value we saw here (relatively),
1816 // and <= its current value. Since we have the most recent tail, the head must be
1817 // <= it.
1818 auto head = this->headIndex.load(std::memory_order_relaxed);
1819 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1820 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1821 || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1822 // We can't enqueue in another block because there's not enough leeway -- the
1823 // tail could surpass the head by the time the block fills up! (Or we'll exceed
1824 // the size limit, if the second part of the condition was true.)
1825 return false;
1826 }
1827 // We're going to need a new block; check that the block index has room
1828 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1829 // Hmm, the circular block index is already full -- we'll need
1830 // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1831 // the initial allocation failed in the constructor.
1832
1833 if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
1834 return false;
1835 }
1836 }
1837
1838 // Insert a new block in the circular linked list
1839 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1840 if (newBlock == nullptr) {
1841 return false;
1842 }
1843#if MCDBGQ_TRACKMEM
1844 newBlock->owner = this;
1845#endif
1846 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1847 if (this->tailBlock == nullptr) {
1848 newBlock->next = newBlock;
1849 }
1850 else {
1851 newBlock->next = this->tailBlock->next;
1852 this->tailBlock->next = newBlock;
1853 }
1854 this->tailBlock = newBlock;
1855 ++pr_blockIndexSlotsUsed;
1856 }
1857
1858 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
1859 // The constructor may throw. We want the element not to appear in the queue in
1860 // that case (without corrupting the queue):
1862 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1863 }
1864 MOODYCAMEL_CATCH (...) {
1865 // Revert change to the current block, but leave the new block available
1866 // for next time
1867 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1868 this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1870 }
1871 }
1872 else {
1873 (void)startBlock;
1874 (void)originalBlockIndexSlotsUsed;
1875 }
1876
1877 // Add block to block index
1878 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1879 entry.base = currentTailIndex;
1880 entry.block = this->tailBlock;
1881 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
1882 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1883
1884 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
1885 this->tailIndex.store(newTailIndex, std::memory_order_release);
1886 return true;
1887 }
1888 }
1889
1890 // Enqueue
1891 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1892
1893 this->tailIndex.store(newTailIndex, std::memory_order_release);
1894 return true;
1895 }
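// Illustrative sketch (not part of the queue implementation; the 64-element block size is
// hypothetical). enqueue() only does block management when `currentTailIndex & (BLOCK_SIZE - 1)`
// is zero, i.e. when the tail sits exactly on a block boundary; every other enqueue is just a
// placement-new into the current tail block plus a release store of tailIndex. A small check of
// that boundary test:
#if 0
#include <cassert>
#include <cstdint>

int main() {
  const std::uint64_t BLOCK_SIZE = 64;
  auto on_boundary = [&](std::uint64_t tail) { return (tail & (BLOCK_SIZE - 1)) == 0; };

  assert(on_boundary(0));      // very first enqueue allocates or reuses a block
  assert(!on_boundary(63));    // still inside the first block
  assert(on_boundary(64));     // next block needed
  assert(on_boundary(640));    // every multiple of BLOCK_SIZE
}
#endif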
1896
1897 template<typename U>
1898 bool dequeue(U& element)
1899 {
1900 auto tail = this->tailIndex.load(std::memory_order_relaxed);
1901 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
1902 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
1903 // Might be something to dequeue, let's give it a try
1904
1905 // Note that this if is purely for performance purposes in the common case when the queue is
1906 // empty and the values are eventually consistent -- we may enter here spuriously.
1907
1908 // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1909 // change them) and must be the same value at this point (inside the if) as when the if condition was
1910 // evaluated.
1911
1912 // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1913 // This ensures that whatever value we loaded into overcommit, the load of dequeueOptimisticCount in
1914 // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1915 // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1916 // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1917 // unfortunately that can't be shown to be correct using only the C++11 standard.
1918 // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1919 std::atomic_thread_fence(std::memory_order_acquire);
1920
1921 // Increment optimistic counter, then check if it went over the boundary
1922 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
1923
1924 // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1925 // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1926 // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1927 // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1928 // However, we can't assert this since both dequeueOptimisticCount and dequeueOvercommit may (independently)
1929 // overflow; in such a case, though, the logic still holds since the difference between the two is maintained.
1930
1931 // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1932 // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1933 // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1934 tail = this->tailIndex.load(std::memory_order_acquire);
1935 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1936 // Guaranteed to be at least one element to dequeue!
1937
1938 // Get the index. Note that since there's guaranteed to be at least one element, this
1939 // will never exceed tail. We need to do an acquire-release fence here since it's possible
1940 // that whatever condition got us to this point was for an earlier enqueued element (that
1941 // we already see the memory effects for), but that by the time we increment somebody else
1942 // has incremented it, and we need to see the memory effects for *that* element, which in
1943 // such a case is necessarily visible on the thread that incremented it in the first place
1944 // under the more current condition (they must have acquired a tail that is at least
1945 // as recent).
1946 auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
1947
1948
1949 // Determine which block the element is in
1950
1951 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
1952 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
1953
1954 // We need to be careful here about subtracting and dividing because of index wrap-around.
1955 // When an index wraps, we need to preserve the sign of the offset when dividing it by the
1956 // block size (in order to get a correct signed block count offset in all cases):
1957 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
1958 auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
1959 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
1960 auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
1961
1962 // Dequeue
1963 auto& el = *((*block)[index]);
1964 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
1965 // Make sure the element is still fully dequeued and destroyed even if the assignment
1966 // throws
1967 struct Guard {
1968 Block* block;
1969 index_t index;
1970
1971 ~Guard()
1972 {
1973 (*block)[index]->~T();
1974 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1975 }
1976 } guard = { block, index };
1977
1978 element = std::move(el);
1979 }
1980 else {
1981 element = std::move(el);
1982 el.~T();
1983 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1984 }
1985
1986 return true;
1987 }
1988 else {
1989 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
1990 this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
1991 }
1992 }
1993
1994 return false;
1995 }
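// Illustrative sketch (not part of the queue implementation; plain integers stand in for the
// atomics). The dequeue fast path above is an optimistic claim: bump dequeueOptimisticCount
// first, then check against tail; if the claim overshot, bump dequeueOvercommit instead of
// undoing the first counter. The invariant is that (dequeueOptimisticCount - dequeueOvercommit)
// matches the number of elements actually taken. A single-threaded model of that bookkeeping:
#if 0
#include <cassert>
#include <cstdint>

int main() {
  std::uint64_t tail = 3;            // 3 elements enqueued so far
  std::uint64_t optimistic = 0, overcommit = 0, head = 0;

  for (int attempt = 0; attempt < 5; ++attempt) {   // 5 dequeue attempts, only 3 can succeed
    std::uint64_t my = optimistic++;                // optimistic claim (fetch_add in the real code)
    if (my - overcommit < tail) {
      ++head;                                       // real dequeue: consume one element
    } else {
      ++overcommit;                                 // overshot: record the failed claim
    }
  }
  assert(head == 3);
  assert(optimistic - overcommit == 3);             // effective count matches elements dequeued
}
#endif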
1996
1997 template<AllocationMode allocMode, typename It>
1998 bool enqueue_bulk(It itemFirst, size_t count)
1999 {
2000 // First, we need to make sure we have enough room to enqueue all of the elements;
2001 // this means pre-allocating blocks and putting them in the block index (but only if
2002 // all the allocations succeeded).
2003 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2004 auto startBlock = this->tailBlock;
2005 auto originalBlockIndexFront = pr_blockIndexFront;
2006 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2007
2008 Block* firstAllocatedBlock = nullptr;
2009
2010 // Figure out how many blocks we'll need to allocate, and do so
2011 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2012 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2013 if (blockBaseDiff > 0) {
2014 // Allocate as many blocks as possible from ahead
2015 while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2016 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2017 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2018
2019 this->tailBlock = this->tailBlock->next;
2020 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2021
2022 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2023 entry.base = currentTailIndex;
2024 entry.block = this->tailBlock;
2025 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2026 }
2027
2028 // Now allocate as many blocks as necessary from the block pool
2029 while (blockBaseDiff > 0) {
2030 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2031 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2032
2033 auto head = this->headIndex.load(std::memory_order_relaxed);
2034 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2035 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2036 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2037 if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
2038 // Failed to allocate, undo changes (but keep injected blocks)
2039 pr_blockIndexFront = originalBlockIndexFront;
2040 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2041 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2042 return false;
2043 }
2044
2045 // pr_blockIndexFront is updated inside new_block_index, so we need to
2046 // update our fallback value too (since we keep the new index even if we
2047 // later fail)
2048 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2049 }
2050
2051 // Insert a new block in the circular linked list
2052 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2053 if (newBlock == nullptr) {
2054 pr_blockIndexFront = originalBlockIndexFront;
2055 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2056 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2057 return false;
2058 }
2059
2060#if MCDBGQ_TRACKMEM
2061 newBlock->owner = this;
2062#endif
2063 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2064 if (this->tailBlock == nullptr) {
2065 newBlock->next = newBlock;
2066 }
2067 else {
2068 newBlock->next = this->tailBlock->next;
2069 this->tailBlock->next = newBlock;
2070 }
2071 this->tailBlock = newBlock;
2072 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2073
2074 ++pr_blockIndexSlotsUsed;
2075
2076 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2077 entry.base = currentTailIndex;
2078 entry.block = this->tailBlock;
2079 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2080 }
2081
2082 // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2083 // publish the new block index front
2084 auto block = firstAllocatedBlock;
2085 while (true) {
2086 block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2087 if (block == this->tailBlock) {
2088 break;
2089 }
2090 block = block->next;
2091 }
2092
2093 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2094 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2095 }
2096 }
2097
2098 // Enqueue, one block at a time
2099 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2100 currentTailIndex = startTailIndex;
2101 auto endBlock = this->tailBlock;
2102 this->tailBlock = startBlock;
2103 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2104 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2105 this->tailBlock = firstAllocatedBlock;
2106 }
2107 while (true) {
2108 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2109 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2110 stopIndex = newTailIndex;
2111 }
2112 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2113 while (currentTailIndex != stopIndex) {
2114 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2115 }
2116 }
2117 else {
2119 while (currentTailIndex != stopIndex) {
2120 // Must use copy constructor even if move constructor is available
2121 // because we may have to revert if there's an exception.
2122 // Sorry about the horrible templated next line, but it was the only way
2123 // to disable moving *at compile time*, which is important because a type
2124 // may only define a (noexcept) move constructor, and so calls to the
2125 // cctor will not compile, even if they are in an if branch that will never
2126 // be executed
2127 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2128 ++currentTailIndex;
2129 ++itemFirst;
2130 }
2131 }
2132 MOODYCAMEL_CATCH (...) {
2133 // Oh dear, an exception's been thrown -- destroy the elements that
2134 // were enqueued so far and revert the entire bulk operation (we'll keep
2135 // any allocated blocks in our linked list for later, though).
2136 auto constructedStopIndex = currentTailIndex;
2137 auto lastBlockEnqueued = this->tailBlock;
2138
2139 pr_blockIndexFront = originalBlockIndexFront;
2140 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2141 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2142
2144 auto block = startBlock;
2145 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2146 block = firstAllocatedBlock;
2147 }
2148 currentTailIndex = startTailIndex;
2149 while (true) {
2150 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2151 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2152 stopIndex = constructedStopIndex;
2153 }
2154 while (currentTailIndex != stopIndex) {
2155 (*block)[currentTailIndex++]->~T();
2156 }
2157 if (block == lastBlockEnqueued) {
2158 break;
2159 }
2160 block = block->next;
2161 }
2162 }
2164 }
2165 }
2166
2167 if (this->tailBlock == endBlock) {
2168 assert(currentTailIndex == newTailIndex);
2169 break;
2170 }
2171 this->tailBlock = this->tailBlock->next;
2172 }
2173
2174 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) {
2175 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2176 }
2177
2178 this->tailIndex.store(newTailIndex, std::memory_order_release);
2179 return true;
2180 }
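// Illustrative sketch (not part of the queue implementation; the 32-element block size is
// hypothetical). The bulk path sizes its allocation with
// blockBaseDiff = ((start + count - 1) & ~(BLOCK_SIZE - 1)) - ((start - 1) & ~(BLOCK_SIZE - 1)),
// i.e. the distance between the block base of the last new element and the block base of the
// element just before the batch, which is exactly BLOCK_SIZE times the number of *new* blocks
// needed. A quick numeric check (start > 0, count > 0 assumed, as in the real call sites):
#if 0
#include <cassert>
#include <cstdint>

int main() {
  const std::uint64_t B = 32;
  auto base = [&](std::uint64_t i) { return i & ~(B - 1); };
  auto new_blocks = [&](std::uint64_t start, std::uint64_t count) {
    return (base(start + count - 1) - base(start - 1)) / B;
  };

  assert(new_blocks(10, 5) == 0);    // fits entirely in the current block
  assert(new_blocks(10, 30) == 1);   // spills into one new block
  assert(new_blocks(32, 64) == 2);   // starts on a boundary: two fresh blocks
}
#endif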
2181
2182 template<typename It>
2183 size_t dequeue_bulk(It& itemFirst, size_t max)
2184 {
2185 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2186 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2187 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2188 if (details::circular_less_than<size_t>(0, desiredCount)) {
2189 desiredCount = desiredCount < max ? desiredCount : max;
2190 std::atomic_thread_fence(std::memory_order_acquire);
2191
2192 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2193
2194 tail = this->tailIndex.load(std::memory_order_acquire);
2195 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2196 if (details::circular_less_than<size_t>(0, actualCount)) {
2197 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2198 if (actualCount < desiredCount) {
2199 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2200 }
2201
2202 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2203 // will never exceed tail.
2204 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2205
2206 // Determine which block the first element is in
2207 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2208 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2209
2210 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2211 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2212 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
2213 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2214
2215 // Iterate the blocks and dequeue
2216 auto index = firstIndex;
2217 do {
2218 auto firstIndexInBlock = index;
2219 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2220 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2221 auto block = localBlockIndex->entries[indexIndex].block;
2222 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2223 while (index != endIndex) {
2224 auto& el = *((*block)[index]);
2225 *itemFirst++ = std::move(el);
2226 el.~T();
2227 ++index;
2228 }
2229 }
2230 else {
2232 while (index != endIndex) {
2233 auto& el = *((*block)[index]);
2234 *itemFirst = std::move(el);
2235 ++itemFirst;
2236 el.~T();
2237 ++index;
2238 }
2239 }
2240 MOODYCAMEL_CATCH (...) {
2241 // It's too late to revert the dequeue, but we can make sure that all
2242 // the dequeued objects are properly destroyed and the block index
2243 // (and empty count) are properly updated before we propagate the exception
2244 do {
2245 block = localBlockIndex->entries[indexIndex].block;
2246 while (index != endIndex) {
2247 (*block)[index++]->~T();
2248 }
2249 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2250 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2251
2252 firstIndexInBlock = index;
2253 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2254 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2255 } while (index != firstIndex + actualCount);
2256
2258 }
2259 }
2260 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2261 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2262 } while (index != firstIndex + actualCount);
2263
2264 return actualCount;
2265 }
2266 else {
2267 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2268 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2269 }
2270 }
2271
2272 return 0;
2273 }
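// Illustrative sketch (not part of the queue implementation; concrete values are hypothetical).
// Both dequeue paths locate a block by taking (elementBlockBase - headBase), casting the
// difference to the *signed* counterpart of index_t, and dividing by BLOCK_SIZE; the signed cast
// preserves a negative offset when the indices have wrapped, and the power-of-two mask then folds
// it back into the circular block index. A small demonstration of why the signed cast matters:
#if 0
#include <cassert>
#include <cstddef>
#include <cstdint>

int main() {
  const std::uint32_t B = 32;
  std::uint32_t headBase = 64;          // block base recorded at the index "front"
  std::uint32_t elemBase = 0;           // element lives two blocks *behind* the front

  auto off = static_cast<std::int32_t>(elemBase - headBase) / static_cast<std::int32_t>(B);
  assert(off == -2);                    // signed division keeps the direction

  std::size_t indexSize = 8;            // power-of-two circular block index
  std::size_t front = 1;
  std::size_t slot = (front + static_cast<std::size_t>(off)) & (indexSize - 1);
  assert(slot == 7);                    // two slots behind slot 1 wraps to slot 7
}
#endif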
2274
2275 private:
2276 struct BlockIndexEntry
2277 {
2278 index_t base;
2279 Block* block;
2280 };
2281
2282 struct BlockIndexHeader
2283 {
2284 size_t size;
2285 std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2286 BlockIndexEntry* entries;
2287 void* prev;
2288 };
2289
2290
2291 bool new_block_index(size_t numberOfFilledSlotsToExpose)
2292 {
2293 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2294
2295 // Create the new block
2296 pr_blockIndexSize <<= 1;
2297 auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2298 if (newRawPtr == nullptr) {
2299 pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2300 return false;
2301 }
2302
2303 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
2304
2305 // Copy in all the old indices, if any
2306 size_t j = 0;
2307 if (pr_blockIndexSlotsUsed != 0) {
2308 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2309 do {
2310 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2311 i = (i + 1) & prevBlockSizeMask;
2312 } while (i != pr_blockIndexFront);
2313 }
2314
2315 // Update everything
2316 auto header = new (newRawPtr) BlockIndexHeader;
2317 header->size = pr_blockIndexSize;
2318 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2319 header->entries = newBlockIndexEntries;
2320 header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2321
2323 pr_blockIndexEntries = newBlockIndexEntries;
2324 pr_blockIndexRaw = newRawPtr;
2325 blockIndex.store(header, std::memory_order_release);
2326
2327 return true;
2328 }
2329
2330 private:
2331 std::atomic<BlockIndexHeader*> blockIndex;
2332
2333 // To be used by producer only -- consumer must use the ones referenced by blockIndex
2334 size_t pr_blockIndexSlotsUsed;
2335 size_t pr_blockIndexSize;
2336 size_t pr_blockIndexFront; // Next slot (not current)
2337 BlockIndexEntry* pr_blockIndexEntries;
2338 void* pr_blockIndexRaw;
2339
2340#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2341 public:
2342 ExplicitProducer* nextExplicitProducer;
2343 private:
2344#endif
2345
2346#if MCDBGQ_TRACKMEM
2347 friend struct MemStats;
2348#endif
2349 };
2350
2351
2352 //////////////////////////////////
2353 // Implicit queue
2354 //////////////////////////////////
2355
2356 struct ImplicitProducer : public ProducerBase
2357 {
2365
2366 ~ImplicitProducer()
2367 {
2368 // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2369 // completed already; this means that all undequeued elements are placed contiguously across
2370 // contiguous blocks, and that only the first and last remaining blocks can be only partially
2371 // empty (all other remaining blocks must be completely full).
2372
2373#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2374 // Unregister ourselves for thread termination notification
2375 if (!this->inactive.load(std::memory_order_relaxed)) {
2376 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2377 }
2378#endif
2379
2380 // Destroy all remaining elements!
2381 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2382 auto index = this->headIndex.load(std::memory_order_relaxed);
2383 Block* block = nullptr;
2384 assert(index == tail || details::circular_less_than(index, tail));
2385 bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed
2386 while (index != tail) {
2387 if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2388 if (block != nullptr) {
2389 // Free the old block
2390 this->parent->add_block_to_free_list(block);
2391 }
2392
2393 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2394 }
2395
2396 ((*block)[index])->~T();
2397 ++index;
2398 }
2399 // Even if the queue is empty, there's still one block that's not on the free list
2400 // (unless the head index reached the end of it, in which case the tail will be poised
2401 // to create a new block).
2402 if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2403 this->parent->add_block_to_free_list(this->tailBlock);
2404 }
2405
2406 // Destroy block index
2407 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2408 if (localBlockIndex != nullptr) {
2409 for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2410 localBlockIndex->index[i]->~BlockIndexEntry();
2411 }
2412 do {
2413 auto prev = localBlockIndex->prev;
2414 localBlockIndex->~BlockIndexHeader();
2415 (Traits::free)(localBlockIndex);
2416 localBlockIndex = prev;
2417 } while (localBlockIndex != nullptr);
2418 }
2419 }
2420
2421 template<AllocationMode allocMode, typename U>
2422 inline bool enqueue(U&& element)
2423 {
2424 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2425 index_t newTailIndex = 1 + currentTailIndex;
2426 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2427 // We reached the end of a block, start a new one
2428 auto head = this->headIndex.load(std::memory_order_relaxed);
2429 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2430 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2431 return false;
2432 }
2433#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2434 debug::DebugLock lock(mutex);
2435#endif
2436 // Find out where we'll be inserting this block in the block index
2437 BlockIndexEntry* idxEntry;
2438 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2439 return false;
2440 }
2441
2442 // Get ahold of a new block
2443 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2444 if (newBlock == nullptr) {
2446 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2447 return false;
2448 }
2449#if MCDBGQ_TRACKMEM
2450 newBlock->owner = this;
2451#endif
2452 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2453
2454 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
2455 // May throw, try to insert now before we publish the fact that we have this new block
2457 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2458 }
2459 MOODYCAMEL_CATCH (...) {
2461 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2462 this->parent->add_block_to_free_list(newBlock);
2464 }
2465 }
2466
2467 // Insert the new block into the index
2468 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2469
2470 this->tailBlock = newBlock;
2471
2472 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
2473 this->tailIndex.store(newTailIndex, std::memory_order_release);
2474 return true;
2475 }
2476 }
2477
2478 // Enqueue
2479 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2480
2481 this->tailIndex.store(newTailIndex, std::memory_order_release);
2482 return true;
2483 }
2484
2485 template<typename U>
2486 bool dequeue(U& element)
2487 {
2488 // See ExplicitProducer::dequeue for rationale and explanation
2489 index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2490 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2491 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2492 std::atomic_thread_fence(std::memory_order_acquire);
2493
2494 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2495 tail = this->tailIndex.load(std::memory_order_acquire);
2496 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2497 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2498
2499 // Determine which block the element is in
2500 auto entry = get_block_index_entry_for_index(index);
2501
2502 // Dequeue
2503 auto block = entry->value.load(std::memory_order_relaxed);
2504 auto& el = *((*block)[index]);
2505
2506 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2507#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2508 // Note: Acquiring the mutex with every dequeue instead of only when a block
2509 // is released is very sub-optimal, but it is, after all, purely debug code.
2510 debug::DebugLock lock(producer->mutex);
2511#endif
2512 struct Guard {
2513 Block* block;
2514 index_t index;
2515 BlockIndexEntry* entry;
2516 ConcurrentQueue* parent;
2517
2518 ~Guard()
2519 {
2520 (*block)[index]->~T();
2521 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2522 entry->value.store(nullptr, std::memory_order_relaxed);
2523 parent->add_block_to_free_list(block);
2524 }
2525 }
2526 } guard = { block, index, entry, this->parent };
2527
2528 element = std::move(el);
2529 }
2530 else {
2531 element = std::move(el);
2532 el.~T();
2533
2534 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2535 {
2536#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2537 debug::DebugLock lock(mutex);
2538#endif
2539 // Add the block back into the global free pool (and remove from block index)
2540 entry->value.store(nullptr, std::memory_order_relaxed);
2541 }
2542 this->parent->add_block_to_free_list(block); // releases the above store
2543 }
2544 }
2545
2546 return true;
2547 }
2548 else {
2549 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2550 }
2551 }
2552
2553 return false;
2554 }
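// Illustrative sketch (not part of the queue implementation; names are hypothetical, a plain
// counter stands in for the block bookkeeping). When T's move-assignment can throw, the code
// above relies on a small RAII guard: the destructor call and the set_empty bookkeeping live in
// ~Guard(), so they run whether the assignment succeeds or throws. The pattern in isolation:
#if 0
#include <cassert>
#include <stdexcept>

static int cleanups = 0;

struct Guard {               // runs its cleanup exactly once, on scope exit
  ~Guard() { ++cleanups; }
};

static void risky(bool doThrow) {
  Guard g;                   // bookkeeping armed before the risky operation
  if (doThrow) throw std::runtime_error("move-assignment failed");
}

int main() {
  risky(false);                                               // normal path: cleanup still runs
  try { risky(true); } catch (const std::runtime_error&) {}   // throwing path: cleanup also runs
  assert(cleanups == 2);
}
#endif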
2555
2556 template<AllocationMode allocMode, typename It>
2557 bool enqueue_bulk(It itemFirst, size_t count)
2558 {
2559 // First, we need to make sure we have enough room to enqueue all of the elements;
2560 // this means pre-allocating blocks and putting them in the block index (but only if
2561 // all the allocations succeeded).
2562
2563 // Note that the tailBlock we start off with may not be owned by us any more;
2564 // this happens if it was filled up exactly to the top (setting tailIndex to
2565 // the first index of the next block which is not yet allocated), then dequeued
2566 // completely (putting it on the free list) before we enqueue again.
2567
2568 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2569 auto startBlock = this->tailBlock;
2570 Block* firstAllocatedBlock = nullptr;
2571 auto endBlock = this->tailBlock;
2572
2573 // Figure out how many blocks we'll need to allocate, and do so
2574 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2575 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2576 if (blockBaseDiff > 0) {
2577#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2578 debug::DebugLock lock(mutex);
2579#endif
2580 do {
2581 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2582 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2583
2584 // Find out where we'll be inserting this block in the block index
2585 BlockIndexEntry* idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2586 Block* newBlock;
2587 bool indexInserted = false;
2588 auto head = this->headIndex.load(std::memory_order_relaxed);
2589 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2590 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2591 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2592 // Index allocation or block allocation failed; revert any other allocations
2593 // and index insertions done so far for this operation
2594 if (indexInserted) {
2596 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2597 }
2598 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2599 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2600 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2601 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2602 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2604 }
2605 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2606 this->tailBlock = startBlock;
2607
2608 return false;
2609 }
2610
2611#if MCDBGQ_TRACKMEM
2612 newBlock->owner = this;
2613#endif
2614 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2615 newBlock->next = nullptr;
2616
2617 // Insert the new block into the index
2618 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2619
2620 // Store the chain of blocks so that we can undo if later allocations fail,
2621 // and so that we can find the blocks when we do the actual enqueueing
2622 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2623 assert(this->tailBlock != nullptr);
2624 this->tailBlock->next = newBlock;
2625 }
2626 this->tailBlock = newBlock;
2627 endBlock = newBlock;
2628 firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2629 } while (blockBaseDiff > 0);
2630 }
2631
2632 // Enqueue, one block at a time
2633 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2634 currentTailIndex = startTailIndex;
2635 this->tailBlock = startBlock;
2636 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2637 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2638 this->tailBlock = firstAllocatedBlock;
2639 }
2640 while (true) {
2641 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2642 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2643 stopIndex = newTailIndex;
2644 }
2645 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2646 while (currentTailIndex != stopIndex) {
2647 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2648 }
2649 }
2650 else {
2652 while (currentTailIndex != stopIndex) {
2653 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2654 ++currentTailIndex;
2655 ++itemFirst;
2656 }
2657 }
2658 MOODYCAMEL_CATCH (...) {
2659 auto constructedStopIndex = currentTailIndex;
2660 auto lastBlockEnqueued = this->tailBlock;
2661
2663 auto block = startBlock;
2664 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2665 block = firstAllocatedBlock;
2666 }
2667 currentTailIndex = startTailIndex;
2668 while (true) {
2669 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2670 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2671 stopIndex = constructedStopIndex;
2672 }
2673 while (currentTailIndex != stopIndex) {
2674 (*block)[currentTailIndex++]->~T();
2675 }
2676 if (block == lastBlockEnqueued) {
2677 break;
2678 }
2679 block = block->next;
2680 }
2681 }
2682
2683 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2684 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2685 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2686 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2687 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2689 }
2690 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2691 this->tailBlock = startBlock;
2693 }
2694 }
2695
2696 if (this->tailBlock == endBlock) {
2697 assert(currentTailIndex == newTailIndex);
2698 break;
2699 }
2700 this->tailBlock = this->tailBlock->next;
2701 }
2702 this->tailIndex.store(newTailIndex, std::memory_order_release);
2703 return true;
2704 }
2705
2706 template<typename It>
2707 size_t dequeue_bulk(It& itemFirst, size_t max)
2708 {
2709 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2710 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2711 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2712 if (details::circular_less_than<size_t>(0, desiredCount)) {
2713 desiredCount = desiredCount < max ? desiredCount : max;
2714 std::atomic_thread_fence(std::memory_order_acquire);
2715
2716 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2717
2718 tail = this->tailIndex.load(std::memory_order_acquire);
2719 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2720 if (details::circular_less_than<size_t>(0, actualCount)) {
2721 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2722 if (actualCount < desiredCount) {
2723 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2724 }
2725
2726 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2727 // will never exceed tail.
2728 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2729
2730 // Iterate the blocks and dequeue
2731 auto index = firstIndex;
2732 BlockIndexHeader* localBlockIndex;
2733 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2734 do {
2735 auto blockStartIndex = index;
2736 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2737 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2738
2739 auto entry = localBlockIndex->index[indexIndex];
2740 auto block = entry->value.load(std::memory_order_relaxed);
2741 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2742 while (index != endIndex) {
2743 auto& el = *((*block)[index]);
2744 *itemFirst++ = std::move(el);
2745 el.~T();
2746 ++index;
2747 }
2748 }
2749 else {
2751 while (index != endIndex) {
2752 auto& el = *((*block)[index]);
2753 *itemFirst = std::move(el);
2754 ++itemFirst;
2755 el.~T();
2756 ++index;
2757 }
2758 }
2759 MOODYCAMEL_CATCH (...) {
2760 do {
2761 entry = localBlockIndex->index[indexIndex];
2762 block = entry->value.load(std::memory_order_relaxed);
2763 while (index != endIndex) {
2764 (*block)[index++]->~T();
2765 }
2766
2767 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2768#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2769 debug::DebugLock lock(mutex);
2770#endif
2771 entry->value.store(nullptr, std::memory_order_relaxed);
2772 this->parent->add_block_to_free_list(block);
2773 }
2774 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2775
2776 blockStartIndex = index;
2777 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2778 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2779 } while (index != firstIndex + actualCount);
2780
2782 }
2783 }
2784 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2785 {
2786#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2787 debug::DebugLock lock(mutex);
2788#endif
2789 // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2790 // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2791 entry->value.store(nullptr, std::memory_order_relaxed);
2792 }
2793 this->parent->add_block_to_free_list(block); // releases the above store
2794 }
2795 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2796 } while (index != firstIndex + actualCount);
2797
2798 return actualCount;
2799 }
2800 else {
2801 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2802 }
2803 }
2804
2805 return 0;
2806 }
2807
2808 private:
2809 // The block size must be > 1, so any number with the low bit set is an invalid block base index
2810 static const index_t INVALID_BLOCK_BASE = 1;
2811
2812 struct BlockIndexEntry
2813 {
2814 std::atomic<index_t> key;
2815 std::atomic<Block*> value;
2816 };
2817
2826
2827 template<AllocationMode allocMode>
2828 inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
2829 {
2830 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
2831 if (localBlockIndex == nullptr) {
2832 return false; // this can happen if new_block_index failed in the constructor
2833 }
2834 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2835 idxEntry = localBlockIndex->index[newTail];
2836 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2837 idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2838
2839 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2840 localBlockIndex->tail.store(newTail, std::memory_order_release);
2841 return true;
2842 }
2843
2844 // No room in the old block index, try to allocate another one!
2845 if (allocMode == CannotAlloc || !new_block_index()) {
2846 return false;
2847 }
2848 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2849 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2850 idxEntry = localBlockIndex->index[newTail];
2851 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2852 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2853 localBlockIndex->tail.store(newTail, std::memory_order_release);
2854 return true;
2855 }
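
Both the tail advance above and the tail rewind just below depend on the block index capacity being a power of two, so that masking with capacity - 1 wraps an incremented or decremented position without a modulo or a branch. A standalone illustration of that masking trick (the ring_next/ring_prev names are made up for the example):

    #include <cassert>
    #include <cstddef>

    // Advance or rewind a position inside a power-of-two ring of size `capacity`.
    inline std::size_t ring_next(std::size_t pos, std::size_t capacity) { return (pos + 1) & (capacity - 1); }
    inline std::size_t ring_prev(std::size_t pos, std::size_t capacity) { return (pos - 1) & (capacity - 1); }

    int main()
    {
        const std::size_t capacity = 8;          // must be a power of two
        assert(ring_next(7, capacity) == 0);     // wraps forward without a modulo
        assert(ring_prev(0, capacity) == 7);     // wraps backward; unsigned underflow is well defined
    }
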
2856
2857 inline void rewind_block_index_tail()
2858 {
2859 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2860 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2861 }
2862
2863 inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
2864 {
2865 BlockIndexHeader* localBlockIndex;
2866 auto idx = get_block_index_index_for_index(index, localBlockIndex);
2867 return localBlockIndex->index[idx];
2868 }
2869
2870 inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
2871 {
2872#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2873 debug::DebugLock lock(mutex);
2874#endif
2875 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2876 localBlockIndex = blockIndex.load(std::memory_order_acquire);
2877 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2878 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2879 assert(tailBase != INVALID_BLOCK_BASE);
2880 // Note: Must use division instead of shift because the index may wrap around, causing a negative
2881 // offset, whose negativity we want to preserve
2882 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2883 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2884 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2885 return idx;
2886 }
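
The offset computation above deliberately reinterprets the index difference as a signed value before dividing, so that an index that has wrapped around (and is therefore numerically "behind" tailBase) yields a small negative block offset instead of a huge positive one; a plain right shift would lose that sign. A self-contained illustration with 32-bit indices and a block size of 32 (values chosen only for the example):

    #include <cassert>
    #include <cstdint>

    int main()
    {
        using index_t = std::uint32_t;
        const std::int32_t BLOCK_SIZE = 32;

        index_t tailBase = 64;   // base index stored in the newest block index entry
        index_t index    = 0;    // a block base that is two blocks "behind" tailBase

        // Casting the wrapped unsigned difference to a signed type keeps the offset
        // negative, which the lookup above relies on; a right shift would not.
        std::int32_t offset = static_cast<std::int32_t>(index - tailBase) / BLOCK_SIZE;
        assert(offset == -2);
    }
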
2887
2888 bool new_block_index()
2889 {
2890 auto prev = blockIndex.load(std::memory_order_relaxed);
2891 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2892 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2893 auto raw = static_cast<char*>((Traits::malloc)(
2894 sizeof(BlockIndexHeader) +
2895 std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2896 std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
2897 if (raw == nullptr) {
2898 return false;
2899 }
2900
2901 auto header = new (raw) BlockIndexHeader;
2902 auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
2903 auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
2904 if (prev != nullptr) {
2905 auto prevTail = prev->tail.load(std::memory_order_relaxed);
2906 auto prevPos = prevTail;
2907 size_t i = 0;
2908 do {
2909 prevPos = (prevPos + 1) & (prev->capacity - 1);
2910 index[i++] = prev->index[prevPos];
2911 } while (prevPos != prevTail);
2912 assert(i == prevCapacity);
2913 }
2914 for (size_t i = 0; i != entryCount; ++i) {
2915 new (entries + i) BlockIndexEntry;
2916 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2917 index[prevCapacity + i] = entries + i;
2918 }
2919 header->prev = prev;
2920 header->entries = entries;
2921 header->index = index;
2922 header->capacity = nextBlockIndexCapacity;
2923 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
2924
2925 blockIndex.store(header, std::memory_order_release);
2926
2927 nextBlockIndexCapacity <<= 1;
2928
2929 return true;
2930 }
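
new_block_index above packs the header, the entry array and the pointer array into a single Traits::malloc allocation, padding for alignment and placement-constructing each piece in turn. A reduced sketch of that single-buffer layout using plain std::malloc and std::align, with made-up Header and Entry types standing in for the queue's internal structures:

    #include <cstdlib>
    #include <memory>
    #include <new>

    struct Header { std::size_t capacity; };
    struct Entry  { std::size_t key; };

    // Allocate a Header followed by `count` suitably aligned Entry objects in one block.
    Header* make_header_with_entries(std::size_t count)
    {
        std::size_t bytes = sizeof(Header) + alignof(Entry) - 1 + sizeof(Entry) * count;
        void* raw = std::malloc(bytes);
        if (raw == nullptr) {
            return nullptr;                      // mirror the queue's "report failure, don't throw" policy
        }
        Header* header = new (raw) Header{count};

        // Align the region just past the header for the Entry array, then
        // placement-construct each element (std::align plays the role of align_for).
        void* cursor = static_cast<char*>(raw) + sizeof(Header);
        std::size_t space = bytes - sizeof(Header);
        Entry* entries = static_cast<Entry*>(std::align(alignof(Entry), sizeof(Entry) * count, cursor, space));
        for (std::size_t i = 0; i != count; ++i) {
            new (entries + i) Entry{0};
        }
        return header;                           // release later with std::free after destroying the Entries
    }
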
2931
2932 private:
2933 size_t nextBlockIndexCapacity;
2934 std::atomic<BlockIndexHeader*> blockIndex;
2935
2936#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2937 public:
2938 details::ThreadExitListener threadExitListener;
2939 private:
2940#endif
2941
2942#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2943 public:
2944 ImplicitProducer* nextImplicitProducer;
2945 private:
2946#endif
2947
2948#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2949 mutable debug::DebugMutex mutex;
2950#endif
2951#if MCDBGQ_TRACKMEM
2952 friend struct MemStats;
2953#endif
2954 };
2955
2956
2957 //////////////////////////////////
2958 // Block pool manipulation
2959 //////////////////////////////////
2960
2961 void populate_initial_block_list(size_t blockCount)
2962 {
2963 initialBlockPoolSize = blockCount;
2964 if (initialBlockPoolSize == 0) {
2965 initialBlockPool = nullptr;
2966 return;
2967 }
2968
2969 initialBlockPool = create_array<Block>(blockCount);
2970 if (initialBlockPool == nullptr) {
2971 initialBlockPoolSize = 0;
2972 }
2973 for (size_t i = 0; i < initialBlockPoolSize; ++i) {
2974 initialBlockPool[i].dynamicallyAllocated = false;
2975 }
2976 }
2977
2978 inline Block* try_get_block_from_initial_pool()
2979 {
2980 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
2981 return nullptr;
2982 }
2983
2984 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
2985
2986 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
2987 }
2988
2989 inline void add_block_to_free_list(Block* block)
2990 {
2991#if MCDBGQ_TRACKMEM
2992 block->owner = nullptr;
2993#endif
2994 freeList.add(block);
2995 }
2996
2997 inline void add_blocks_to_free_list(Block* block)
2998 {
2999 while (block != nullptr) {
3000 auto next = block->next;
3001 add_block_to_free_list(block);
3002 block = next;
3003 }
3004 }
3005
3006 inline Block* try_get_block_from_free_list()
3007 {
3008 return freeList.try_get();
3009 }
3010
3011 // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3012 template<AllocationMode canAlloc>
3013 Block* requisition_block()
3014 {
3015 auto block = try_get_block_from_initial_pool();
3016 if (block != nullptr) {
3017 return block;
3018 }
3019
3020 block = try_get_block_from_free_list();
3021 if (block != nullptr) {
3022 return block;
3023 }
3024
3025 if (canAlloc == CanAlloc) {
3026 return create<Block>();
3027 }
3028
3029 return nullptr;
3030 }
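
requisition_block tries the cheapest source first (the pre-allocated initial pool), then recycled blocks from the free list, and only then the heap; the CanAlloc/CannotAlloc template parameter is how the try_enqueue family opts out of that last step. The same tiered-acquisition shape in miniature, with purely illustrative types and pools:

    #include <vector>

    struct Resource { };

    enum class AllocPolicy { CannotAlloc, CanAlloc };

    // Illustrative pools; the real queue uses a fixed initial block array and a lock-free free list.
    Resource* take_one(std::vector<Resource*>& pool)
    {
        if (pool.empty()) return nullptr;
        Resource* r = pool.back();
        pool.pop_back();
        return r;
    }

    Resource* acquire(std::vector<Resource*>& initialPool, std::vector<Resource*>& freeList, AllocPolicy policy)
    {
        if (Resource* r = take_one(initialPool)) return r;              // 1) pre-allocated pool
        if (Resource* r = take_one(freeList))    return r;              // 2) recycled blocks
        if (policy == AllocPolicy::CanAlloc)     return new Resource(); // 3) heap, only if allowed
        return nullptr;                                                 // try_*-style callers fail instead of allocating
    }
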
3031
3032
3033#if MCDBGQ_TRACKMEM
3034 public:
3035 struct MemStats {
3036 size_t allocatedBlocks;
3037 size_t usedBlocks;
3038 size_t freeBlocks;
3039 size_t ownedBlocksExplicit;
3040 size_t ownedBlocksImplicit;
3041 size_t implicitProducers;
3042 size_t explicitProducers;
3043 size_t elementsEnqueued;
3044 size_t blockClassBytes;
3045 size_t queueClassBytes;
3046 size_t implicitBlockIndexBytes;
3047 size_t explicitBlockIndexBytes;
3048
3049 friend class ConcurrentQueue;
3050
3051 private:
3052 static MemStats getFor(ConcurrentQueue* q)
3053 {
3054 MemStats stats = { 0 };
3055
3056 stats.elementsEnqueued = q->size_approx();
3057
3058 auto block = q->freeList.head_unsafe();
3059 while (block != nullptr) {
3060 ++stats.allocatedBlocks;
3061 ++stats.freeBlocks;
3062 block = block->freeListNext.load(std::memory_order_relaxed);
3063 }
3064
3065 for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3066 bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3067 stats.implicitProducers += implicit ? 1 : 0;
3068 stats.explicitProducers += implicit ? 0 : 1;
3069
3070 if (implicit) {
3071 auto prod = static_cast<ImplicitProducer*>(ptr);
3072 stats.queueClassBytes += sizeof(ImplicitProducer);
3073 auto head = prod->headIndex.load(std::memory_order_relaxed);
3074 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3075 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3076 if (hash != nullptr) {
3077 for (size_t i = 0; i != hash->capacity; ++i) {
3078 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3079 ++stats.allocatedBlocks;
3080 ++stats.ownedBlocksImplicit;
3081 }
3082 }
3083 stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3084 for (; hash != nullptr; hash = hash->prev) {
3085 stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3086 }
3087 }
3088 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3089 //auto block = prod->get_block_index_entry_for_index(head);
3090 ++stats.usedBlocks;
3091 }
3092 }
3093 else {
3094 auto prod = static_cast<ExplicitProducer*>(ptr);
3095 stats.queueClassBytes += sizeof(ExplicitProducer);
3096 auto tailBlock = prod->tailBlock;
3097 bool wasNonEmpty = false;
3098 if (tailBlock != nullptr) {
3099 auto block = tailBlock;
3100 do {
3101 ++stats.allocatedBlocks;
3102 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3103 ++stats.usedBlocks;
3104 wasNonEmpty = wasNonEmpty || block != tailBlock;
3105 }
3106 ++stats.ownedBlocksExplicit;
3107 block = block->next;
3108 } while (block != tailBlock);
3109 }
3110 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3111 while (index != nullptr) {
3112 stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3113 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3114 }
3115 }
3116 }
3117
3118 auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3119 stats.allocatedBlocks += freeOnInitialPool;
3120 stats.freeBlocks += freeOnInitialPool;
3121
3122 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3123 stats.queueClassBytes += sizeof(ConcurrentQueue);
3124
3125 return stats;
3126 }
3127 };
3128
3129 // For debugging only. Not thread-safe.
3130 MemStats getMemStats()
3131 {
3132 return MemStats::getFor(this);
3133 }
3134 private:
3135 friend struct MemStats;
3136#endif
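
When the header is compiled with MCDBGQ_TRACKMEM defined, the getMemStats() hook above becomes available; as the comment notes, it is not thread-safe, so it is only meaningful while no other thread is touching the queue. A hedged usage sketch (include path illustrative, and only valid with MCDBGQ_TRACKMEM enabled):

    // Compile with -DMCDBGQ_TRACKMEM=1 for this debugging hook to exist.
    #include <cstdio>
    #include "ConcurrentQueue.h"

    void dump_stats(moodycamel::ConcurrentQueue<int>& q)
    {
        // Not thread-safe: call only while no other thread is using the queue.
        auto stats = q.getMemStats();
        std::printf("allocated: %zu used: %zu free: %zu\n",
                    stats.allocatedBlocks, stats.usedBlocks, stats.freeBlocks);
    }
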
3137
3138
3139 //////////////////////////////////
3140 // Producer list manipulation
3141 //////////////////////////////////
3142
3143 ProducerBase* recycle_or_create_producer(bool isExplicit)
3144 {
3145 bool recycled;
3146 return recycle_or_create_producer(isExplicit, recycled);
3147 }
3148
3149 ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled)
3150 {
3151#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3152 debug::DebugLock lock(implicitProdMutex);
3153#endif
3154 // Try to re-use one first
3155 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3156 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3157 bool expected = true;
3158 if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
3159 // We caught one! It's been marked as activated, the caller can have it
3160 recycled = true;
3161 return ptr;
3162 }
3163 }
3164 }
3165
3166 recycled = false;
3167 return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
3168 }
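
Recycling above claims an inactive producer by flipping its inactive flag from true back to false with a compare_exchange_strong, which guarantees that two threads racing for the same dormant producer cannot both walk away with it. The claim pattern in isolation (the flag here is a stand-in for ProducerBase::inactive):

    #include <atomic>

    // Returns true only for the single thread that wins the claim.
    bool try_claim(std::atomic<bool>& inactive)
    {
        bool expected = true;   // only a currently-inactive slot may be claimed
        return inactive.compare_exchange_strong(expected, false,
                                                std::memory_order_acquire,
                                                std::memory_order_relaxed);
    }
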
3169
3170 ProducerBase* add_producer(ProducerBase* producer)
3171 {
3172 // Handle failed memory allocation
3173 if (producer == nullptr) {
3174 return nullptr;
3175 }
3176
3177 producerCount.fetch_add(1, std::memory_order_relaxed);
3178
3179 // Add it to the lock-free list
3180 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3181 do {
3182 producer->next = prevTail;
3183 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3184
3185#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3186 if (producer->isExplicit) {
3187 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3188 do {
3189 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3190 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3191 }
3192 else {
3193 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3194 do {
3195 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3196 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3197 }
3198#endif
3199
3200 return producer;
3201 }
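
add_producer publishes the new producer with the classic lock-free list-push loop: link the node to the currently observed tail, then try to swing the tail pointer with compare_exchange_weak, retrying if another thread got there first (nodes are never unlinked, only marked inactive, which is what keeps this simple scheme safe). The same push in standalone form with a minimal node type:

    #include <atomic>

    struct Node { Node* next = nullptr; };

    std::atomic<Node*> listTail{nullptr};

    // Lock-free prepend: point our `next` at the observed tail, then publish
    // ourselves with a CAS, retrying if another thread won the race in between.
    void push(Node* node)
    {
        Node* prevTail = listTail.load(std::memory_order_relaxed);
        do {
            node->next = prevTail;
        } while (!listTail.compare_exchange_weak(prevTail, node,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed));
    }
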
3202
3203 void reown_producers()
3204 {
3205 // After another instance is moved-into/swapped-with this one, all the
3206 // producers we stole still think their parents are the other queue.
3207 // So fix them up!
3208 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3209 ptr->parent = this;
3210 }
3211 }
3212
3213
3214 //////////////////////////////////
3215 // Implicit producer hash
3216 //////////////////////////////////
3217
3218 struct ImplicitProducerKVP
3219 {
3220 std::atomic<details::thread_id_t> key;
3221 ImplicitProducer* value; // No need for atomicity since it's only read by the thread that sets it in the first place
3222
3223 ImplicitProducerKVP() : value(nullptr) { }
3224
3225 ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3226 {
3227 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3228 value = other.value;
3229 }
3230
3231 inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3232 {
3233 swap(other);
3234 return *this;
3235 }
3236
3237 inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
3238 {
3239 if (this != &other) {
3240 details::swap_relaxed(key, other.key);
3241 std::swap(value, other.value);
3242 }
3243 }
3244 };
3245
3246 template<typename XT, typename XTraits>
3247 friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
3248
3249 struct ImplicitProducerHash
3250 {
3251 size_t capacity;
3252 ImplicitProducerKVP* entries;
3253 ImplicitProducerHash* prev;
3254 };
3255
3256 void populate_initial_implicit_producer_hash()
3257 {
3258 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3259
3260 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3261 auto hash = &initialImplicitProducerHash;
3262 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3263 hash->entries = &initialImplicitProducerHashEntries[0];
3264 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3265 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3266 }
3267 hash->prev = nullptr;
3268 implicitProducerHash.store(hash, std::memory_order_relaxed);
3269 }
3270
3271 void swap_implicit_producer_hashes(ConcurrentQueue& other)
3272 {
3273 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3274
3275 // Swap (assumes our implicit producer hash is initialized)
3276 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3277 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3278 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3279
3280 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3281
3282 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3283 if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3284 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3285 }
3286 else {
3287 ImplicitProducerHash* hash;
3288 for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3289 continue;
3290 }
3291 hash->prev = &initialImplicitProducerHash;
3292 }
3293 if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3294 other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3295 }
3296 else {
3297 ImplicitProducerHash* hash;
3298 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3299 continue;
3300 }
3301 hash->prev = &other.initialImplicitProducerHash;
3302 }
3303 }
3304
3305 // Only fails (returns nullptr) if memory allocation fails
3306 ImplicitProducer* get_or_add_implicit_producer()
3307 {
3308 // Note that since the data is essentially thread-local (key is thread ID),
3309 // there's a reduced need for fences (memory ordering is already consistent
3310 // for any individual thread), except for the current table itself.
3311
3312 // Start by looking for the thread ID in the current and all previous hash tables.
3313 // If it's not found, it must not be in there yet, since this same thread would
3314 // have added it previously to one of the tables that we traversed.
3315
3316 // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3317
3318#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3319 debug::DebugLock lock(implicitProdMutex);
3320#endif
3321
3322 auto id = details::thread_id();
3323 auto hashedId = details::hash_thread_id(id);
3324
3325 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3326 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3327 // Look for the id in this hash
3328 auto index = hashedId;
3329 while (true) { // Not an infinite loop because at least one slot is free in the hash table
3330 index &= hash->capacity - 1;
3331
3332 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3333 if (probedKey == id) {
3334 // Found it! If we had to search several hashes deep, though, we should lazily add it
3335 // to the current main hash table to avoid the extended search next time.
3336 // Note there's guaranteed to be room in the current hash table since every subsequent
3337 // table implicitly reserves space for all previous tables (there's only one
3338 // implicitProducerHashCount).
3339 auto value = hash->entries[index].value;
3340 if (hash != mainHash) {
3341 index = hashedId;
3342 while (true) {
3343 index &= mainHash->capacity - 1;
3344 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3345 auto empty = details::invalid_thread_id;
3346#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3347 auto reusable = details::invalid_thread_id2;
3348 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3349 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3350#else
3351 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3352#endif
3353 mainHash->entries[index].value = value;
3354 break;
3355 }
3356 ++index;
3357 }
3358 }
3359
3360 return value;
3361 }
3362 if (probedKey == details::invalid_thread_id) {
3363 break; // Not in this hash table
3364 }
3365 ++index;
3366 }
3367 }
3368
3369 // Insert!
3370 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3371 while (true) {
3372 if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3373 // We've acquired the resize lock, try to allocate a bigger hash table.
3374 // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3375 // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3376 // locked block).
3377 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3378 if (newCount >= (mainHash->capacity >> 1)) {
3379 auto newCapacity = mainHash->capacity << 1;
3380 while (newCount >= (newCapacity >> 1)) {
3381 newCapacity <<= 1;
3382 }
3383 auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
3384 if (raw == nullptr) {
3385 // Allocation failed
3386 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3387 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3388 return nullptr;
3389 }
3390
3391 auto newHash = new (raw) ImplicitProducerHash;
3392 newHash->capacity = newCapacity;
3393 newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3394 for (size_t i = 0; i != newCapacity; ++i) {
3395 new (newHash->entries + i) ImplicitProducerKVP;
3396 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3397 }
3398 newHash->prev = mainHash;
3399 implicitProducerHash.store(newHash, std::memory_order_release);
3400 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3401 mainHash = newHash;
3402 }
3403 else {
3404 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3405 }
3406 }
3407
3408 // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3409 // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3410 // always be true)
3411 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3412 bool recycled;
3413 auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
3414 if (producer == nullptr) {
3415 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3416 return nullptr;
3417 }
3418 if (recycled) {
3419 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3420 }
3421
3422#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3423 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3424 producer->threadExitListener.userData = producer;
3425 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3426#endif
3427
3428 auto index = hashedId;
3429 while (true) {
3430 index &= mainHash->capacity - 1;
3431 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3432
3433 auto empty = details::invalid_thread_id;
3434#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3435 auto reusable = details::invalid_thread_id2;
3436 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3437 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3438#else
3439 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3440#endif
3441 mainHash->entries[index].value = producer;
3442 break;
3443 }
3444 ++index;
3445 }
3446 return producer;
3447 }
3448
3449 // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3450 // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3451 // we try to allocate ourselves).
3452 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3453 }
3454 }
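
The implicit-producer hash above follows the fixed-capacity, linear-probing, CAS-on-key design referenced in the comment (preshing.com's "world's simplest lock-free hash table"): a slot is claimed by compare-exchanging the key from the empty sentinel, values are only read back by the thread that stored them, and instead of resizing in place a larger table is chained in front. A reduced sketch of the probe-and-claim insert (sentinel value, capacity and value type are illustrative):

    #include <atomic>
    #include <cstddef>
    #include <cstdint>

    constexpr std::uint64_t EMPTY_KEY = 0;    // sentinel; real keys are assumed non-zero
    constexpr std::size_t   CAPACITY  = 16;   // power of two; never resized, only chained

    struct Slot { std::atomic<std::uint64_t> key{EMPTY_KEY}; int value{0}; };
    Slot table[CAPACITY];

    // Probe linearly from the hashed key and claim the first empty slot by CAS-ing the
    // key in. Assumes the table is never allowed to fill up (the queue grows a new,
    // larger table well before that) and that `value` is only read back by the same
    // thread that stored it, as in the implicit-producer hash above.
    int* insert_or_find(std::uint64_t key, int value)
    {
        for (std::size_t index = static_cast<std::size_t>(key); ; ++index) {
            Slot& slot = table[index & (CAPACITY - 1)];
            std::uint64_t probed = slot.key.load(std::memory_order_relaxed);
            if (probed == key) {
                return &slot.value;                  // this thread inserted it earlier
            }
            if (probed == EMPTY_KEY) {
                if (slot.key.compare_exchange_strong(probed, key, std::memory_order_relaxed)) {
                    slot.value = value;              // slot is now owned by this thread
                    return &slot.value;
                }
                // CAS failed: another thread claimed this slot; keep probing.
            }
        }
    }
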
3455
3456#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3457 void implicit_producer_thread_exited(ImplicitProducer* producer)
3458 {
3459 // Remove from thread exit listeners
3460 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3461
3462 // Remove from hash
3463#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3464 debug::DebugLock lock(implicitProdMutex);
3465#endif
3466 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3467 assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3468 auto id = details::thread_id();
3469 auto hashedId = details::hash_thread_id(id);
3470 details::thread_id_t probedKey;
3471
3472 // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3473 // trying to add an entry thinking there's a free slot (because they reused a producer)
3474 for (; hash != nullptr; hash = hash->prev) {
3475 auto index = hashedId;
3476 do {
3477 index &= hash->capacity - 1;
3478 probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3479 if (probedKey == id) {
3480 hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3481 break;
3482 }
3483 ++index;
3484 } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3485 }
3486
3487 // Mark the queue as being recyclable
3488 producer->inactive.store(true, std::memory_order_release);
3489 }
3490
3491 static void implicit_producer_thread_exited_callback(void* userData)
3492 {
3493 auto producer = static_cast<ImplicitProducer*>(userData);
3494 auto queue = producer->parent;
3495 queue->implicit_producer_thread_exited(producer);
3496 }
3497#endif
3498
3499 //////////////////////////////////
3500 // Utility functions
3501 //////////////////////////////////
3502
3503 template<typename U>
3504 static inline U* create_array(size_t count)
3505 {
3506 assert(count > 0);
3507 auto p = static_cast<U*>((Traits::malloc)(sizeof(U) * count));
3508 if (p == nullptr) {
3509 return nullptr;
3510 }
3511
3512 for (size_t i = 0; i != count; ++i) {
3513 new (p + i) U();
3514 }
3515 return p;
3516 }
3517
3518 template<typename U>
3519 static inline void destroy_array(U* p, size_t count)
3520 {
3521 if (p != nullptr) {
3522 assert(count > 0);
3523 for (size_t i = count; i != 0; ) {
3524 (p + --i)->~U();
3525 }
3526 (Traits::free)(p);
3527 }
3528 }
3529
3530 template<typename U>
3531 static inline U* create()
3532 {
3533 auto p = (Traits::malloc)(sizeof(U));
3534 return p != nullptr ? new (p) U : nullptr;
3535 }
3536
3537 template<typename U, typename A1>
3538 static inline U* create(A1&& a1)
3539 {
3540 auto p = (Traits::malloc)(sizeof(U));
3541 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3542 }
3543
3544 template<typename U>
3545 static inline void destroy(U* p)
3546 {
3547 if (p != nullptr) {
3548 p->~U();
3549 }
3550 (Traits::free)(p);
3551 }
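
The create/create_array/destroy/destroy_array helpers above route every allocation through Traits::malloc and Traits::free and pair the raw memory with placement new and explicit destructor calls, which is what lets a custom allocator be injected through the traits class. The same pattern in isolation, with plain std::malloc standing in for Traits::malloc:

    #include <cstdlib>
    #include <new>
    #include <string>
    #include <utility>

    template<typename U, typename... Args>
    U* create_with_malloc(Args&&... args)
    {
        void* p = std::malloc(sizeof(U));    // raw memory only
        return p != nullptr ? new (p) U(std::forward<Args>(args)...) : nullptr;  // construct in place
    }

    template<typename U>
    void destroy_with_free(U* p)
    {
        if (p != nullptr) {
            p->~U();        // run the destructor explicitly...
        }
        std::free(p);       // ...then release the raw memory (free(nullptr) is a no-op)
    }

    int main()
    {
        auto* s = create_with_malloc<std::string>("hello");
        destroy_with_free(s);
    }
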
3552
3553private:
3554 std::atomic<ProducerBase*> producerListTail;
3555 std::atomic<std::uint32_t> producerCount;
3556
3557 std::atomic<size_t> initialBlockPoolIndex;
3558 Block* initialBlockPool;
3559 size_t initialBlockPoolSize;
3560
3561#if !MCDBGQ_USEDEBUGFREELIST
3562 FreeList<Block> freeList;
3563#else
3564 debug::DebugFreeList<Block> freeList;
3565#endif
3566
3567 std::atomic<ImplicitProducerHash*> implicitProducerHash;
3568 std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3569 ImplicitProducerHash initialImplicitProducerHash;
3570 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3571 std::atomic_flag implicitProducerHashResizeInProgress;
3572
3573 std::atomic<std::uint32_t> nextExplicitConsumerId;
3574 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3575
3576#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3577 debug::DebugMutex implicitProdMutex;
3578#endif
3579
3580#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3581 std::atomic<ExplicitProducer*> explicitProducers;
3582 std::atomic<ImplicitProducer*> implicitProducers;
3583#endif
3584};
3585
3586
3587 template<typename T, typename Traits>
3588 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
3589 : producer(queue.recycle_or_create_producer(true))
3590{
3591 if (producer != nullptr) {
3592 producer->token = this;
3593 }
3594}
3595
3596 template<typename T, typename Traits>
3597 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
3598 : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3599{
3600 if (producer != nullptr) {
3601 producer->token = this;
3602 }
3603}
3604
3605 template<typename T, typename Traits>
3606 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
3607 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3608{
3609 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3610 lastKnownGlobalOffset = -1;
3611}
3612
3613 template<typename T, typename Traits>
3614 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
3615 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3616{
3617 initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3618 lastKnownGlobalOffset = -1;
3619}
3620
3621 template<typename T, typename Traits>
3622 inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
3623 {
3624 a.swap(b);
3625 }
3626
3627 inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
3628{
3629 a.swap(b);
3630}
3631
3632 inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
3633{
3634 a.swap(b);
3635}
3636
3637 template<typename T, typename Traits>
3638 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
3639 {
3640 a.swap(b);
3641 }
3642
3643}
3644
3645#if defined(__GNUC__)
3646#pragma GCC diagnostic pop
3647#endif