38#pragma GCC diagnostic push
39#pragma GCC diagnostic ignored "-Wconversion"
41#ifdef MCDBGQ_USE_RELACY
42#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
47#include "TargetConditionals.h"
50#ifdef MCDBGQ_USE_RELACY
51#include "relacy/relacy_std.hpp"
52#include "relacy_shims.h"
82#if defined(MCDBGQ_USE_RELACY)
89#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
92extern "C" __declspec(dllimport)
unsigned long __stdcall GetCurrentThreadId(
void);
94 static_assert(
sizeof(
unsigned long) ==
sizeof(std::uint32_t),
"Expected size of unsigned long to be 32 bits on Windows");
100#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
102 static_assert(
sizeof(std::thread::id) == 4 ||
sizeof(std::thread::id) == 8,
"std::thread::id is expected to be either 4 or 8 bytes");
112 template<std::
size_t>
struct thread_id_size { };
113 template<>
struct thread_id_size<4> {
typedef std::uint32_t numeric_t; };
114 template<>
struct thread_id_size<8> {
typedef std::uint64_t numeric_t; };
116 template<>
struct thread_id_converter<
thread_id_t> {
127 return std::hash<std::thread::id>()(x);
138#if defined(__GNUC__) || defined(__INTEL_COMPILER)
139#define MOODYCAMEL_THREADLOCAL __thread
140#elif defined(_MSC_VER)
141#define MOODYCAMEL_THREADLOCAL __declspec(thread)
144#define MOODYCAMEL_THREADLOCAL thread_local
155#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
156#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
157#define MOODYCAMEL_EXCEPTIONS_ENABLED
165#if (defined(LIBCARLA_NO_EXCEPTIONS) && defined(MOODYCAMEL_EXCEPTIONS_ENABLED))
166# undef MOODYCAMEL_EXCEPTIONS_ENABLED
169#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
170#define MOODYCAMEL_TRY try
171#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
172#define MOODYCAMEL_RETHROW throw
173#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
175#define MOODYCAMEL_TRY if (true)
176#define MOODYCAMEL_CATCH(...) else if (false)
177#define MOODYCAMEL_RETHROW
178#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)
183#ifndef MOODYCAMEL_NOEXCEPT
184#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
185#define MOODYCAMEL_NOEXCEPT
186#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
187#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
188#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
191#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
192#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
193#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
194#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
195#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
196#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
197#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
199#define MOODYCAMEL_NOEXCEPT noexcept
200#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
201#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
205#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
206#ifdef MCDBGQ_USE_RELACY
207#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
212#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
221#ifndef MOODYCAMEL_DELETE_FUNCTION
222#if defined(_MSC_VER) && _MSC_VER < 1800
223#define MOODYCAMEL_DELETE_FUNCTION
225#define MOODYCAMEL_DELETE_FUNCTION = delete
// Branch-prediction hint for GCC/ICC: tells the compiler the condition is
// expected to be true. The parentheses around the function name prevent a
// function-like `likely` macro (if any is in scope) from expanding here.
static inline bool (likely)(bool x) { return __builtin_expect(x, 1) != 0; }
// Branch-prediction hint for GCC/ICC: tells the compiler the condition is
// expected to be false. Name is parenthesized to dodge macro expansion.
static inline bool (unlikely)(bool x) { return __builtin_expect(x, 0) != 0; }
// Portable fallback when __builtin_expect is unavailable: no prediction hint,
// the value is simply passed through unchanged.
static inline bool (likely)(bool x) { return x; }
// Portable fallback when __builtin_expect is unavailable: identity, no hint.
static inline bool (unlikely)(bool x) { return x; }
240#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
241#include "internal/concurrentqueue_internal_debug.h"
248 static_assert(std::is_integral<T>::value,
"const_numeric_max can only be used with integers");
249 static const T
value = std::numeric_limits<T>::is_signed
250 ? (
static_cast<T
>(1) << (
sizeof(T) * CHAR_BIT - 1)) -
static_cast<T
>(1)
251 : static_cast<T>(-1);
254#if defined(__GLIBCXX__)
332#ifndef MCDBGQ_USE_RELACY
335#if defined(malloc) || defined(free)
// Workaround for environments where malloc/free are #defined as macros (the
// enclosing `#if defined(malloc) || defined(free)` branch). First capture the
// macro-expanded allocator under distinct WORKAROUND_* names...
static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
// ...then re-expose them as (malloc)/(free); parenthesizing the declarator
// names suppresses function-like macro expansion at the definition site, so
// callers can uniformly invoke (Traits-style) malloc/free.
static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
// Normal (non-macro) case: thin named forwards to the standard allocator, so
// the queue's allocation hooks have a consistent call shape regardless of
// which preprocessor branch is active.
static inline void* malloc(size_t size) { return std::malloc(size); }
static inline void free(void* ptr) { return std::free(ptr); }
// Relacy builds (MCDBGQ_USE_RELACY): route allocation through rl::rl_malloc /
// rl::rl_free so the race-detector can instrument the memory. `$` appears to
// be relacy's debug-info/source-location macro — confirm against
// relacy_std.hpp if touching this.
static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
365template<
typename T,
typename Traits>
class ConcurrentQueue;
367class ConcurrentQueueTests;
385 static inline std::uint32_t
hash(std::uint32_t h)
395 return h ^ (h >> 16);
399 static inline std::uint64_t
hash(std::uint64_t h)
402 h *= 0xff51afd7ed558ccd;
404 h *= 0xc4ceb9fe1a85ec53;
405 return h ^ (h >> 33);
412 static_assert(
sizeof(
thread_id_t) <= 8,
"Expected a platform where thread IDs are at most 64-bit values");
422#pragma warning(disable: 4554)
424 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
"circular_less_than is intended to be used only with unsigned integer types");
425 return static_cast<T
>(a - b) >
static_cast<T
>(
static_cast<T
>(1) <<
static_cast<T
>(
sizeof(T) * CHAR_BIT - 1));
434 const std::size_t alignment = std::alignment_of<U>::value;
435 return ptr + (alignment - (
reinterpret_cast<std::uintptr_t
>(ptr) % alignment)) % alignment;
441 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
"ceil_to_pow_2 is intended to be used only with unsigned integer types");
448 for (std::size_t i = 1; i <
sizeof(T); i <<= 1) {
456 static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
458 T temp = std::move(left.load(std::memory_order_relaxed));
459 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
460 right.store(std::move(temp), std::memory_order_relaxed);
464 static inline T
const&
nomove(T
const& x)
469 template<
bool Enable>
473 static inline T
const&
eval(T
const& x)
484 ->
decltype(std::forward<U>(x))
486 return std::forward<U>(x);
490 template<
typename It>
496#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
502#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
503#ifdef MCDBGQ_USE_RELACY
504 typedef RelacyThreadExitListener ThreadExitListener;
505 typedef RelacyThreadExitNotifier ThreadExitNotifier;
507 struct ThreadExitListener
509 typedef void (*callback_t)(
void*);
513 ThreadExitListener* next;
517 class ThreadExitNotifier
520 static void subscribe(ThreadExitListener* listener)
522 auto& tlsInst = instance();
523 listener->next = tlsInst.tail;
524 tlsInst.tail = listener;
527 static void unsubscribe(ThreadExitListener* listener)
529 auto& tlsInst = instance();
530 ThreadExitListener** prev = &tlsInst.tail;
531 for (
auto ptr = tlsInst.tail; ptr !=
nullptr; ptr = ptr->next) {
532 if (ptr == listener) {
ThreadExitNotifier() : tail(nullptr) { }  // starts with an empty listener list
545 ~ThreadExitNotifier()
548 assert(
this == &instance() &&
"If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
549 for (
auto ptr = tail; ptr !=
nullptr; ptr = ptr->next) {
550 ptr->callback(ptr->userData);
555 static inline ThreadExitNotifier& instance()
557 static thread_local ThreadExitNotifier notifier;
562 ThreadExitListener* tail;
581 template<
typename T,
typename Traits>
584 template<
typename T,
typename Traits>
590 other.producer =
nullptr;
604 std::swap(
producer, other.producer);
608 if (other.producer !=
nullptr) {
646 template<
typename T,
typename Traits>
649 template<
typename T,
typename Traits>
690template<
typename T,
typename Traits>
694template<
typename T,
typename Traits = ConcurrentQueueDefaultTraits>
704 static const size_t BLOCK_SIZE =
static_cast<size_t>(Traits::BLOCK_SIZE);
712#pragma warning(disable: 4307)
713#pragma warning(disable: 4309)
720 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value,
"Traits::size_t must be an unsigned integral type");
721 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value,
"Traits::index_t must be an unsigned integral type");
722 static_assert(
sizeof(
index_t) >=
sizeof(
size_t),
"Traits::index_t must be at least as wide as Traits::size_t");
752#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
757 explicitProducers.store(
nullptr, std::memory_order_relaxed);
758 implicitProducers.store(
nullptr, std::memory_order_relaxed);
765 ConcurrentQueue(
size_t minCapacity,
size_t maxExplicitProducers,
size_t maxImplicitProducers)
774 size_t blocks = (((minCapacity +
BLOCK_SIZE - 1) /
BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
777#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
778 explicitProducers.store(
nullptr, std::memory_order_relaxed);
779 implicitProducers.store(
nullptr, std::memory_order_relaxed);
790 while (ptr !=
nullptr) {
791 auto next = ptr->next_prod();
792 if (ptr->token !=
nullptr) {
793 ptr->token->producer =
nullptr;
802 while (hash !=
nullptr) {
803 auto prev = hash->prev;
804 if (prev !=
nullptr) {
805 for (
size_t i = 0; i != hash->capacity; ++i) {
806 hash->entries[i].~ImplicitProducerKVP();
808 hash->~ImplicitProducerHash();
809 (Traits::free)(hash);
816 auto block =
freeList.head_unsafe();
817 while (block !=
nullptr) {
818 auto next = block->freeListNext.load(std::memory_order_relaxed);
819 if (block->dynamicallyAllocated) {
841 producerCount(other.producerCount.load(std::memory_order_relaxed)),
845 freeList(std::move(other.freeList)),
854 other.producerListTail.store(
nullptr, std::memory_order_relaxed);
855 other.producerCount.store(0, std::memory_order_relaxed);
856 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
857 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
859#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
860 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
861 other.explicitProducers.store(
nullptr, std::memory_order_relaxed);
862 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
863 other.implicitProducers.store(
nullptr, std::memory_order_relaxed);
866 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
867 other.initialBlockPoolSize = 0;
868 other.initialBlockPool =
nullptr;
891 if (
this == &other) {
909#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
926 return inner_enqueue<CanAlloc>(item);
937 return inner_enqueue<CanAlloc>(std::move(item));
946 return inner_enqueue<CanAlloc>(token, item);
955 return inner_enqueue<CanAlloc>(token, std::move(item));
964 template<
typename It>
968 return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
977 template<
typename It>
980 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
991 return inner_enqueue<CannotAlloc>(item);
1002 return inner_enqueue<CannotAlloc>(std::move(item));
1010 return inner_enqueue<CannotAlloc>(token, item);
1018 return inner_enqueue<CannotAlloc>(token, std::move(item));
1028 template<
typename It>
1032 return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1040 template<
typename It>
1043 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1052 template<
typename U>
1057 size_t nonEmptyCount = 0;
1059 size_t bestSize = 0;
1060 for (
auto ptr =
producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr !=
nullptr; ptr = ptr->next_prod()) {
1063 if (size > bestSize) {
1073 if (nonEmptyCount > 0) {
1077 for (
auto ptr =
producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1078 if (ptr != best && ptr->
dequeue(item)) {
1095 template<
typename U>
1098 for (
auto ptr =
producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1099 if (ptr->dequeue(item)) {
1110 template<
typename U>
1136 if (ptr ==
nullptr) {
1140 if (ptr->dequeue(item)) {
1145 ptr = ptr->next_prod();
1146 if (ptr ==
nullptr) {
1158 template<
typename It>
1162 for (
auto ptr =
producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1163 count += ptr->dequeue_bulk(itemFirst, max - count);
1176 template<
typename It>
1197 if (ptr ==
nullptr) {
1203 if (dequeued != 0) {
1207 if (dequeued == max) {
1211 ptr = ptr->next_prod();
1212 if (ptr ==
nullptr) {
1227 template<
typename U>
1240 template<
typename It>
1256 for (
auto ptr =
producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1257 size += ptr->size_approx();
1294 template<AllocationMode canAlloc,
typename U>
1297 return static_cast<ExplicitProducer*
>(token.
producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1300 template<AllocationMode canAlloc,
typename U>
1304 return producer ==
nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1307 template<AllocationMode canAlloc,
typename It>
1310 return static_cast<ExplicitProducer*
>(token.
producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1313 template<AllocationMode canAlloc,
typename It>
1317 return producer ==
nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1327 auto prodCount =
producerCount.load(std::memory_order_relaxed);
1333 std::uint32_t offset = prodCount - 1 - (token.
initialOffset % prodCount);
1335 for (std::uint32_t i = 0; i != offset; ++i) {
1344 if (delta >= prodCount) {
1345 delta = delta % prodCount;
1347 for (std::uint32_t i = 0; i != delta; ++i) {
1365 template <
typename N>
1377 template<
typename N>
1389#if MCDBGQ_NOLOCKFREE_FREELIST
1390 debug::DebugLock lock(mutex);
1403#if MCDBGQ_NOLOCKFREE_FREELIST
1404 debug::DebugLock lock(mutex);
1406 auto head =
freeListHead.load(std::memory_order_acquire);
1407 while (head !=
nullptr) {
1408 auto prevHead = head;
1409 auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1410 if ((refs &
REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1417 auto next = head->freeListNext.load(std::memory_order_relaxed);
1418 if (
freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1424 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1431 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1454 auto head =
freeListHead.load(std::memory_order_relaxed);
1456 node->freeListNext.store(head, std::memory_order_relaxed);
1457 node->freeListRefs.store(1, std::memory_order_release);
1458 if (!
freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1475#if MCDBGQ_NOLOCKFREE_FREELIST
1476 debug::DebugMutex mutex;
1497 template<InnerQueueContext context>
1503 if (!
emptyFlags[i].load(std::memory_order_relaxed)) {
1509 std::atomic_thread_fence(std::memory_order_acquire);
1515 std::atomic_thread_fence(std::memory_order_acquire);
1524 template<InnerQueueContext context>
1543 template<InnerQueueContext context>
1548 std::atomic_thread_fence(std::memory_order_release);
1550 for (
size_t j = 0; j != count; ++j) {
1551 assert(!
emptyFlags[i + j].load(std::memory_order_relaxed));
1552 emptyFlags[i + j].store(
true, std::memory_order_relaxed);
1564 template<InnerQueueContext context>
1570 emptyFlags[i].store(
true, std::memory_order_relaxed);
1579 template<InnerQueueContext context>
1585 emptyFlags[i].store(
false, std::memory_order_relaxed);
1603 static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value,
"The queue does not support super-aligned types at this time");
1626 static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value,
"Internal error: Blocks must be at least as aligned as the type they are wrapping");
1654 template<
typename U>
1665 template<
typename It>
1680 auto tail =
tailIndex.load(std::memory_order_relaxed);
1681 auto head =
headIndex.load(std::memory_order_relaxed);
1701 friend struct MemStats;
1736 Block* halfDequeuedBlock =
nullptr;
1751 block = block->
next;
1752 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1757 if (block == halfDequeuedBlock) {
1764 (*block)[i++]->~T();
1773 auto nextBlock = block->
next;
1774 if (block->dynamicallyAllocated) {
1786 while (header !=
nullptr) {
1788 header->~BlockIndexHeader();
1789 (Traits::free)(header);
1794 template<AllocationMode allocMode,
typename U>
1797 index_t currentTailIndex = this->
tailIndex.load(std::memory_order_relaxed);
1798 index_t newTailIndex = 1 + currentTailIndex;
1803 if (this->
tailBlock !=
nullptr && this->
tailBlock->
next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1806 this->
tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1818 auto head = this->
headIndex.load(std::memory_order_relaxed);
1819 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1820 if (!details::circular_less_than<index_t>(head, currentTailIndex +
BLOCK_SIZE)
1839 auto newBlock = this->
parent->ConcurrentQueue::template requisition_block<allocMode>();
1840 if (newBlock ==
nullptr) {
1844 newBlock->owner =
this;
1846 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1848 newBlock->next = newBlock;
1862 new ((*this->
tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1874 (void)originalBlockIndexSlotsUsed;
1879 entry.base = currentTailIndex;
1885 this->
tailIndex.store(newTailIndex, std::memory_order_release);
1891 new ((*this->
tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1893 this->
tailIndex.store(newTailIndex, std::memory_order_release);
1897 template<
typename U>
1900 auto tail = this->
tailIndex.load(std::memory_order_relaxed);
1902 if (details::circular_less_than<index_t>(this->
dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
1919 std::atomic_thread_fence(std::memory_order_acquire);
1934 tail = this->
tailIndex.load(std::memory_order_acquire);
1935 if ((
details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1946 auto index = this->
headIndex.fetch_add(1, std::memory_order_acq_rel);
1951 auto localBlockIndex =
blockIndex.load(std::memory_order_acquire);
1952 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
1957 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
1958 auto blockBaseIndex = index & ~static_cast<index_t>(
BLOCK_SIZE - 1);
1959 auto offset =
static_cast<size_t>(
static_cast<typename std::make_signed<index_t>::type
>(blockBaseIndex - headBase) /
BLOCK_SIZE);
1960 auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
1963 auto& el = *((*block)[index]);
1973 (*block)[index]->~T();
1974 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1976 } guard = { block, index };
1978 element = std::move(el);
1981 element = std::move(el);
1983 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1997 template<AllocationMode allocMode,
typename It>
2008 Block* firstAllocatedBlock =
nullptr;
2011 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(
BLOCK_SIZE - 1));
2013 if (blockBaseDiff > 0) {
2015 while (blockBaseDiff > 0 && this->
tailBlock !=
nullptr && this->
tailBlock->
next != firstAllocatedBlock && this->tailBlock->
next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2020 firstAllocatedBlock = firstAllocatedBlock ==
nullptr ? this->
tailBlock : firstAllocatedBlock;
2023 entry.base = currentTailIndex;
2029 while (blockBaseDiff > 0) {
2033 auto head = this->
headIndex.load(std::memory_order_relaxed);
2034 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2041 this->
tailBlock = startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2048 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2052 auto newBlock = this->
parent->ConcurrentQueue::template requisition_block<allocMode>();
2053 if (newBlock ==
nullptr) {
2056 this->
tailBlock = startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2061 newBlock->owner =
this;
2063 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2065 newBlock->next = newBlock;
2072 firstAllocatedBlock = firstAllocatedBlock ==
nullptr ? this->
tailBlock : firstAllocatedBlock;
2077 entry.base = currentTailIndex;
2084 auto block = firstAllocatedBlock;
2086 block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2090 block = block->next;
2099 index_t newTailIndex = startTailIndex +
static_cast<index_t>(count);
2100 currentTailIndex = startTailIndex;
2103 assert((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock !=
nullptr || count == 0);
2104 if ((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock !=
nullptr) {
2109 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2110 stopIndex = newTailIndex;
2113 while (currentTailIndex != stopIndex) {
2114 new ((*this->
tailBlock)[currentTailIndex++]) T(*itemFirst++);
2119 while (currentTailIndex != stopIndex) {
2136 auto constructedStopIndex = currentTailIndex;
2137 auto lastBlockEnqueued = this->
tailBlock;
2141 this->
tailBlock = startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2144 auto block = startBlock;
2146 block = firstAllocatedBlock;
2148 currentTailIndex = startTailIndex;
2151 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2152 stopIndex = constructedStopIndex;
2154 while (currentTailIndex != stopIndex) {
2155 (*block)[currentTailIndex++]->~T();
2157 if (block == lastBlockEnqueued) {
2160 block = block->
next;
2168 assert(currentTailIndex == newTailIndex);
2178 this->
tailIndex.store(newTailIndex, std::memory_order_release);
2182 template<
typename It>
2185 auto tail = this->
tailIndex.load(std::memory_order_relaxed);
2187 auto desiredCount =
static_cast<size_t>(tail - (this->
dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2188 if (details::circular_less_than<size_t>(0, desiredCount)) {
2189 desiredCount = desiredCount < max ? desiredCount : max;
2190 std::atomic_thread_fence(std::memory_order_acquire);
2194 tail = this->
tailIndex.load(std::memory_order_acquire);
2195 auto actualCount =
static_cast<size_t>(tail - (myDequeueCount - overcommit));
2196 if (details::circular_less_than<size_t>(0, actualCount)) {
2197 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2198 if (actualCount < desiredCount) {
2199 this->
dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2204 auto firstIndex = this->
headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2207 auto localBlockIndex =
blockIndex.load(std::memory_order_acquire);
2208 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2210 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2211 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(
BLOCK_SIZE - 1);
2212 auto offset =
static_cast<size_t>(
static_cast<typename std::make_signed<index_t>::type
>(firstBlockBaseIndex - headBase) /
BLOCK_SIZE);
2213 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2216 auto index = firstIndex;
2218 auto firstIndexInBlock = index;
2220 endIndex = details::circular_less_than<index_t>(firstIndex +
static_cast<index_t>(actualCount), endIndex) ? firstIndex +
static_cast<index_t>(actualCount) : endIndex;
2221 auto block = localBlockIndex->entries[indexIndex].block;
2223 while (index != endIndex) {
2224 auto& el = *((*block)[index]);
2225 *itemFirst++ = std::move(el);
2232 while (index != endIndex) {
2233 auto& el = *((*block)[index]);
2234 *itemFirst = std::move(el);
2245 block = localBlockIndex->entries[indexIndex].block;
2246 while (index != endIndex) {
2247 (*block)[index++]->~T();
2249 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
static_cast<size_t>(endIndex - firstIndexInBlock));
2250 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2252 firstIndexInBlock = index;
2254 endIndex = details::circular_less_than<index_t>(firstIndex +
static_cast<index_t>(actualCount), endIndex) ? firstIndex +
static_cast<index_t>(actualCount) : endIndex;
2255 }
while (index != firstIndex + actualCount);
2260 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
static_cast<size_t>(endIndex - firstIndexInBlock));
2261 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2262 }
while (index != firstIndex + actualCount);
2298 if (newRawPtr ==
nullptr) {
2311 i = (i + 1) & prevBlockSizeMask;
2318 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2319 header->entries = newBlockIndexEntries;
2325 blockIndex.store(header, std::memory_order_release);
2340#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2347 friend struct MemStats;
2373#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2375 if (!this->
inactive.load(std::memory_order_relaxed)) {
2376 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2381 auto tail = this->
tailIndex.load(std::memory_order_relaxed);
2382 auto index = this->
headIndex.load(std::memory_order_relaxed);
2383 Block* block =
nullptr;
2385 bool forceFreeLastBlock = index != tail;
2386 while (index != tail) {
2388 if (block !=
nullptr) {
2396 ((*block)[index])->~T();
2407 auto localBlockIndex =
blockIndex.load(std::memory_order_relaxed);
2408 if (localBlockIndex !=
nullptr) {
2409 for (
size_t i = 0; i != localBlockIndex->capacity; ++i) {
2410 localBlockIndex->index[i]->~BlockIndexEntry();
2413 auto prev = localBlockIndex->prev;
2414 localBlockIndex->~BlockIndexHeader();
2415 (Traits::free)(localBlockIndex);
2416 localBlockIndex = prev;
2417 }
while (localBlockIndex !=
nullptr);
2421 template<AllocationMode allocMode,
typename U>
2424 index_t currentTailIndex = this->
tailIndex.load(std::memory_order_relaxed);
2425 index_t newTailIndex = 1 + currentTailIndex;
2428 auto head = this->
headIndex.load(std::memory_order_relaxed);
2429 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2433#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2434 debug::DebugLock lock(mutex);
2438 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2443 auto newBlock = this->
parent->ConcurrentQueue::template requisition_block<allocMode>();
2444 if (newBlock ==
nullptr) {
2446 idxEntry->
value.store(
nullptr, std::memory_order_relaxed);
2450 newBlock->owner =
this;
2452 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2457 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2461 idxEntry->
value.store(
nullptr, std::memory_order_relaxed);
2468 idxEntry->
value.store(newBlock, std::memory_order_relaxed);
2473 this->
tailIndex.store(newTailIndex, std::memory_order_release);
2479 new ((*this->
tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2481 this->
tailIndex.store(newTailIndex, std::memory_order_release);
2485 template<
typename U>
2491 if (details::circular_less_than<index_t>(this->
dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2492 std::atomic_thread_fence(std::memory_order_acquire);
2495 tail = this->
tailIndex.load(std::memory_order_acquire);
2496 if ((
details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2503 auto block = entry->value.load(std::memory_order_relaxed);
2504 auto& el = *((*block)[index]);
2507#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2510 debug::DebugLock lock(producer->mutex);
2520 (*block)[index]->~T();
2521 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2522 entry->
value.store(
nullptr, std::memory_order_relaxed);
2526 } guard = { block, index, entry, this->
parent };
2528 element = std::move(el);
2531 element = std::move(el);
2534 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2536#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2537 debug::DebugLock lock(mutex);
2540 entry->value.store(
nullptr, std::memory_order_relaxed);
2556 template<AllocationMode allocMode,
typename It>
2570 Block* firstAllocatedBlock =
nullptr;
2574 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(
BLOCK_SIZE - 1));
2576 if (blockBaseDiff > 0) {
2577#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2578 debug::DebugLock lock(mutex);
2587 bool indexInserted =
false;
2588 auto head = this->
headIndex.load(std::memory_order_relaxed);
2589 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2591 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->
parent->ConcurrentQueue::template requisition_block<allocMode>()) ==
nullptr) {
2594 if (indexInserted) {
2596 idxEntry->
value.store(
nullptr, std::memory_order_relaxed);
2598 currentTailIndex = (startTailIndex - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1);
2599 for (
auto block = firstAllocatedBlock; block !=
nullptr; block = block->
next) {
2602 idxEntry->
value.store(
nullptr, std::memory_order_relaxed);
2612 newBlock->owner =
this;
2614 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2615 newBlock->
next =
nullptr;
2618 idxEntry->
value.store(newBlock, std::memory_order_relaxed);
2622 if ((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock !=
nullptr) {
2627 endBlock = newBlock;
2628 firstAllocatedBlock = firstAllocatedBlock ==
nullptr ? newBlock : firstAllocatedBlock;
2629 }
while (blockBaseDiff > 0);
2633 index_t newTailIndex = startTailIndex +
static_cast<index_t>(count);
2634 currentTailIndex = startTailIndex;
2636 assert((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock !=
nullptr || count == 0);
2637 if ((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock !=
nullptr) {
2642 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2643 stopIndex = newTailIndex;
2646 while (currentTailIndex != stopIndex) {
2647 new ((*this->
tailBlock)[currentTailIndex++]) T(*itemFirst++);
2652 while (currentTailIndex != stopIndex) {
2659 auto constructedStopIndex = currentTailIndex;
2660 auto lastBlockEnqueued = this->
tailBlock;
2663 auto block = startBlock;
2665 block = firstAllocatedBlock;
2667 currentTailIndex = startTailIndex;
2670 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2671 stopIndex = constructedStopIndex;
2673 while (currentTailIndex != stopIndex) {
2674 (*block)[currentTailIndex++]->~T();
2676 if (block == lastBlockEnqueued) {
2679 block = block->
next;
2683 currentTailIndex = (startTailIndex - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1);
2684 for (
auto block = firstAllocatedBlock; block !=
nullptr; block = block->
next) {
2687 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2697 assert(currentTailIndex == newTailIndex);
2702 this->
tailIndex.store(newTailIndex, std::memory_order_release);
2706 template<
typename It>
2709 auto tail = this->
tailIndex.load(std::memory_order_relaxed);
2711 auto desiredCount =
static_cast<size_t>(tail - (this->
dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2712 if (details::circular_less_than<size_t>(0, desiredCount)) {
2713 desiredCount = desiredCount < max ? desiredCount : max;
2714 std::atomic_thread_fence(std::memory_order_acquire);
2718 tail = this->
tailIndex.load(std::memory_order_acquire);
2719 auto actualCount =
static_cast<size_t>(tail - (myDequeueCount - overcommit));
2720 if (details::circular_less_than<size_t>(0, actualCount)) {
2721 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2722 if (actualCount < desiredCount) {
2723 this->
dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2728 auto firstIndex = this->
headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2731 auto index = firstIndex;
2735 auto blockStartIndex = index;
2737 endIndex = details::circular_less_than<index_t>(firstIndex +
static_cast<index_t>(actualCount), endIndex) ? firstIndex +
static_cast<index_t>(actualCount) : endIndex;
2739 auto entry = localBlockIndex->
index[indexIndex];
2740 auto block = entry->
value.load(std::memory_order_relaxed);
2742 while (index != endIndex) {
2743 auto& el = *((*block)[index]);
2744 *itemFirst++ = std::move(el);
2751 while (index != endIndex) {
2752 auto& el = *((*block)[index]);
2753 *itemFirst = std::move(el);
2761 entry = localBlockIndex->
index[indexIndex];
2762 block = entry->
value.load(std::memory_order_relaxed);
2763 while (index != endIndex) {
2764 (*block)[index++]->~T();
2767 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex,
static_cast<size_t>(endIndex - blockStartIndex))) {
2768#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2769 debug::DebugLock lock(mutex);
2771 entry->value.store(
nullptr, std::memory_order_relaxed);
2774 indexIndex = (indexIndex + 1) & (localBlockIndex->
capacity - 1);
2776 blockStartIndex = index;
2778 endIndex = details::circular_less_than<index_t>(firstIndex +
static_cast<index_t>(actualCount), endIndex) ? firstIndex +
static_cast<index_t>(actualCount) : endIndex;
2779 }
while (index != firstIndex + actualCount);
2784 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex,
static_cast<size_t>(endIndex - blockStartIndex))) {
2786#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2787 debug::DebugLock lock(mutex);
2791 entry->value.store(
nullptr, std::memory_order_relaxed);
2795 indexIndex = (indexIndex + 1) & (localBlockIndex->
capacity - 1);
2796 }
while (index != firstIndex + actualCount);
2827 template<AllocationMode allocMode>
2830 auto localBlockIndex =
blockIndex.load(std::memory_order_relaxed);
2831 if (localBlockIndex ==
nullptr) {
2834 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2835 idxEntry = localBlockIndex->index[newTail];
2837 idxEntry->
value.load(std::memory_order_relaxed) ==
nullptr) {
2839 idxEntry->
key.store(blockStartIndex, std::memory_order_relaxed);
2840 localBlockIndex->tail.store(newTail, std::memory_order_release);
2848 localBlockIndex =
blockIndex.load(std::memory_order_relaxed);
2849 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2850 idxEntry = localBlockIndex->index[newTail];
2852 idxEntry->
key.store(blockStartIndex, std::memory_order_relaxed);
2853 localBlockIndex->tail.store(newTail, std::memory_order_release);
2859 auto localBlockIndex =
blockIndex.load(std::memory_order_relaxed);
2860 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2867 return localBlockIndex->
index[idx];
2872#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2873 debug::DebugLock lock(mutex);
2875 index &= ~static_cast<index_t>(
BLOCK_SIZE - 1);
2876 localBlockIndex =
blockIndex.load(std::memory_order_acquire);
2877 auto tail = localBlockIndex->
tail.load(std::memory_order_acquire);
2878 auto tailBase = localBlockIndex->
index[tail]->
key.load(std::memory_order_relaxed);
2882 auto offset =
static_cast<size_t>(
static_cast<typename std::make_signed<index_t>::type
>(index - tailBase) /
BLOCK_SIZE);
2883 size_t idx = (tail + offset) & (localBlockIndex->
capacity - 1);
2884 assert(localBlockIndex->
index[idx]->
key.load(std::memory_order_relaxed) == index && localBlockIndex->
index[idx]->
value.load(std::memory_order_relaxed) !=
nullptr);
2890 auto prev =
blockIndex.load(std::memory_order_relaxed);
2891 size_t prevCapacity = prev ==
nullptr ? 0 : prev->capacity;
2893 auto raw =
static_cast<char*
>((Traits::malloc)(
2895 std::alignment_of<BlockIndexEntry>::value - 1 +
sizeof(
BlockIndexEntry) * entryCount +
2897 if (raw ==
nullptr) {
2903 auto index =
reinterpret_cast<BlockIndexEntry**
>(details::align_for<BlockIndexEntry*>(
reinterpret_cast<char*
>(entries) +
sizeof(
BlockIndexEntry) * entryCount));
2904 if (prev !=
nullptr) {
2905 auto prevTail = prev->tail.load(std::memory_order_relaxed);
2906 auto prevPos = prevTail;
2909 prevPos = (prevPos + 1) & (prev->capacity - 1);
2910 index[i++] = prev->index[prevPos];
2911 }
while (prevPos != prevTail);
2912 assert(i == prevCapacity);
2914 for (
size_t i = 0; i != entryCount; ++i) {
2917 index[prevCapacity + i] = entries + i;
2919 header->prev = prev;
2920 header->entries = entries;
2921 header->index = index;
2925 blockIndex.store(header, std::memory_order_release);
2936#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2938 details::ThreadExitListener threadExitListener;
2942#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2948#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2949 mutable debug::DebugMutex mutex;
2952 friend struct MemStats;
2992 block->owner =
nullptr;
2999 while (block !=
nullptr) {
3000 auto next = block->
next;
3012 template<AllocationMode canAlloc>
3016 if (block !=
nullptr) {
3021 if (block !=
nullptr) {
3026 return create<Block>();
3036 size_t allocatedBlocks;
3039 size_t ownedBlocksExplicit;
3040 size_t ownedBlocksImplicit;
3041 size_t implicitProducers;
3042 size_t explicitProducers;
3043 size_t elementsEnqueued;
3044 size_t blockClassBytes;
3045 size_t queueClassBytes;
3046 size_t implicitBlockIndexBytes;
3047 size_t explicitBlockIndexBytes;
3054 MemStats stats = { 0 };
3058 auto block = q->
freeList.head_unsafe();
3059 while (block !=
nullptr) {
3060 ++stats.allocatedBlocks;
3062 block = block->freeListNext.load(std::memory_order_relaxed);
3065 for (
auto ptr = q->
producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
3066 bool implicit =
dynamic_cast<ImplicitProducer*
>(ptr) !=
nullptr;
3067 stats.implicitProducers += implicit ? 1 : 0;
3068 stats.explicitProducers += implicit ? 0 : 1;
3071 auto prod =
static_cast<ImplicitProducer*
>(ptr);
3072 stats.queueClassBytes +=
sizeof(ImplicitProducer);
3073 auto head = prod->headIndex.load(std::memory_order_relaxed);
3074 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3075 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3076 if (hash !=
nullptr) {
3077 for (
size_t i = 0; i != hash->capacity; ++i) {
3078 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) !=
nullptr) {
3079 ++stats.allocatedBlocks;
3080 ++stats.ownedBlocksImplicit;
3083 stats.implicitBlockIndexBytes += hash->capacity *
sizeof(
typename ImplicitProducer::BlockIndexEntry);
3084 for (; hash !=
nullptr; hash = hash->prev) {
3085 stats.implicitBlockIndexBytes +=
sizeof(
typename ImplicitProducer::BlockIndexHeader) + hash->capacity *
sizeof(
typename ImplicitProducer::BlockIndexEntry*);
3088 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3094 auto prod =
static_cast<ExplicitProducer*
>(ptr);
3095 stats.queueClassBytes +=
sizeof(ExplicitProducer);
3096 auto tailBlock = prod->tailBlock;
3097 bool wasNonEmpty =
false;
3098 if (tailBlock !=
nullptr) {
3099 auto block = tailBlock;
3101 ++stats.allocatedBlocks;
3102 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3104 wasNonEmpty = wasNonEmpty || block != tailBlock;
3106 ++stats.ownedBlocksExplicit;
3107 block = block->next;
3108 }
while (block != tailBlock);
3110 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3111 while (index !=
nullptr) {
3112 stats.explicitBlockIndexBytes +=
sizeof(
typename ExplicitProducer::BlockIndexHeader) + index->size *
sizeof(
typename ExplicitProducer::BlockIndexEntry);
3113 index =
static_cast<typename ExplicitProducer::BlockIndexHeader*
>(index->prev);
3119 stats.allocatedBlocks += freeOnInitialPool;
3120 stats.freeBlocks += freeOnInitialPool;
3122 stats.blockClassBytes =
sizeof(Block) * stats.allocatedBlocks;
3123 stats.queueClassBytes +=
sizeof(ConcurrentQueue);
3130 MemStats getMemStats()
3132 return MemStats::getFor(
this);
3135 friend struct MemStats;
3151#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3152 debug::DebugLock lock(implicitProdMutex);
3155 for (
auto ptr =
producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
3156 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3157 bool expected =
true;
3158 if (ptr->inactive.compare_exchange_strong(expected,
false, std::memory_order_acquire, std::memory_order_relaxed)) {
3167 return add_producer(isExplicit ?
static_cast<ProducerBase*
>(create<ExplicitProducer>(
this)) : create<ImplicitProducer>(
this));
3173 if (producer ==
nullptr) {
3182 producer->
next = prevTail;
3183 }
while (!
producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3185#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3187 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3189 static_cast<ExplicitProducer*
>(producer)->nextExplicitProducer = prevTailExplicit;
3190 }
while (!explicitProducers.compare_exchange_weak(prevTailExplicit,
static_cast<ExplicitProducer*
>(producer), std::memory_order_release, std::memory_order_relaxed));
3193 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3195 static_cast<ImplicitProducer*
>(producer)->nextImplicitProducer = prevTailImplicit;
3196 }
while (!implicitProducers.compare_exchange_weak(prevTailImplicit,
static_cast<ImplicitProducer*
>(producer), std::memory_order_release, std::memory_order_relaxed));
3208 for (
auto ptr =
producerListTail.load(std::memory_order_relaxed); ptr !=
nullptr; ptr = ptr->next_prod()) {
3220 std::atomic<details::thread_id_t>
key;
3227 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3228 value = other.value;
3239 if (
this != &other) {
3241 std::swap(
value, other.value);
3246 template<
typename XT,
typename XTraits>
3267 hash->prev =
nullptr;
3318#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3319 debug::DebugLock lock(implicitProdMutex);
3326 for (
auto hash = mainHash; hash !=
nullptr; hash = hash->prev) {
3328 auto index = hashedId;
3330 index &= hash->capacity - 1;
3332 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3333 if (probedKey ==
id) {
3339 auto value = hash->entries[index].value;
3340 if (hash != mainHash) {
3343 index &= mainHash->capacity - 1;
3344 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3346#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3348 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3349 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable,
id, std::memory_order_acquire, std::memory_order_acquire))) {
3351 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3353 mainHash->entries[index].value = value;
3378 if (newCount >= (mainHash->capacity >> 1)) {
3379 auto newCapacity = mainHash->capacity << 1;
3380 while (newCount >= (newCapacity >> 1)) {
3384 if (raw ==
nullptr) {
3394 for (
size_t i = 0; i != newCapacity; ++i) {
3398 newHash->prev = mainHash;
3411 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3414 if (producer ==
nullptr) {
3422#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3423 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3424 producer->threadExitListener.userData = producer;
3425 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3428 auto index = hashedId;
3430 index &= mainHash->capacity - 1;
3431 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3434#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3436 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3437 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable,
id, std::memory_order_acquire, std::memory_order_acquire))) {
3439 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty,
id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3441 mainHash->entries[index].value = producer;
3456#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3460 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3463#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3464 debug::DebugLock lock(implicitProdMutex);
3467 assert(hash !=
nullptr);
3474 for (; hash !=
nullptr; hash = hash->prev) {
3475 auto index = hashedId;
3477 index &= hash->capacity - 1;
3478 probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3479 if (probedKey ==
id) {
3488 producer->
inactive.store(
true, std::memory_order_release);
3491 static void implicit_producer_thread_exited_callback(
void* userData)
3494 auto queue = producer->
parent;
3495 queue->implicit_producer_thread_exited(producer);
3503 template<
typename U>
3507 auto p =
static_cast<U*
>((Traits::malloc)(
sizeof(U) * count));
3512 for (
size_t i = 0; i != count; ++i) {
3518 template<
typename U>
3523 for (
size_t i = count; i != 0; ) {
3530 template<
typename U>
3533 auto p = (Traits::malloc)(
sizeof(U));
3534 return p !=
nullptr ?
new (p) U :
nullptr;
3537 template<
typename U,
typename A1>
3540 auto p = (Traits::malloc)(
sizeof(U));
3541 return p !=
nullptr ?
new (p) U(std::forward<A1>(a1)) :
nullptr;
3544 template<
typename U>
3561#if !MCDBGQ_USEDEBUGFREELIST
3564 debug::DebugFreeList<Block>
freeList;
3576#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3577 debug::DebugMutex implicitProdMutex;
3580#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3581 std::atomic<ExplicitProducer*> explicitProducers;
3582 std::atomic<ImplicitProducer*> implicitProducers;
3587template<
typename T,
typename Traits>
3596template<
typename T,
typename Traits>
3598 : producer(reinterpret_cast<
ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3605template<
typename T,
typename Traits>
3607 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3613template<
typename T,
typename Traits>
3615 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3621template<
typename T,
typename Traits>
3637template<
typename T,
typename Traits>
3645#if defined(__GNUC__)
3646#pragma GCC diagnostic pop
#define MOODYCAMEL_NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)
#define MOODYCAMEL_THREADLOCAL
#define MOODYCAMEL_DELETE_FUNCTION
#define MOODYCAMEL_CATCH(...)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr)
#define MOODYCAMEL_RETHROW
bool try_enqueue(producer_token_t const &token, T &&item)
ProducerBase * recycle_or_create_producer(bool isExplicit)
ConcurrentQueue & swap_internal(ConcurrentQueue &other)
bool enqueue(producer_token_t const &token, T const &item)
bool enqueue(producer_token_t const &token, T &&item)
static void destroy(U *p)
void populate_initial_implicit_producer_hash()
bool try_dequeue(U &item)
ConcurrentQueue(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max)
static const size_t MAX_SUBQUEUE_SIZE
size_t initialBlockPoolSize
bool update_current_producer_after_rotation(consumer_token_t &token)
ConcurrentQueue(size_t capacity=6 *BLOCK_SIZE)
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
ConcurrentQueue & operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
bool inner_enqueue_bulk(It itemFirst, size_t count)
FreeList< Block > freeList
Block * try_get_block_from_initial_pool()
ImplicitProducer * get_or_add_implicit_producer()
std::atomic< std::uint32_t > producerCount
static void destroy_array(U *p, size_t count)
std::array< ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE > initialImplicitProducerHashEntries
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Block * try_get_block_from_free_list()
bool enqueue_bulk(It itemFirst, size_t count)
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
std::atomic< std::uint32_t > globalExplicitConsumerOffset
bool try_dequeue_from_producer(producer_token_t const &producer, U &item)
void add_blocks_to_free_list(Block *block)
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
ImplicitProducerHash initialImplicitProducerHash
bool try_dequeue(consumer_token_t &token, U &item)
friend class ConcurrentQueueTests
static U * create(A1 &&a1)
void add_block_to_free_list(Block *block)
static U * create_array(size_t count)
::moodycamel::ProducerToken producer_token_t
void populate_initial_block_list(size_t blockCount)
void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
std::atomic< size_t > implicitProducerHashCount
bool inner_enqueue(U &&element)
size_t try_dequeue_bulk(It itemFirst, size_t max)
std::atomic< ProducerBase * > producerListTail
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
ProducerBase * recycle_or_create_producer(bool isExplicit, bool &recycled)
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
std::atomic< size_t > initialBlockPoolIndex
bool try_enqueue(T const &item)
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
void swap_implicit_producer_hashes(ConcurrentQueue &other)
std::atomic< ImplicitProducerHash * > implicitProducerHash
std::atomic_flag implicitProducerHashResizeInProgress
bool inner_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
ConcurrentQueue & operator=(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
::moodycamel::ConsumerToken consumer_token_t
ProducerBase * add_producer(ProducerBase *producer)
bool enqueue(T const &item)
std::atomic< std::uint32_t > nextExplicitConsumerId
static bool is_lock_free()
bool try_enqueue(producer_token_t const &token, T const &item)
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
friend void moodycamel::swap(typename ConcurrentQueue< XT, XTraits >::ImplicitProducerKVP &, typename ConcurrentQueue< XT, XTraits >::ImplicitProducerKVP &) MOODYCAMEL_NOEXCEPT
bool try_dequeue_non_interleaved(U &item)
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
static const size_t BLOCK_SIZE
ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
size_t size_approx() const
bool try_enqueue_bulk(It itemFirst, size_t count)
bool try_enqueue(T &&item)
bool inner_enqueue(producer_token_t const &token, U &&element)
Block * requisition_block()
static const thread_id_t invalid_thread_id2
static bool unlikely(bool x)
static bool circular_less_than(T a, T b)
static void swap_relaxed(std::atomic< T > &left, std::atomic< T > &right)
std::max_align_t std_max_align_t
static auto deref_noexcept(It &it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
static const thread_id_t invalid_thread_id
static size_t hash_thread_id(thread_id_t id)
static thread_id_t thread_id()
static T const & nomove(T const &x)
static bool likely(bool x)
static char * align_for(char *ptr)
std::uintptr_t thread_id_t
static T ceil_to_pow_2(T x)
void swap(typename ConcurrentQueue< T, Traits >::ImplicitProducerKVP &a, typename ConcurrentQueue< T, Traits >::ImplicitProducerKVP &b) MOODYCAMEL_NOEXCEPT
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
static void * malloc(size_t size)
static void free(void *ptr)
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
static const size_t BLOCK_SIZE
static const size_t MAX_SUBQUEUE_SIZE
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
bool set_empty(index_t i)
bool set_many_empty(index_t i, size_t count)
char elements[sizeof(T) *BLOCK_SIZE]
std::atomic< Block * > freeListNext
std::atomic< bool > shouldBeOnFreeList
details::max_align_t dummy
std::atomic< std::uint32_t > freeListRefs
std::atomic< bool > emptyFlags[BLOCK_SIZE<=EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE :1]
std::atomic< size_t > elementsCompletelyDequeued
bool dynamicallyAllocated
T * operator[](index_t idx) MOODYCAMEL_NOEXCEPT
T const * operator[](index_t idx) const MOODYCAMEL_NOEXCEPT
std::atomic< BlockIndexHeader * > blockIndex
size_t dequeue_bulk(It &itemFirst, size_t max)
size_t pr_blockIndexFront
BlockIndexEntry * pr_blockIndexEntries
bool new_block_index(size_t numberOfFilledSlotsToExpose)
size_t pr_blockIndexSlotsUsed
bool enqueue_bulk(It itemFirst, size_t count)
ExplicitProducer(ConcurrentQueue *parent)
bool enqueue(U &&element)
std::atomic< std::uint32_t > freeListRefs
std::atomic< N * > freeListNext
void add_knowing_refcount_is_zero(N *node)
void swap(FreeList &other)
static const std::uint32_t SHOULD_BE_ON_FREELIST
FreeList(FreeList &&other)
static const std::uint32_t REFS_MASK
std::atomic< N * > freeListHead
FreeList(FreeList const &) MOODYCAMEL_DELETE_FUNCTION
FreeList & operator=(FreeList const &) MOODYCAMEL_DELETE_FUNCTION
ImplicitProducerHash * prev
ImplicitProducerKVP * entries
std::atomic< details::thread_id_t > key
ImplicitProducerKVP(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT
void swap(ImplicitProducerKVP &other) MOODYCAMEL_NOEXCEPT
ImplicitProducerKVP & operator=(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT
std::atomic< index_t > key
std::atomic< Block * > value
size_t nextBlockIndexCapacity
bool enqueue_bulk(It itemFirst, size_t count)
size_t dequeue_bulk(It &itemFirst, size_t max)
bool enqueue(U &&element)
ImplicitProducer(ConcurrentQueue *parent)
bool insert_block_index_entry(BlockIndexEntry *&idxEntry, index_t blockStartIndex)
void rewind_block_index_tail()
BlockIndexEntry * get_block_index_entry_for_index(index_t index) const
size_t get_block_index_index_for_index(index_t index, BlockIndexHeader *&localBlockIndex) const
static const index_t INVALID_BLOCK_BASE
std::atomic< BlockIndexHeader * > blockIndex
ProducerBase * next_prod() const
std::atomic< index_t > dequeueOptimisticCount
size_t dequeue_bulk(It &itemFirst, size_t max)
ProducerBase(ConcurrentQueue *parent_, bool isExplicit_)
std::atomic< index_t > dequeueOvercommit
size_t size_approx() const
std::atomic< index_t > tailIndex
std::atomic< index_t > headIndex
std::uint32_t itemsConsumedFromCurrent
ConsumerToken(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION
ConsumerToken(ConcurrentQueue< T, Traits > &q)
ConsumerToken & operator=(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
friend class ConcurrentQueueTests
details::ConcurrentQueueProducerTypelessBase * desiredProducer
std::uint32_t lastKnownGlobalOffset
ConsumerToken & operator=(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION
details::ConcurrentQueueProducerTypelessBase * currentProducer
ConsumerToken(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
std::uint32_t initialOffset
void swap(ConsumerToken &other) MOODYCAMEL_NOEXCEPT
friend class ConcurrentQueueTests
ProducerToken & operator=(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION
ProducerToken(ConcurrentQueue< T, Traits > &queue)
details::ConcurrentQueueProducerTypelessBase * producer
ProducerToken(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION
void swap(ProducerToken &other) MOODYCAMEL_NOEXCEPT
ProducerToken(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
ProducerToken & operator=(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
std::atomic< bool > inactive
ConcurrentQueueProducerTypelessBase * next
ConcurrentQueueProducerTypelessBase()
static std::uint64_t hash(std::uint64_t h)
static std::uint32_t hash(std::uint32_t h)
static auto eval(U &&x) -> decltype(std::forward< U >(x))
static T const & eval(T const &x)
thread_id_t thread_id_numeric_size_t
thread_id_t thread_id_hash_t
static thread_id_hash_t prehash(thread_id_t const &x)