CARLA
 
载入中...
搜索中...
未找到
ConcurrentQueue.h
浏览该文件的文档.
1// 提供多生产者、多消费者无锁队列的 C++ 11实现。
2// 这里提供了一个概述,包括基准测试结果:
3// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4// 完整的设计也有详细的描述:
5// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6
7//简化的 BSD 许可证:
8// 版权所有 (c) 2013-2016,Cameron Desrochers。
9// 保留所有权利。
10//
11// 允许源代码和二进制形式的再分发和使用,无论是否经过修改,只要满足以下条件:
12//
13// 源代码的再分发必须保留上述版权声明、条件列表和免责声明。
14// 二进制形式的再分发必须在文档和/或其他材料中再现上述版权声明、条件列表和免责声明。
15//
16//该软件由版权持有者和贡献者“按原样”提供,没有任何明示或暗示的担保,包括但不限于适销性和特定用途适用性的暗示担保
17//在任何情况下,版权持有者或贡献者不对因使用该软件而产生的任何直接、间接、附带、特别、惩罚性或结果性的损害负责,
18//包括但不限于采购替代商品或服务、使用、数据或利润的损失或业务中断,是否基于合同、严格责任或侵权(包括疏忽或其他)理论,即使已经被告知可能发生这样的损害。
19// 注意:这个文件为了被 CARLA 使用做了略微的修改。
20
21#pragma once//程序预处理
22
23#if defined(__GNUC__)
24// 禁用 -Wconversion 警告(当 Traits::size_t 和 Traits::index_t 设置为小于 32 位时,整数提升可能引发这些警告
25// 在赋值计算值时会出现警告)
26#pragma GCC diagnostic push//保存当前的编译器诊断设置。
27#pragma GCC diagnostic ignored "-Wconversion"//用于忽略特定的警告或错误。
28
29#ifdef MCDBGQ_USE_RELACY
30#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"//
31#endif
32#endif
33
34#if defined(__APPLE__)
35#include "TargetConditionals.h"
36#endif
37
38#ifdef MCDBGQ_USE_RELACY
39#include "relacy/relacy_std.hpp"
40#include "relacy_shims.h"
41// 我们只使用 malloc/free,因此 delete 宏会干扰 `= delete` 方法声明。
42// 我们将自己覆盖默认的 trait malloc,而不使用宏。
43#undef new
44#undef delete
45#undef malloc
46#undef free
47#else
48#include <atomic> // Requires C++11. Sorry VS2010.
49#include <cassert>
50#endif
51#include <cstddef> // for max_align_t
52#include <cstdint>
53#include <cstdlib>
54#include <type_traits>
55#include <algorithm>
56#include <utility>
57#include <limits>
58#include <climits> // for CHAR_BIT
59#include <array>
60#include <thread> // 部分用于 __WINPTHREADS_VERSION如果在 MinGW-w64 上带有 POSIX 线程
61
62// 平台特定的数字线程 ID 类型和无效值定义
63namespace moodycamel { namespace details {
64// 模板结构体 thread_id_converter,用于将线程 ID 转换为数值类型和哈希值
65 template<typename thread_id_t> struct thread_id_converter {
68 static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
69 };
70} }
71#if defined(MCDBGQ_USE_RELACY)
72namespace moodycamel { namespace details {
73 typedef std::uint32_t thread_id_t;
74 static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
75 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
76 static inline thread_id_t thread_id() { return rl::thread_index(); }
77} }
78#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
79// 在头文件中引入 windows.h 没有意义,我们将手动声明所用的函数
80// 并依赖向后兼容性确保这不会破坏
81extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
82namespace moodycamel { namespace details {
83 static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
84 typedef std::uint32_t thread_id_t;
85 static const thread_id_t invalid_thread_id = 0; //查看 http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
86 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // 在技术上不能保证无效,但在实践中从未使用过。请注意,所有 Win32 线程 ID 目前都是 4 的倍数。
87 static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
88} }
89#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
90namespace moodycamel { namespace details {
91 static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
92
93 typedef std::thread::id thread_id_t;
94 static const thread_id_t invalid_thread_id; // 默认 ctor 创建无效 ID
95
96 // 请注意,我们不定义 invalid_thread_id2,因为 std::thread::id 没有无效值;它
97 // 仅在 MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 定义时才会使用,但实际上不会定义它。
98 static inline thread_id_t thread_id() { return std::this_thread::get_id(); }// 定义一个内联函数thread_id(),用于返回当前线程的ID。
99// std::this_thread::get_id() 是C++标准库中获取当前线程ID的函数。
100
101 template<std::size_t> struct thread_id_size { };// 定义一个模板结构体thread_id_size,它接受一个std::size_t类型的模板参数,
102// 这里先进行一个通用的模板声明,后续会特化该模板来针对不同大小情况进行处理。
103 template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };// 针对模板参数为4的情况特化thread_id_size结构体,
104// 表示当相关类型大小为4字节时,定义其内部的numeric_t类型别名为std::uint32_t。
105 template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };// 针对模板参数为8的情况特化thread_id_size结构体,
106// 表示当相关类型大小为8字节时,定义其内部的numeric_t类型别名为std::uint64_t。
107
108 template<> struct thread_id_converter<thread_id_t> {// 特化thread_id_converter模板结构体,针对thread_id_t类型进行特化处理。
109// 这个结构体大概率是用于对线程ID进行一些转换相关的操作。
110 typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;// 根据thread_id_t类型的大小(通过sizeof获取),
111 // 从thread_id_size特化结构体中获取对应的numeric_t类型,
112 // 并将其重命名为thread_id_numeric_size_t,用于后续操作。
113#ifndef __APPLE__// 在非苹果系统下,定义thread_id_hash_t类型别名为std::size_t,
114 // 可能用于后续哈希相关计算中表示哈希值的类型。
115 typedef std::size_t thread_id_hash_t;// 定义一个静态函数prehash,用于对传入的线程ID(thread_id_t类型的x)进行预处理(可能是哈希相关的前置计算)。
116#else
117 typedef thread_id_numeric_size_t thread_id_hash_t;// 在苹果系统下,定义thread_id_hash_t类型与thread_id_numeric_size_t类型相同,
118 // 说明在苹果系统下哈希值相关类型的处理与其他情况有所不同。
119#endif
120
121 static thread_id_hash_t prehash(thread_id_t const& x)// 定义一个静态函数prehash,用于对传入的线程ID(thread_id_t类型的x)进行预处理(可能是哈希相关的前置计算)。
122 {
123#ifndef __APPLE__
124 return std::hash<std::thread::id>()(x);// 在非苹果系统下,使用C++标准库中的std::hash对线程ID进行哈希计算,
125 // std::hash<std::thread::id>()(x)会返回对应的哈希值,其类型由前面定义的thread_id_hash_t决定(非苹果下为std::size_t)。
126#else
127 return *reinterpret_cast<thread_id_hash_t const*>(&x);// 在苹果系统下,通过将线程ID的地址进行重新解释转换(reinterpret_cast)为thread_id_hash_t类型的指针,
128 // 然后取其指向的值作为哈希值返回,这种方式是针对苹果系统特有的对线程ID生成哈希值的处理逻辑。
129#endif
130 }
131 };
132} }
133#else
134// 使用这个答案中的巧妙方法:http://stackoverflow.com/a/8438730/21475
135// 为了以平台无关的方式获取数字线程 ID,我们使用线程局部静态变量的地址作为线程标识符 :-) :-)
136#if defined(__GNUC__) || defined(__INTEL_COMPILER)
137#define MOODYCAMEL_THREADLOCAL __thread
138#elif defined(_MSC_VER)
139#define MOODYCAMEL_THREADLOCAL __declspec(thread)
140#else
141//假设编译器符合 C++11 标准
142#define MOODYCAMEL_THREADLOCAL thread_local
143#endif
144namespace moodycamel { namespace details {
145 typedef std::uintptr_t thread_id_t;
146 static const thread_id_t invalid_thread_id = 0; //地址不能为 nullptr
147 static const thread_id_t invalid_thread_id2 = 1; // 对 null 指针的成员访问通常也是无效的。另外,它没有对齐。
148 static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
149} }
150#endif
151
152//异常
153#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
154#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
155#define MOODYCAMEL_EXCEPTIONS_ENABLED
156#endif
157#endif
158
159// ~~~ @begin 为 CARLA 所做的修改 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
160
161#include <carla/Exception.h>
162
163#if (defined(LIBCARLA_NO_EXCEPTIONS) && defined(MOODYCAMEL_EXCEPTIONS_ENABLED))
164# undef MOODYCAMEL_EXCEPTIONS_ENABLED// 如果定义了LIBCARLA_NO_EXCEPTIONS并且同时定义了MOODYCAMEL_EXCEPTIONS_ENABLED,
165// 那么取消定义MOODYCAMEL_EXCEPTIONS_ENABLED,意味着在这种特定配置下不启用相关异常功能
166#endif
167
168#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED// 根据是否定义了MOODYCAMEL_EXCEPTIONS_ENABLED来进行不同宏定义,以适配不同的异常处理情况
169#define MOODYCAMEL_TRY try// 当MOODYCAMEL_EXCEPTIONS_ENABLED被定义时,宏MOODYCAMEL_TRY定义为C++中的try关键字,
170 // 用于开始一个异常处理块,尝试执行可能抛出异常的代码
171#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)// 宏MOODYCAMEL_CATCH用于定义catch块,接收参数(__VA_ARGS__表示可变参数),用于捕获特定类型的异常
172#define MOODYCAMEL_RETHROW throw// 宏MOODYCAMEL_RETHROW定义为C++中的throw关键字,用于重新抛出当前捕获的异常
173#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)// 宏MOODYCAMEL_THROW用于抛出一个异常,这里调用了carla命名空间下的throw_exception函数来抛出指定表达式的异常
174#else
175#define MOODYCAMEL_TRY if (true)// 当MOODYCAMEL_EXCEPTIONS_ENABLED未被定义时,宏MOODYCAMEL_TRY定义为一个恒为真的if语句,
176 // 实际上相当于去掉了try-catch这种异常处理机制的结构,可能是为了在不支持异常的环境下做一种模拟处理
177#define MOODYCAMEL_CATCH(...) else if (false)// 宏MOODYCAMEL_CATCH定义为一个恒为假的else if语句,意味着不会真正进入这个“catch”块,
178 // 同样是在不支持异常的情况下的一种模拟定义方式
179#define MOODYCAMEL_RETHROW// 宏MOODYCAMEL_RETHROW为空定义,因为在不支持异常的环境下不存在重新抛出异常的操作
180#define MOODYCAMEL_THROW(expr) ::carla::throw_exception(expr)// 宏MOODYCAMEL_THROW同样是调用carla命名空间下的throw_exception函数来抛出异常,
181 // 即使在不支持常规异常处理的环境下,可能也通过这种方式来进行错误报告等相关处理
182#endif
183
184// ~~~ @end 为 CARLA 所做的修改 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
185
186#ifndef MOODYCAMEL_NOEXCEPT
187#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
188#define MOODYCAMEL_NOEXCEPT
189#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
190#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
191#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
192// VS2012 的 std::is_nothrow_[move_]constructible 存在问题,返回 true 时不应如此 :-(
193// 我们必须假设 VS2012 上的所有非平凡构造函数可能会抛出异常!
194#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
195#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
196#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
197#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
198#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
199#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
200#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
201#else
202#define MOODYCAMEL_NOEXCEPT noexcept
203#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
204#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
205#endif
206#endif
207
208#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
209#ifdef MCDBGQ_USE_RELACY
210#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
211#else
212// VS2013 不支持 `thread_local`,而 MinGW-w64 与 POSIX 线程的组合存在一个严重的 bug: http://sourceforge.net/p/mingw-w64/bugs/445
213// g++ 版本 <=4.7 也不支持 `thread_local`。
214// 最后,iOS/ARM 不支持 `thread_local`,虽然 g++/ARM 允许编译,但尚未确认是否实际有效
215#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
216// 假设所有其他 C++11 编译器/平台都完全支持 `thread_local`
217//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // 由于多个用户报告存在问题,因此目前总是禁用
218#endif
219#endif
220#endif
221
222// VS2012 不支持已删除的函数。
223// 在这种情况下,我们正常声明函数但不定义它。如果调用该函数,会生成链接错误。
224#ifndef MOODYCAMEL_DELETE_FUNCTION
225#if defined(_MSC_VER) && _MSC_VER < 1800
226#define MOODYCAMEL_DELETE_FUNCTION
227#else
228#define MOODYCAMEL_DELETE_FUNCTION = delete
229#endif
230#endif
231
232// 编译器特定的 likely/unlikely 提示
233namespace moodycamel { namespace details {// 定义在moodycamel命名空间下的details子命名空间,通常这样的嵌套命名空间用于对相关功能模块进行更细致的组织和隔离。
234#if defined(__GNUC__)// 针对GNU编译器(__GNUC__宏在使用GNU编译器时被定义)进行条件编译。
235 static inline bool (likely)(bool x) { return __builtin_expect((x), true); }// __builtin_expect是GNU C/C++ 编译器提供的一个内建函数,用于向编译器提供分支预测的提示信息,
236// 帮助编译器优化代码执行顺序,提高性能。
237// likely函数表示期望传入的布尔值参数大概率为真,通过__builtin_expect向编译器传达这个预期。
238 static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }// unlikely函数表示期望传入的布尔值参数大概率为假,同样通过__builtin_expect向编译器传达这个预期,
239 // 以便编译器在生成代码时能基于这种概率情况进行优化。
240#else
241 static inline bool (likely)(bool x) { return x; }// 如果不是GNU编译器环境,则以下两个函数只是简单地返回传入的布尔值。
242 static inline bool (unlikely)(bool x) { return x; }// 因为没有对应的编译器内建机制来提供分支预测提示了。
243#endif
244} }
245
246#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
247#include "internal/concurrentqueue_internal_debug.h"// 如果定义了MOODYCAMEL_QUEUE_INTERNAL_DEBUG宏,就包含名为"internal/concurrentqueue_internal_debug.h"的头文件,
248// 这通常意味着在开启了特定内部调试功能时,引入相关的调试代码实现。
249#endif
250
251namespace moodycamel {
252namespace details {// 再次进入moodycamel命名空间下的details子命名空间,继续定义相关的模板结构体等内容
253 template<typename T>
254 struct const_numeric_max {// 定义一个模板结构体const_numeric_max,用于获取特定整数类型的最大值,
255 // 这个结构体是基于模板的,意味着可以针对不同的整数类型来获取其对应的最大值。
256 static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");// 使用静态断言(static_assert)来确保模板参数T必须是整数类型(std::is_integral<T>::value为真时才合法),
257 // 如果传入的不是整数类型,编译时就会报错并显示后面的提示信息。
258 static const T value = std::numeric_limits<T>::is_signed// 通过条件表达式来计算并定义静态常量value,用于表示类型T对应的数值最大值。
259 // 如果类型T是有符号整数类型(std::numeric_limits<T>::is_signed为真)。
260 ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
261 : static_cast<T>(-1);// 则通过位运算和类型转换来计算其最大值(先将1左移到符号位前面的最高位,再减1得到有符号整数的最大值)。
262 // 如果类型T是无符号整数类型(即std::numeric_limits<T>::is_signed为假),则直接将 -1 转换为该无符号类型,
263 // 对于无符号整数来说, -1 转换后的结果就是其所能表示的最大值。
264 };
265
266#if defined(__GLIBCXX__)
267 typedef ::max_align_t std_max_align_t; // libstdc++ 一段时间内忘记将其添加到 std:: 中
268#else
269 typedef std::max_align_t std_max_align_t; // 其他编译器(例如 MSVC)坚持认为它只能通过 std:: 访问
270
271
272 // 一些平台错误地将 max_align_t 设置为一个对齐小于 8 字节的类型,即便它支持 8 字节对齐的标量值(*咳* 32 位 iOS)。
273 //用我们自己的联合体解决这个问题。参见问题 #64
274 typedef union {
276 long long y;
277 void* z;
278 } max_align_t;
279}
280
281 // ConcurrentQueue 的默认特性。
282 // 要改变一些特性而无需重新实现所有特性,可以从这个结构体继承并覆盖你希望不同的声明;
283 // 由于这些特性作为模板类型参数使用,覆盖的声明将在定义的地方使用,其他地方则使用默认值。
285{
286 // 通用大小类型。强烈推荐使用 std::size_t。
287 typedef std::size_t size_t;
288
289 // 用于入队和出队索引的类型。必须至少与 size_t 一样大。
290 // 应该比你预期一次性容纳的元素数量大得多,特别是当你有高周转率时;
291 // 例如,在 32 位 x86 上,如果你预期有超过一亿个元素或在非常短的时间内处理几百万个元素,
292 // 使用 32 位类型 *可能* 会触发竞争条件。在这种情况下,推荐使用 64 位整数类型,
293 // 实际上将防止竞争条件,无论队列的使用情况如何。
294 // 请注意,队列是否在使用 64 位整数类型时无锁,取决于 std::atomic<std::uint64_t> 是否无锁,这具有平台特性。
295
296 typedef std::size_t index_t;
297
298 // 内部所有元素都从多元素块中入队和出队;这是最小的可控单位。
299 // 如果你预计元素较少但生产者较多,应选择较小的块大小。
300 // 对于生产者较少和/或元素较多的情况,建议选择较大的块大小。
301 // 提供了一个合理的默认值。块大小必须是 2 的幂。
302
303 static const size_t BLOCK_SIZE = 32;
304
305 // 对于显式生产者(即使用生产者令牌时),通过迭代每个元素的标志列表来检查块是否为空。
306 // 对于较大的块大小,这种方法效率过低,改为基于原子计数器的方法更快。
307 // 当块大小严格大于此阈值时,会切换到这种方法。
308
310
311 // 单个显式生产者可以预期多少个完整块?这个值应反映该数量的最大值以获得最佳性能。
312 // 必须是 2 的幂。
313 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
314
315 // 单个隐式 producer 可以预期有多少个完整块?这应该
316 // 反映该数字的最大值以获得最佳性能。必须是 2 的幂。
317 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
318
319 // 线程 ID 到隐式生产者的哈希表的初始大小。
320 // 注意,每当哈希表填充到一半时,会进行调整。
321 // 必须是 2 的幂,并且为 0 或至少为 1。如果为 0,则禁用隐式生产(使用不带显式生产者令牌的入队方法)。
322
323 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
324
325 // 控制显式消费者(即带令牌的消费者)在导致所有消费者旋转并转到下一个内部队列之前
326 // 必须消费的项目数量。
328
329 // 子队列中最多可以排队的元素数量(包括)。如果入队操作会超过此限制,则操作将失败。
330 // 请注意,这个限制在块级别强制执行(为了性能原因),即它会被四舍五入到最接近的块大小。
332
333#ifndef MCDBGQ_USE_RELACY
334 // 如果需要,可以自定义内存分配。
335 // malloc 应该在失败时返回 nullptr,并处理对齐问题,就像 std::malloc 一样。
336#if defined(malloc) || defined(free)
337 // 哎,这已经是 2015 年了,停止定义破坏标准代码的宏吧!
338 // 解决 malloc/free 是特殊宏的问题:
339 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
340 static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
341 static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
342 static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
343#endif
344#endif
345
347
348
349#ifndef MCDBGQ_USE_RELACY
350 // 如果需要,可以自定义内存分配。
351 // malloc 应在失败时返回 nullptr,并像 std::malloc 一样处理对齐。
352#if defined(malloc) || defined(free)
353 // 噢,现在是 2015 年,别再定义违反标准代码的宏了!
354 // 解决 malloc/free 作为特殊宏的问题:
355 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
356 static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
357 static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
358 static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
359#else
360 static inline void* malloc(size_t size) { return std::malloc(size); }
361 static inline void free(void* ptr) { return std::free(ptr); }
362#endif
363#else
364 // 在使用 Relacy 竞态检测器运行时的调试版本(在用户代码中忽略这些)
365
366 static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
367 static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
368#endif
369};
370
371
372// 当生产或消费大量元素时,最有效的方法是:
373// 1) 使用队列的批量操作方法,并附带一个 token
374// 2) 如果不能使用 token,使用没有 token 的批量操作方法
375// 3) 如果仍然无法使用,创建一个 token,并使用它来调用单项方法
376// 4) 如果以上方法都不可用,使用队列的单参数方法
377// 需要注意的是,不要随意创建 tokens —— 理想情况下,每个线程(每种类型)应该最多只有一个 token。
378
379struct ProducerToken;
380struct ConsumerToken;// 前置声明两个结构体,通常用于表示生产者和消费者相关的某种标识或对象,具体定义可能在其他地方
381
382template<typename T, typename Traits> class ConcurrentQueue;// 声明一个模板类 ConcurrentQueue,用于实现并发队列,具体功能依赖于模板参数 T(队列中存储的数据类型)和 Traits(可能是一些特性相关的模板参数)
383template<typename T, typename Traits> class BlockingConcurrentQueue;// 声明一个模板类 BlockingConcurrentQueue,可能是基于 ConcurrentQueue 实现的具有阻塞特性的并发队列,同样依赖特定模板参数
384class ConcurrentQueueTests;// 声明一个类 ConcurrentQueueTests,推测是用于对并发队列相关功能进行测试的类,具体实现应该在别处定义
385
386
387namespace details// 定义名为 details 的命名空间,通常用于放置一些内部实现细节相关的代码,对外部隐藏这些具体实现逻辑
388{
389 struct ConcurrentQueueProducerTypelessBase// 定义 ConcurrentQueueProducerTypelessBase 结构体,它可能作为某种基础结构用于并发队列中生产者相关的逻辑实现
390 {
391 ConcurrentQueueProducerTypelessBase* next;// 指向下一个相关结构的指针,用于构建某种链表结构或者关联结构
392 std::atomic<bool> inactive;// 原子类型的布尔变量,用于标记是否处于非活动状态,可用于并发环境下安全地判断生产者的状态
393 ProducerToken* token;// 指向 ProducerToken 类型的指针,可能用于标识特定的生产者或者关联生产者相关的其他信息
394
395 ConcurrentQueueProducerTypelessBase()// 结构体的默认构造函数,初始化各个成员变量
396 : next(nullptr), inactive(false), token(nullptr)
397 {
398 }
399 };
400
401 template<bool use32> struct _hash_32_or_64 {// 定义一个模板结构体 _hash_32_or_64,根据模板参数 use32 的值来特化不同的哈希函数逻辑
402 static inline std::uint32_t hash(std::uint32_t h)// 当 use32 为 true 时(处理 32 位情况)的静态内联函数,实现对 32 位整数的哈希计算
403 {
404
405 // MurmurHash3 完成器 -- 参见 https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
406 // 由于线程 ID 已经是唯一的,我们真正要做的只是将这种唯一性均匀地传播到所有位上,
407 // 这样我们可以使用这些位的子集,同时显著减少碰撞。
408 h ^= h >> 16;
409 h *= 0x85ebca6b;
410 h ^= h >> 13;
411 h *= 0xc2b2ae35;
412 return h ^ (h >> 16);
413 }
414 };
415 template<> struct _hash_32_or_64<1> {// 对 _hash_32_or_64 结构体的特化,当 use32 为 1(即 true)时的情况,处理 64 位整数的哈希计算
416 static inline std::uint64_t hash(std::uint64_t h)// 实现对 64 位整数的哈希计算函数
417 {
418 h ^= h >> 33;
419 h *= 0xff51afd7ed558ccd;
420 h ^= h >> 33;
421 h *= 0xc4ceb9fe1a85ec53;
422 return h ^ (h >> 33);
423 }
424 };
425 template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };// 定义一个根据 size 大小来继承不同 _hash_32_or_64 特化版本的模板结构体,用于选择合适的哈希计算方式(32 位或 64 位)
426
427 static inline size_t hash_thread_id(thread_id_t id)// 定义一个内联函数,用于对线程 ID 进行哈希计算
428 {
429 static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");// 静态断言确保线程 ID 的大小在期望范围内(最多 64 位),这是基于当前平台假设的约束条件
430 return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(// 通过调用 hash_32_or_64 结构体的哈希函数(根据线程 ID 转换后的哈希类型大小选择合适的特化版本)来计算线程 ID 的哈希值,
431 // 并将结果转换为 size_t 类型返回
433 }
434
435 template<typename T>// 定义一个模板函数,用于比较两个无符号整数类型的值,判断是否满足一种循环意义下的小于关系
436 static inline bool circular_less_than(T a, T b)
437 {
438#ifdef _MSC_VER
439#pragma warning(push)
440#pragma warning(disable: 4554)
441#endif
442 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");// 静态断言确保传入的类型是无符号整数类型,因为此函数是专门为这种类型设计用于特定的比较逻辑
443 return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));// 比较逻辑:通过计算差值并判断是否大于无符号整数类型所能表示的最大值的一半来确定循环意义下的小于关系,
444 // 常用于循环队列等场景中判断元素位置的先后关系等情况
445#ifdef _MSC_VER
446#pragma warning(pop)
447#endif
448 }
449
450 template<typename U>// 定义一个模板函数,用于对指针进行对齐操作,使其按照指定类型 U 的对齐要求对齐
451 static inline char* align_for(char* ptr)
452 {
453 const std::size_t alignment = std::alignment_of<U>::value;// 获取类型 U 的对齐要求(字节数)
454 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;// 通过计算偏移量来调整指针,使其满足对齐要求并返回对齐后的指针
455 }
456
457 template<typename T>// 定义一个模板函数,用于将输入的无符号整数向上取整到最近的 2 的幂次方数
458 static inline T ceil_to_pow_2(T x)
459 {
460 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
461
462 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
463 --x;
464 x |= x >> 1;
465 x |= x >> 2;
466 x |= x >> 4;
467 for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
468 x |= x >> (i << 3);
469 }
470 ++x;
471 return x;
472 }
473
474 template<typename T>// 定义一个模板函数,用于在宽松内存顺序下交换两个原子类型的值,实现原子交换操作
475 static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
476 {
477 T temp = std::move(left.load(std::memory_order_relaxed));// 先将左边原子变量的值读取到临时变量中(宽松内存顺序)
478 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);// 将右边原子变量的值存储到左边原子变量中(宽松内存顺序)
479 right.store(std::move(temp), std::memory_order_relaxed);// 将临时变量中保存的原来左边原子变量的值存储到右边原子变量中(宽松内存顺序)
480 }
481
482 template<typename T>// 定义一个模板函数,返回传入的常量引用本身,通常用于避免不必要的移动语义(比如在某些需要保持对象不可移动的场景中)
483 static inline T const& nomove(T const& x)
484 {
485 return x;
486 }
487
488 template<bool Enable>// 定义一个模板结构体 nomove_if,根据模板参数 Enable 的值来决定返回传入参数的不同方式(主要涉及移动语义相关处理)
490 {
491 template<typename T>// 当 Enable 为 true 时,模板函数 eval 返回传入的常量引用本身,类似于 nomove 函数的功能
492 static inline T const& eval(T const& x)
493 {
494 return x;
495 }
496 };
497
498 template<>
499 struct nomove_if<false>// 对 nomove_if 结构体的特化,当 Enable 为 false 时的情况,模板函数 eval 会根据传入参数的类型使用完美转发来返回相应的值,
500 // 可能在需要根据不同条件启用或禁用移动语义的场景中使用
501 {
502 template<typename U>
503 static inline auto eval(U&& x)
504 -> decltype(std::forward<U>(x))
505 {
506 return std::forward<U>(x);
507 }
508 };
509
510 template<typename It>// 定义一个模板函数,用于对迭代器进行解引用操作,并标记为 noexcept(表明此操作不会抛出异常),
511 // 返回迭代器所指向元素的引用,方便在一些要求不抛异常的上下文中使用迭代器解引用功能
512 static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
513 {
514 return *it;
515 }
516
517#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)// 根据不同的编译器版本来定义结构体 is_trivially_destructible,用于判断类型 T 是否具有平凡析构函数
518 // 在较新的编译器(__GNUC__ > 4 或者特定版本及以上)按照 C++11 标准的方式定义,否则按照旧的方式(通过判断是否有平凡析构器)定义
519 template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
520#else
521 template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
522#endif
523
524#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED// 根据是否支持 C++11 线程局部存储特性以及是否使用 Relacy 库来定义不同的线程退出监听器相关类型
525#ifdef MCDBGQ_USE_RELACY
526 typedef RelacyThreadExitListener ThreadExitListener;
527 typedef RelacyThreadExitNotifier ThreadExitNotifier;
528#else
529 struct ThreadExitListener
530 {
531 typedef void (*callback_t)(void*);// 定义一个函数指针类型,用于指向回调函数,回调函数接收一个 void* 类型的参数
532 callback_t callback;// 保存回调函数的指针
533 void* userData;// 保存传递给回调函数的用户数据指针
534
535 ThreadExitListener* next; // 保留供 ThreadExitNotifier 使用
536
537 };
538
539
540 class ThreadExitNotifier
541 {
542 public:
543
544 // 将监听器添加到订阅者列表中
545 static void subscribe(ThreadExitListener* listener)
546 {
547 auto& tlsInst = instance();
548 listener->next = tlsInst.tail;
549 tlsInst.tail = listener;
550 }
551
552 // 从订阅者列表中移除监听器
553 static void unsubscribe(ThreadExitListener* listener)
554 {
555 auto& tlsInst = instance();
556 ThreadExitListener** prev = &tlsInst.tail;
557 for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
558 if (ptr == listener) {
559 *prev = ptr->next;
560 break;
561 }
562 prev = &ptr->next;// 更新前一个节点的指针到当前节点的 next
563 }
564 }
565
566 private:
567 // 私有构造函数,确保使用单例模式
568 ThreadExitNotifier() : tail(nullptr) { }
569 ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
570 ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
571
572 ~ThreadExitNotifier()
573 {
574 // 该线程即将退出,通知所有人!
575 assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
576 for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
577 ptr->callback(ptr->userData);
578 }
579 }
580
581 // Thread-local
582 static inline ThreadExitNotifier& instance()
583 {
584 static thread_local ThreadExitNotifier notifier;
585 return notifier;
586 }
587
588 private:
589 ThreadExitListener* tail;
590 };
591#endif
592#endif
593
594 template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };// 定义一个模板结构体 static_is_lock_free_num,用于判断特定类型是否是无锁(lock-free)的。
595// 这里先给出一个通用的默认定义,将 value 设为 0,表示默认不是无锁的,后续会针对具体类型进行特化来改变这个值。
596 template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };// 对 static_is_lock_free_num 结构体针对 signed char 类型进行特化,通过使用预定义的宏 ATOMIC_CHAR_LOCK_FREE
597// 来设置 value 的值,以表明 signed char 类型是否是无锁的(具体取决于对应平台下该宏的值)。
598 template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };// 对 static_is_lock_free_num 结构体针对 short 类型进行特化,使用 ATOMIC_SHORT_LOCK_FREE 宏来设置 value,
599// 判断 short 类型在对应平台下是否为无锁类型。
600 template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };// 对 static_is_lock_free_num 结构体针对 int 类型进行特化,依据 ATOMIC_INT_LOCK_FREE 宏设置 value,
601// 确定 int 类型在当前平台是否能以无锁方式操作。
602 template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };// 对 static_is_lock_free_num 结构体针对 long 类型进行特化,借助 ATOMIC_LONG_LOCK_FREE 宏来设定 value,
603// 表示 long 类型是否具备无锁特性(由平台相关定义决定)。
604 template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };// 对 static_is_lock_free_num 结构体针对 long long 类型进行特化,通过 ATOMIC_LLONG_LOCK_FREE 宏来设置 value,
605// 以此判断 long long 类型是否可以无锁操作(取决于平台对该宏的定义情况)。
606 template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };// 定义一个模板结构体 static_is_lock_free,它继承自 static_is_lock_free_num,
607// 并且传入的类型会先通过 std::make_signed 转换为有符号类型后再进行相关无锁判断,
608// 这样可以复用前面针对有符号基本整数类型定义的无锁判断逻辑。
609 template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };// 对 static_is_lock_free 结构体针对 bool 类型进行特化,直接使用 ATOMIC_BOOL_LOCK_FREE 宏来设置 value,
610// 确定 bool 类型在对应平台下是否能以无锁方式进行操作。
611 template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };// 对 static_is_lock_free 结构体针对指针类型(U*)进行特化,使用 ATOMIC_POINTER_LOCK_FREE 宏来设置 value,
612// 用于判断指针类型在当前平台是否支持无锁操作。
613}// 这里的大括号结束了前面的 details 命名空间(在完整代码中应该有对应的开头大括号来界定命名空间范围)
614
615
616struct ProducerToken// 定义 ProducerToken 结构体,从名字推测它用于标识生产者相关的一些信息,可能在并发队列等场景中使用。
617{
618 template<typename T, typename Traits>
619 explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);// 定义一个显式构造函数,接受一个 ConcurrentQueue 类型的引用作为参数,
620 // 用于创建与特定 ConcurrentQueue 关联的 ProducerToken 对象,具体实现可能在别处(这里只有声明)。
621 // 模板参数 T 表示队列中存储的数据类型,Traits 表示相关特性参数。
622
623 template<typename T, typename Traits>
624 explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);// 类似上面的构造函数,不过这里是针对 BlockingConcurrentQueue 类型的显式构造函数,
625 // 用于创建与特定 BlockingConcurrentQueue 相关联的 ProducerToken 对象(同样只有声明,具体实现在其他地方)。
626
628 : producer(other.producer)// 定义移动构造函数,用于通过移动语义从另一个 ProducerToken 对象构造当前对象,标记为 MOODYCAMEL_NOEXCEPT
629 // 表示此函数不会抛出异常(这在一些对异常敏感的场景很重要,比如在某些资源管理和并发操作中)。
630 // 它将传入的 other 对象的 producer 指针赋值给当前对象,并将 other 的 producer 指针置空,
631 // 然后如果当前对象的 producer 指针不为空,会将其关联的 token 指向当前对象本身,确保相关指针关系的正确性。
632 {
633 other.producer = nullptr;
634 if (producer != nullptr) {
635 producer->token = this;
636 }
637 }
638
639 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT// 定义移动赋值运算符重载函数,同样标记为 MOODYCAMEL_NOEXCEPT,实现将另一个 ProducerToken 对象
640 // 通过移动语义赋值给当前对象的功能,内部通过调用 swap 函数来交换相关成员的状态,最后返回当前对象的引用。
641 {
642 swap(other);
643 return *this;
644 }
645
646 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT// 定义一个交换函数,用于交换当前 ProducerToken 对象和传入的 other 对象的相关成员状态(这里主要是 producer 指针),
647 // 并且在交换后正确地更新相关指针(如 producer 所指向对象中的 token 指针)指向,确保数据结构的一致性和关联性,
648 // 同样标记为 MOODYCAMEL_NOEXCEPT 表示不会抛出异常。
649 {
650 std::swap(producer, other.producer);
651 if (producer != nullptr) {
652 producer->token = this;
653 }
654 if (other.producer != nullptr) {
655 other.producer->token = &other;
656 }
657 }
658
659 // 一个令牌通常是有效的,除非:
660 // 1) 在构造过程中内存分配失败
661 // 2) 通过移动构造函数移动了令牌
662 // (注意:赋值操作会进行交换,因此两个令牌都可能有效)
663 // 3) 关联的队列被销毁
664 // 注意,如果 valid() 返回 true,这仅表示令牌对于特定队列是有效的,
665 // 但并不能确定是哪一个队列;这需要由用户自己跟踪。
666
667 inline bool valid() const { return producer != nullptr; }
668
670 {
671 if (producer != nullptr) {
672 producer->token = nullptr;
673 producer->inactive.store(true, std::memory_order_release);
674 }
675 }
676
677 // 禁用复制和分配
678 ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;// 定义ProducerToken类,这里通过一些特殊的宏(MOODYCAMEL_DELETE_FUNCTION)禁止了拷贝构造函数和赋值运算符的默认生成
679// 意味着这个类的对象不能通过拷贝构造或者赋值的常规方式来复制
680 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;// 声明赋值运算符为删除函数,禁止对该类对象进行赋值操作
681
682private:
683 template<typename T, typename Traits> friend class ConcurrentQueue;// 将ConcurrentQueue类模板(针对任意类型T和相关特性Traits)声明为友元,这样ConcurrentQueue类模板可以访问ProducerToken类的私有成员
684 friend class ConcurrentQueueTests;// 将ConcurrentQueueTests类声明为友元,该类可以访问ProducerToken类的私有成员,可能用于测试相关功能
685
686protected:
687 details::ConcurrentQueueProducerTypelessBase* producer;// 指向一个无类型的并发队列生产者基础类的指针,用于后续在并发队列相关操作中涉及生产者的功能实现
688};
689
690
691struct ConsumerToken// 定义ConsumerToken结构体,通常用于表示在并发队列消费者端相关操作的一些状态或标识信息
692{
693 template<typename T, typename Traits>
694 explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);// 显式的构造函数模板,用于根据指定的并发队列(类型为ConcurrentQueue<T, Traits>)来构造ConsumerToken对象
695 // 可能在创建消费者相关标识时,初始化一些与该队列相关的属性等
696
697 template<typename T, typename Traits>
698 explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);// 另一个显式的构造函数模板,用于根据指定的阻塞式并发队列(类型为BlockingConcurrentQueue<T, Traits>)来构造ConsumerToken对象
699 // 与上面不同的是针对阻塞式并发队列的情况进行相应初始化操作
700
701 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT// 移动构造函数,用于将其他ConsumerToken对象(通过右值引用other接收)的资源转移到当前正在构造的对象中
702 // 这里按照成员变量逐个进行了初始化,实现了资源的高效转移,不会进行深拷贝等开销较大的操作,并且标记为不抛出异常(MOODYCAMEL_NOEXCEPT)
703 : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
704 {
705 }
706
707 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT// 移动赋值运算符,用于将右值引用的其他ConsumerToken对象(other)的资源转移到当前对象
708 // 通过调用swap函数来交换两个对象的内部成员,实现资源的转移,返回当前对象的引用,同样标记为不抛出异常(MOODYCAMEL_NOEXCEPT)
709 {
710 swap(other);
711 return *this;
712 }
713
714 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT// 交换函数,用于交换当前ConsumerToken对象与另一个ConsumerToken对象(other)的内部成员变量的值
715 // 通过标准库的std::swap来实现各个成员变量的交换,达到交换两个对象状态的目的,标记为不抛出异常(MOODYCAMEL_NOEXCEPT)
716 {
717 std::swap(initialOffset, other.initialOffset);
718 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
719 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
720 std::swap(currentProducer, other.currentProducer);
721 std::swap(desiredProducer, other.desiredProducer);
722 }
723
724 // 禁用拷贝和赋值操作
727
728private:
729 template<typename T, typename Traits> friend class ConcurrentQueue;
731
732private: // 但与并发队列共享
733 std::uint32_t initialOffset;
738};
739
740// 需要前向声明这个 swap,因为它在一个命名空间中。
741// 参见 http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
742template<typename T, typename Traits>
743inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;// 定义一个模板函数 swap,用于交换 ConcurrentQueue<T, Traits> 类型中 ImplicitProducerKVP 类型的两个对象 a 和 b。
744// 函数标记为 MOODYCAMEL_NOEXCEPT,表示此函数不会抛出异常,具体函数实现应该在其他地方(这里仅为声明)。
745// 注意这里使用 typename 关键字来明确指定 ImplicitProducerKVP 是 ConcurrentQueue<T, Traits> 内部定义的类型,避免编译歧义。
746
747
748template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
749class ConcurrentQueue// 定义 ConcurrentQueue 模板类,用于实现并发队列功能,它依赖两个模板参数 T(队列中存储的数据类型)和 Traits(用于定制队列相关特性的类型),
750// Traits 有一个默认值 ConcurrentQueueDefaultTraits,意味着如果用户没有显式指定 Traits 参数,将使用默认的特性配置。
751{
752public:
753 typedef ::moodycamel::ProducerToken producer_token_t;// 定义一个类型别名 producer_token_t,它指向 moodycamel 命名空间下的 ProducerToken 类型,
754 // 方便在 ConcurrentQueue 类内部使用这个代表生产者标识的类型,增强代码可读性。
755 typedef ::moodycamel::ConsumerToken consumer_token_t;// 类似地,定义类型别名 consumer_token_t,指向 moodycamel 命名空间下的 ConsumerToken 类型,用于表示消费者相关的标识类型。
756
757 typedef typename Traits::index_t index_t;// 根据传入的 Traits 类型中的 index_t 类型定义一个类型别名 index_t,用于在类内部表示索引相关的数据类型,
758 // 具体的类型由 Traits 来定制,一般用于队列内部元素索引等操作。
759 typedef typename Traits::size_t size_t;// 同样根据 Traits 中的 size_t 类型定义类型别名 size_t,用于表示与队列大小等相关的尺寸数据类型,
760 // 比如队列长度、块大小等相关计量使用该类型,要求是无符号整数类型(由后面的静态断言约束)。
761
762 static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);// 定义一个静态常量 BLOCK_SIZE,其值通过将 Traits 中定义的 BLOCK_SIZE 转换为 size_t 类型后得到,
763 // 这个常量通常用于表示并发队列内部数据块的大小,比如每次分配内存等操作以这个大小为单位进行。
764 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);// 定义静态常量 EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD,转换自 Traits 中的对应值,
765 // 可能用于在明确块为空的计数达到某个阈值时触发特定的操作,比如内存回收或者队列结构调整等相关逻辑(具体取决于实现)。
766 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);// 定义静态常量 EXPLICIT_INITIAL_INDEX_SIZE,由 Traits 中的对应值转换而来,
767 // 可能用于初始化队列索引相关结构的大小,确保其初始状态满足一定的要求(例如是2的幂次方等,由后面静态断言约束)。
768 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);// 定义静态常量 IMPLICIT_INITIAL_INDEX_SIZE,同样取自 Traits 中的对应值转换,
769 // 大概率用于和隐式操作(比如隐式入队等相关逻辑)相关的索引结构初始大小设定。
770 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);// 定义静态常量 INITIAL_IMPLICIT_PRODUCER_HASH_SIZE,转换自 Traits 里的对应值,
771 // 可能涉及到对生产者进行哈希相关操作时初始哈希表大小等方面的设定(比如用于快速查找生产者等功能)。
772 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);// 定义静态常量 EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE,转换自 Traits 中的对应值,
773 // 也许用于规定消费者在进行某种轮转(比如切换消费的数据块等情况)操作之前可以消费的数量配额,便于队列内部负载均衡等相关管理。
774#ifdef _MSC_VER
775#pragma warning(push)
776#pragma warning(disable: 4307) //+ 整型常量溢出(这就是三元表达式的用途!
777#pragma warning(disable: 4309) // static_cast:常量值的截断
778#endif
779 static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);// 定义静态常量 MAX_SUBQUEUE_SIZE,其值通过一个三元表达式来确定。
780 // 如果 (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) 这个条件成立,
781 // 则 MAX_SUBQUEUE_SIZE 取 details::const_numeric_max<size_t>::value(可能是 size_t 类型能表示的最大值),
782 // 否则取 ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE),
783 // 目的可能是确保子队列大小在满足一定条件下合理设置,避免出现一些边界情况导致的问题,比如内存溢出等情况。
784#ifdef _MSC_VER
785#pragma warning(pop)
786#endif
787
788 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");// 静态断言,用于在编译期检查 Traits 中定义的 size_t 类型必须是无符号整数类型,
789 // 如果不符合要求,编译将会报错,确保后续基于该类型的各种操作(如队列大小计算等)符合预期的数学逻辑和内存使用逻辑。
790 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");// 类似的静态断言,检查 Traits 中的 index_t 类型也必须是无符号整数类型,
791 // 因为索引通常是用于计数、定位等操作,使用无符号整数可以避免一些负数相关的逻辑错误和边界问题。
792 static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");// 静态断言要求 index_t 类型的字节大小至少要和 size_t 类型一样大,
793 // 这样在一些涉及到两者比较、赋值等操作时可以保证数据不会出现截断等错误情况,确保数据完整性和逻辑正确性。
794 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");// 静态断言检查 Traits 中定义的 BLOCK_SIZE 必须是2的幂次方且至少为2,
795 // 以满足并发队列内部可能基于2的幂次方进行内存对齐、分块等优化操作的要求,方便快速定位、计算等操作。
796 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");// 静态断言确保 EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD 是2的幂次方且大于1,
797 // 这样在基于计数触发相关操作时可以利用位运算等高效方式进行判断,同时大于1保证有实际的计数意义。
798 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");// 静态断言要求 EXPLICIT_INITIAL_INDEX_SIZE 是2的幂次方且大于1,
799 // 保证索引相关的初始结构大小设置合理,便于后续进行扩展、定位等操作(比如基于位运算的快速索引查找等)。
800 static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");// 同样的静态断言,针对 IMPLICIT_INITIAL_INDEX_SIZE,确保其是2的幂次方且大于1,
801 // 用于保障和隐式操作相关的索引初始设置符合高效处理的要求。
802 static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");// 静态断言检查 INITIAL_IMPLICIT_PRODUCER_HASH_SIZE 要么是0(可能表示禁用相关隐式入队等哈希操作),要么是2的幂次方,
803 // 这样在进行哈希相关计算、存储等操作时可以利用位运算等技巧提高效率,保证哈希表结构的合理性。
804 static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");// 静态断言要求 INITIAL_IMPLICIT_PRODUCER_HASH_SIZE 要么是0(表示禁用对应功能),要么至少是1,
805 // 保证在启用相关隐式操作涉及的哈希功能时有合理的初始状态和有效的哈希处理能力。
806
807public:
808
809 // 创建一个具有至少 `capacity` 元素槽的队列;注意实际能够插入的元素数量
810 // 取决于生产者的数量和块大小(例如,如果块大小等于 `capacity`,则只会
811 // 预先分配一个块,这意味着只有一个生产者能够在不进行额外分配的情况下
812 // 将元素入队 —— 块在生产者之间不会共享)。这个方法不是线程安全的 ——
813 // 用户需要确保队列在开始被其他线程使用之前已经完全构造(这包括
814 // 使构造的内存效果可见,可能需要使用内存屏障)。
815 explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
816 : producerListTail(nullptr),
817 producerCount(0),
821 {
822 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
824 populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
825
826#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
827 // 使用完全解析的类型化列表跟踪所有生产者
828 // 每一种;这使得可以从
829 // 根队列对象(否则需要
830 // 不要在 Debugger 的 Expression Evaluator 中编译)。
831 explicitProducers.store(nullptr, std::memory_order_relaxed);
832 implicitProducers.store(nullptr, std::memory_order_relaxed);
833#endif
834 }
835
836 // 根据您希望在任何给定时间可用的最小元素数量和每种生产者的最大并发数量,
837 // 计算适当的预分配块数量。
838
839 ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)// ConcurrentQueue是一个类的构造函数,用于初始化并发队列对象,接受三个参数:
840// minCapacity表示并发队列期望的最小容量,即队列至少要能容纳这么多元素;
841// maxExplicitProducers表示显式生产者的最大数量,也就是可以明确创建并参与操作的生产者的最大个数;
842// maxImplicitProducers表示隐式生产者的最大数量,可能涉及一些间接或者默认创建参与队列操作的生产者的最大个数
843 : producerListTail(nullptr),// 初始化成员变量producerListTail为nullptr,它可能用于指向生产者列表的尾部,在这里初始化为空指针
844 producerCount(0),// 初始化成员变量producerCount为0,用于记录当前生产者的数量,构造时初始化为0个生产者
845 initialBlockPoolIndex(0),// 初始化成员变量initialBlockPoolIndex为0,可能用于标识初始的块池索引,初始值设为0
846 nextExplicitConsumerId(0),// 初始化成员变量nextExplicitConsumerId为0,用于给下一个显式消费者分配唯一标识,初始化为0
847 globalExplicitConsumerOffset(0)// 初始化成员变量globalExplicitConsumerOffset为0,可能用于记录显式消费者在全局范围内的偏移量等相关信息,初始化为0
848 {
849 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);// 清除隐式生产者哈希调整进行中的相关标识,采用宽松内存顺序(std::memory_order_relaxed),
850 // 这意味着该操作不需要与其他内存操作有严格的顺序依赖关系,主要用于多线程环境下的高效处理,减少不必要的同步开销
851 populate_initial_implicit_producer_hash();// 调用populate_initial_implicit_producer_hash函数,从函数名推测可能是用于初始化或填充初始的隐式生产者哈希相关数据结构,
852 // 比如设置初始的哈希表大小、插入默认的隐式生产者相关记录等
853 size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);// 根据给定的参数计算需要初始化的块(blocks)的数量,计算逻辑是先根据最小容量和固定的块大小(BLOCK_SIZE)计算出大致的块数,
854 // 再结合显式生产者和隐式生产者的最大数量进行调整,以确保有足够的块来满足队列容量以及生产者操作的需求
855 populate_initial_block_list(blocks);// 调用populate_initial_block_list函数,根据计算出的块数量(blocks)来初始化或填充初始的块列表,
856 // 比如创建相应数量的块对象、设置块之间的连接关系等,为队列存储数据做准备
857
858#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
859 explicitProducers.store(nullptr, std::memory_order_relaxed);// 如果定义了MOODYCAMEL_QUEUE_INTERNAL_DEBUG宏(通常用于调试目的),
860 // 则将explicitProducers成员变量存储的值设为nullptr,采用宽松内存顺序(std::memory_order_relaxed),
861 // 可能用于调试场景下初始化显式生产者相关的数据结构或标识其初始状态为空
862 implicitProducers.store(nullptr, std::memory_order_relaxed);// 同样在调试宏定义的情况下,将implicitProducers成员变量存储的值设为nullptr,采用宽松内存顺序,
863 // 用于调试场景下初始化隐式生产者相关的数据结构或标识其初始状态为空
864#endif
865 }
866
867 // 注意:在队列被删除时不应同时访问它。用户需要同步这一点。
868 // 这个方法不是线程安全的。
869
871 {
872 // 销毁生产者
873 auto ptr = producerListTail.load(std::memory_order_relaxed);
874 while (ptr != nullptr) {
875 auto next = ptr->next_prod();
876 if (ptr->token != nullptr) {
877 ptr->token->producer = nullptr;
878 }
879 destroy(ptr);
880 ptr = next;
881 }
882
883 // 销毁隐式生产者哈希表
885 auto hash = implicitProducerHash.load(std::memory_order_relaxed);
886 while (hash != nullptr) {
887 auto prev = hash->prev;
888 if (prev != nullptr) { // 最后一个哈希是此对象的一部分,不是动态分配的
889 for (size_t i = 0; i != hash->capacity; ++i) {
890 hash->entries[i].~ImplicitProducerKVP();
891 }
892 hash->~ImplicitProducerHash();
893 (Traits::free)(hash);
894 }
895 hash = prev;
896 }
897 }
898
899 // 销毁全局空闲列表
900 auto block = freeList.head_unsafe();
901 while (block != nullptr) {
902 auto next = block->freeListNext.load(std::memory_order_relaxed);
903 if (block->dynamicallyAllocated) {
904 destroy(block);
905 }
906 block = next;
907 }
908
909 // 销毁初始空闲列表
911 }
912
913 // 禁用复制构造函数和复制赋值运算符
916
917 // 移动操作是支持的,但请注意它 *不是* 线程安全的操作。
918 // 在队列被移动时,其他线程不能使用该队列,并且必须在其他线程可以使用它之前传播该移动的内存效果。
919 // 注意:当队列被移动时,它的令牌仍然有效,但只能与目标队列一起使用(即语义上它们也被随队列一起移动)。
920
922 : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
923 producerCount(other.producerCount.load(std::memory_order_relaxed)),
924 initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
925 initialBlockPool(other.initialBlockPool),
926 initialBlockPoolSize(other.initialBlockPoolSize),
927 freeList(std::move(other.freeList)),
928 nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
929 globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
930 {
931 // 将另一个队列移动到当前队列,并将另一个队列留为空队列
932 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
935
936 other.producerListTail.store(nullptr, std::memory_order_relaxed);
937 other.producerCount.store(0, std::memory_order_relaxed);
938 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
939 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
940
941#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
942 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
943 other.explicitProducers.store(nullptr, std::memory_order_relaxed);
944 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
945 other.implicitProducers.store(nullptr, std::memory_order_relaxed);
946#endif
947
948 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
949 other.initialBlockPoolSize = 0;
950 other.initialBlockPool = nullptr;
951
953 }
954
956 {
957 return swap_internal(other);
958 }
959
960 // 交换当前队列的状态与另一个队列的状态。此操作不是线程安全的。
961 // 交换两个队列不会使它们的令牌失效,然而
962 // 为一个队列创建的令牌必须只与交换后的队列一起使用(即,令牌与
963 // 队列的可移动状态相关联,而不是对象本身)。
964
966 {
967 swap_internal(other);
968 }
969
970private:
972 {
973 if (this == &other) {
974 return *this;
975 }
976
977 details::swap_relaxed(producerListTail, other.producerListTail);
978 details::swap_relaxed(producerCount, other.producerCount);
979 details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
980 std::swap(initialBlockPool, other.initialBlockPool);
981 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
982 freeList.swap(other.freeList);
983 details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
984 details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
985
987
989 other.reown_producers();
990
991#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
992 details::swap_relaxed(explicitProducers, other.explicitProducers);
993 details::swap_relaxed(implicitProducers, other.implicitProducers);
994#endif
995
996 return *this;
997 }
998
999public:
1000
1001 // 将单个项目(通过复制)入队。
1002 // 如有必要,分配内存。仅在内存分配失败时(或隐式
1003 // 生产被禁用,因为 Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE 为 0,
1004 // 或 Traits::MAX_SUBQUEUE_SIZE 已定义并可能被超越)才会失败。
1005 // 线程安全。
1006 inline bool enqueue(T const& item)
1007 {
1008 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1009 return inner_enqueue<CanAlloc>(item);
1010 }
1011
1012 // 将单个项目(如果可能,通过移动)入队。
1013 // 如有必要,分配内存。仅在内存分配失败时(或隐式
1014 // 生产被禁用,因为 Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE 为 0,
1015 // 或 Traits::MAX_SUBQUEUE_SIZE 已定义并可能被超越)才会失败。
1016 // 线程安全。
1017 inline bool enqueue(T&& item)
1018 {
1019 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1020 return inner_enqueue<CanAlloc>(std::move(item));
1021 }
1022
1023 // 使用显式生产者令牌将单个项目(通过复制)入队。
1024 // 如有必要,分配内存。仅在内存分配失败时(或
1025 // Traits::MAX_SUBQUEUE_SIZE 已定义并可能被超越)才会失败。
1026 // 线程安全。
1027 inline bool enqueue(producer_token_t const& token, T const& item)
1028 {
1029 return inner_enqueue<CanAlloc>(token, item);
1030 }
1031
1032 // 使用显式生产者令牌将单个项目(如果可能,通过移动)入队。
1033 // 如有必要,分配内存。仅在内存分配失败时(或
1034 // Traits::MAX_SUBQUEUE_SIZE 已定义并可能被超越)才会失败。
1035 // 线程安全。
1036 inline bool enqueue(producer_token_t const& token, T&& item)
1037 {
1038 return inner_enqueue<CanAlloc>(token, std::move(item));
1039 }
1040
1041 // 将多个项目入队。
1042 // 如有必要,分配内存。仅在内存分配失败时(或隐式生产被禁用,因为 Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE 为 0,
1043 // 或 Traits::MAX_SUBQUEUE_SIZE 已定义并可能被超越)才会失败。
1044 // 注意:如果要移动而非复制元素,请使用 std::make_move_iterator。
1045 // Thread-safe.
1046 template<typename It>
1047 bool enqueue_bulk(It itemFirst, size_t count)
1048 {
1049 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1050 return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1051 }
1052
1053 // 使用显式生产者令牌将多个项目入队。
1054 // Allocates memory if required. Only fails if memory allocation fails
1055 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1056 // Note: Use std::make_move_iterator if the elements should be moved
1057 // instead of copied.
1058 // Thread-safe.
1059 template<typename It>
1060 bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1061 {
1062 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1063 }
1064
1065 // 将单个项目(通过复制)入队。
1066 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
1067 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1068 // is 0).
1069 // Thread-safe.
1070 inline bool try_enqueue(T const& item)
1071 {
1072 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1073 return inner_enqueue<CannotAlloc>(item);
1074 }
1075
1076 // 将单个项目入队(如果可能,通过移动)。
1077 // Does not allocate memory (except for one-time implicit producer).
1078 // Fails if not enough room to enqueue (or implicit production is
1079 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1080 // Thread-safe.
1081 inline bool try_enqueue(T&& item)
1082 {
1083 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1084 return inner_enqueue<CannotAlloc>(std::move(item));
1085 }
1086
1087 // 使用显式生产者令牌将单个项目入队(通过复制)。
1088 // Does not allocate memory. Fails if not enough room to enqueue.
1089 // Thread-safe.
1090 inline bool try_enqueue(producer_token_t const& token, T const& item)
1091 {
1092 return inner_enqueue<CannotAlloc>(token, item);
1093 }
1094
1095 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1096 // Does not allocate memory. Fails if not enough room to enqueue.
1097 // Thread-safe.
1098 inline bool try_enqueue(producer_token_t const& token, T&& item)
1099 {
1100 return inner_enqueue<CannotAlloc>(token, std::move(item));
1101 }
1102
1103 // 批量入队多个项目。
1104 // 不会分配内存(除了一个一次性的隐式生产者)。
1105 // Fails if not enough room to enqueue (or implicit production is
1106 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1107 // Note: Use std::make_move_iterator if the elements should be moved
1108 // instead of copied.
1109 // Thread-safe.
1110 template<typename It>
1111 bool try_enqueue_bulk(It itemFirst, size_t count)
1112 {
1113 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1114 return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1115 }
1116
1117 // 使用显式生产者令牌批量入队多个项目。
1118 // Does not allocate memory. Fails if not enough room to enqueue.
1119 // Note: Use std::make_move_iterator if the elements should be moved
1120 // instead of copied.
1121 // Thread-safe.
1122 template<typename It>
1123 bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1124 {
1125 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1126 }
1127
1128
1129
1130 // 尝试从队列中出队。
1131 // 如果在检查时所有生产者流都为空,则返回 false(因此队列可能为空,但不保证为空)。
1132 // 从不进行内存分配。线程安全。
1133 template<typename U>
1134 bool try_dequeue(U& item)
1135 {
1136
1137 // 我们不是简单地依次尝试每个生产者(这可能会导致第一个生产者出现不必要的竞争),
1138 // 而是通过启发式方法对它们进行评分。
1139 size_t nonEmptyCount = 0;
1140 ProducerBase* best = nullptr;
1141 size_t bestSize = 0;
1142 for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1143 auto size = ptr->size_approx();
1144 if (size > 0) {
1145 if (size > bestSize) {
1146 bestSize = size;
1147 best = ptr;
1148 }
1149 ++nonEmptyCount;
1150 }
1151 }
1152
1153 // 如果至少有一个非空队列,但在尝试从中出队时它似乎为空,我们需要确保每个队列都已被尝试过。
1154 if (nonEmptyCount > 0) {
1155 if ((details::likely)(best->dequeue(item))) {
1156 return true;
1157 }
1158 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1159 if (ptr != best && ptr->dequeue(item)) {
1160 return true;
1161 }
1162 }
1163 }
1164 return false;
1165 }
1166
1167 // 尝试从队列中出队。
1168 // 如果在检查时所有生产者流都为空,则返回 false(因此队列可能为空,但不保证为空)。
1169 // 与 try_dequeue(item) 方法不同的是,这个方法不会通过交错生产者流的出队顺序来减少竞争。
1170 // 因此,在竞争情况下使用这个方法可能会降低整体吞吐量,但在单线程消费者场景下会提供更可预测的结果。
1171 // 这主要适用于内部单元测试。
1172 // 从不进行内存分配。Thread-safe.
1173 template<typename U>
1175 {
1176 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1177 if (ptr->dequeue(item)) {
1178 return true;
1179 }
1180 }
1181 return false;
1182 }
1183
1184 // Attempts to dequeue from the queue using an explicit consumer token.
1185 // Returns false if all producer streams appeared empty at the time they
1186 // were checked (so, the queue is likely but not guaranteed to be empty).
1187 // Never allocates. Thread-safe.
1188 template<typename U>
1189 bool try_dequeue(consumer_token_t& token, U& item)
1190 {
1191
1192 // 大致思想如下:
1193 // 每处理来自一个生产者的 256 个项目,就让所有生产者进行轮换(增加全局偏移量)——这意味着效率最高的消费者在一定程度上决定了其他消费者的轮换速度。
1194 // 如果看到全局偏移量发生了变化,你必须重置你的消费计数器并移动到指定的位置。
1195 // 如果你所在的位置没有项目,继续移动直到找到一个有项目的生产者。
1196 // 如果全局偏移量没有变化,但你已经没有更多项目可以消费,继续从当前位置移动,直到找到一个有项目的生产者。
1197 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1199 return false;
1200 }
1201 }
1202
1203 // 如果至少有一个非空队列,但在尝试从中取出元素时它却显得为空,我们需要确保每个队列都已经被尝试过
1204 if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1206 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1207 }
1208 return true;
1209 }
1210
1211 auto tail = producerListTail.load(std::memory_order_acquire);
1212 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1213 if (ptr == nullptr) {
1214 ptr = tail;
1215 }
1216 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1217 if (ptr->dequeue(item)) {
1218 token.currentProducer = ptr;
1219 token.itemsConsumedFromCurrent = 1;
1220 return true;
1221 }
1222 ptr = ptr->next_prod();
1223 if (ptr == nullptr) {
1224 ptr = tail;
1225 }
1226 }
1227 return false;
1228 }
1229
1230 // 尝试从队列中取出多个元素。
1231 // 返回实际取出的元素数量。
1232 // 如果所有生产者流在检查时都显得为空(因此,队列可能但不一定为空),则返回0。
1233 // Never allocates. Thread-safe.
1234 template<typename It>
1235 size_t try_dequeue_bulk(It itemFirst, size_t max)
1236 {
1237 size_t count = 0;
1238 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1239 count += ptr->dequeue_bulk(itemFirst, max - count);
1240 if (count == max) {
1241 break;
1242 }
1243 }
1244 return count;
1245 }
1246
1247 // 尝试使用显式消费者令牌从队列中取出多个元素。
1248 // Returns the number of items actually dequeued.
1249 // Returns 0 if all producer streams appeared empty at the time they
1250 // were checked (so, the queue is likely but not guaranteed to be empty).
1251 // Never allocates. Thread-safe.
1252 template<typename It>
1253 size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1254 {
1255 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1256// 如果期望的生产者为空,或者上次已知的全局偏移量与当前全局显式消费者偏移量(通过原子加载,内存序为宽松模型)不一致,
1257 // 则需要更新当前生产者相关信息。
1259// 如果更新当前生产者信息失败,直接返回0,表示无法进行出队操作。
1260 return 0;
1261 }
1262 }
1263
1264 size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1265// 通过将当前生产者指针转换为ProducerBase*类型,并调用其dequeue_bulk方法尝试批量出队元素,
1266 // 返回实际出队的元素数量并赋值给count变量。
1267 if (count == max) {
1268// 如果实际出队数量等于期望的最大出队数量max,说明当前生产者的队列中元素足够。
1269 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1270// 如果从当前生产者已消费的元素数量达到了轮转前的消费配额(通过累加并比较判断),
1271 // 则原子地增加全局显式消费者偏移量(内存序为宽松模型),表示消费进度的推进。
1272 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1273 }
1274 return max;
1275 }
1276 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1277// 将实际出队的元素数量累加到从当前生产者已消费的元素数量中。
1278 max -= count;
1279// 更新剩余还期望出队的元素数量。
1280 auto tail = producerListTail.load(std::memory_order_acquire);
1281// 原子地加载生产者列表的尾指针(内存序为获取模型,保证获取到的是最新写入的值)。
1282 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1283// 获取当前生产者的下一个生产者指针。
1284 if (ptr == nullptr) {
1285 ptr = tail;
1286 }
1287// 如果下一个生产者指针为空,则将其设置为生产者列表的尾指针,即回绕到列表末尾。
1288 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1289// 循环遍历其他生产者,直到再次回到当前生产者(形成一个循环查找的逻辑)。
1290 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1291// 尝试从当前遍历到的生产者队列中批量出队元素,返回实际出队数量并赋值给dequeued变量。
1292 count += dequeued;
1293// 将本次出队数量累加到总的出队数量count中。
1294 if (dequeued != 0) {
1295 token.currentProducer = ptr;
1296 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1297// 如果本次有元素出队,更新当前生产者指针为当前遍历到的这个生产者,
1298 // 并将从当前生产者已消费的元素数量更新为本次出队数量。
1299 }
1300 if (dequeued == max) {
1301 break;
1302 }
1303// 如果本次出队数量刚好等于剩余期望出队数量,说明已经满足需求,直接跳出循环。
1304 max -= dequeued;
1305 ptr = ptr->next_prod();
1306 if (ptr == nullptr) {
1307 ptr = tail;
1308 }
1309// 更新下一个要遍历的生产者指针,如果为空则回绕到生产者列表尾指针。
1310 }
1311 return count;
1312 }
1313
1314
1315
1316 // Attempts to dequeue from a specific producer's inner queue.
1317 // If you happen to know which producer you want to dequeue from, this
1318 // is significantly faster than using the general-case try_dequeue methods.
1319 // Returns false if the producer's queue appeared empty at the time it
1320 // was checked (so, the queue is likely but not guaranteed to be empty).
1321 // Never allocates. Thread-safe.
1322 template<typename U>
1323 inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1324 {
1325 return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1326// 将传入的生产者指针转换为ExplicitProducer*类型,并调用其dequeue方法尝试出队一个元素,
1327 // 返回出队操作的结果(成功为true,失败为false)。
1328}
1329 }
1330
1331 // Attempts to dequeue several elements from a specific producer's inner queue.
1332 // Returns the number of items actually dequeued.
1333 // If you happen to know which producer you want to dequeue from, this
1334 // is significantly faster than using the general-case try_dequeue methods.
1335 // Returns 0 if the producer's queue appeared empty at the time it
1336 // was checked (so, the queue is likely but not guaranteed to be empty).
1337 // Never allocates. Thread-safe.
1338 template<typename It>
1339 inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1340 {
1341 return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1342// 将传入的生产者指针转换为ExplicitProducer*类型,并调用其dequeue_bulk方法尝试批量出队元素,
1343 // 返回实际出队的元素数量。
1344 }
1346
1347 // Returns an estimate of the total number of elements currently in the queue. This
1348 // estimate is only accurate if the queue has completely stabilized before it is called
1349 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1350 // visible on the calling thread, and no further operations start while this method is
1351 // being called).
1352// 返回当前队列中元素总数的一个估计值。只有在调用此方法之前队列已经完全稳定(即所有入队和出队操作都已完成,
1353// 并且它们的内存影响在调用线程上可见,并且在调用此方法时没有进一步的操作开始)时,这个估计值才是准确的。
1354 // Thread-safe.
1355 size_t size_approx() const
1357 size_t size = 0;
1358 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1359
1360// 原子地加载生产者列表的尾指针(内存序为获取模型),然后通过循环遍历每个生产者,
1361 // 只要当前生产者指针不为空,就获取下一个生产者指针继续循环。
1362 size += ptr->size_approx();
1363 }
1364 return size;
1365 }
1366
1368 // Returns true if the underlying atomic variables used by
1369 // the queue are lock-free (they should be on most platforms).
1370 // Thread-safe.
1371 static bool is_lock_free()
1372 {
1373 return
1380// 通过检查一系列类型对应的底层静态无锁判断(通过details命名空间下的相关函数或类型进行判断,具体实现细节应该在相应的地方定义),
1381 // 只有当所有这些类型对应的判断值都为2时(具体含义应该由相关实现定义,可能表示满足无锁条件等情况),才返回true,表示整个队列使用的原子变量是无锁的。
1382 }
1383
1384
1385private:
1386 friend struct ProducerToken;
1387 friend struct ConsumerToken;
1388 struct ExplicitProducer;
1389 friend struct ExplicitProducer;
1390 struct ImplicitProducer;
1391 friend struct ImplicitProducer;
1393
1395
1396
1397 ///////////////////////////////
1398 // Queue methods
1399 ///////////////////////////////
1400
1401 template<AllocationMode canAlloc, typename U>
1402 inline bool inner_enqueue(producer_token_t const& token, U&& element)
1404 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1405// 将传入的生产者令牌中的生产者指针转换为ExplicitProducer*类型,然后调用其内部的特定模板版本的enqueue方法(根据canAlloc参数决定是否可以分配内存的版本),
1406 // 传入右值引用形式的元素,尝试将元素入队,返回入队操作的结果(成功为true,失败为false)。
1407 }
1408
1409 template<AllocationMode canAlloc, typename U>
1410 inline bool inner_enqueue(U&& element)
1411 {
1412 auto producer = get_or_add_implicit_producer();
1413 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1414 }
1415
1416 template<AllocationMode canAlloc, typename It>
1417 inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1418 {
1419 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1420 }
1421
1422 template<AllocationMode canAlloc, typename It>
1423 inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1424 {
1425 auto producer = get_or_add_implicit_producer();
1426 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1428
1430 {
1431 // 发生了轮换,找出我们应该处于的位置!
1432 auto tail = producerListTail.load(std::memory_order_acquire);
1433 if (token.desiredProducer == nullptr && tail == nullptr) {
1434 return false;
1435 }
1436 auto prodCount = producerCount.load(std::memory_order_relaxed);
1437 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1438 if ((details::unlikely)(token.desiredProducer == nullptr)) {
1439 // 我们第一次从队列中取出任何东西。
1440 // 确定我们的本地位置。
1441 // 注意:偏移量是从开始处计算的,而我们是从末尾遍历的——因此先从计数中减去偏移量。
1442
1443 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1444 token.desiredProducer = tail;
1445 for (std::uint32_t i = 0; i != offset; ++i) {
1446 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1447 if (token.desiredProducer == nullptr) {
1448 token.desiredProducer = tail;
1449 }
1450 }
1451// 通过循环,根据计算出的偏移量逐步向前移动期望的生产者指针,每次获取当前期望生产者的下一个生产者指针,
1452 // 如果遇到下一个生产者指针为空的情况,则回绕到生产者列表尾指针,直到移动了指定的偏移量次数。
1453 }
1454
1455 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1456 if (delta >= prodCount) {
1457 delta = delta % prodCount;
1459 for (std::uint32_t i = 0; i != delta; ++i) {
1460 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1461 if (token.desiredProducer == nullptr) {
1462 token.desiredProducer = tail;
1463 }
1464 }
1465
1466 token.lastKnownGlobalOffset = globalOffset;
1467 token.currentProducer = token.desiredProducer;
1468 token.itemsConsumedFromCurrent = 0;
1469 return true;
1470// 更新上次已知的全局偏移量为当前的全局偏移量,将当前生产者指针设置为期望的生产者指针,
1471 // 并将从当前生产者已消费的元素数量重置为0,最后返回true,表示成功更新了当前生产者相关信息。
1472 }
1473
1474
1475 ///////////////////////////
1476 // Free list
1477 ///////////////////////////
1478
1479 template <typename N>
1481 {
1483
1484 std::atomic<std::uint32_t> freeListRefs;
1485 std::atomic<N*> freeListNext;
1486 };
1487
1488 // 一个基于CAS的简单无锁空闲列表。
1489 // 在高争用的情况下不是最快的,但它简单且正确(假设节点在空闲列表销毁之前不会被释放)。
1490 // 在低争用的情况下相当快速。
1491 template<typename N> // N 必须继承自 FreeListNode 或具有相同的字段(以及它们的初始化)
1493 {
1494 FreeList() : freeListHead(nullptr) { }
1495 FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
1497
1500
1501 inline void add(N* node)
1503#if MCDBGQ_NOLOCKFREE_FREELIST
1504 debug::DebugLock lock(mutex);
1505#endif
1506
1507 // 我们知道此时应在自由列表上的标志位为 0,因此可以安全地使用 fetch_add 来设置它。
1508 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1510 // 我们是最后一个引用这个节点的,我们知道我们想将它添加到自由列表中,所以就这样做吧!
1512 }
1513 }
1514
1515 inline N* try_get()
1516 {
1517#if MCDBGQ_NOLOCKFREE_FREELIST
1518 debug::DebugLock lock(mutex);
1519#endif
1520 auto head = freeListHead.load(std::memory_order_acquire);
1521 while (head != nullptr) {
1522 auto prevHead = head;
1523 auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1524 if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1525 head = freeListHead.load(std::memory_order_acquire);
1526 continue;
1527 }
1528
1529 // 好的,引用计数已被递增(它之前不为零),这意味着我们可以安全地读取下一个值,而不必担心在现在和执行 CAS 操作之间它会发生变化。
1530 auto next = head->freeListNext.load(std::memory_order_relaxed);
1531 if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1532 // 太好了,拿到了节点。这意味着节点原本在列表上,这也表明 shouldBeOnFreeList 必须是假的,无论 refcount 的值如何。
1533 // 这是因为在节点被取下之前,没有其他人知道它已经被取下,所以它不能被放回列表中。
1534 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1535
1536 // Decrease refcount twice, once for our ref, and once for the list's ref
1537 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1538 return head;
1539 }
1540
1541 // 好的,头指针必须已经发生变化,但我们仍然需要减少我们之前增加的 refcount。
1542 // 请注意,我们不需要释放任何内存效果,但我们需要确保 refcount 的递减操作在对头指针的 CAS 操作之后发生。
1543 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1544 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1546 }
1547 }
1548
1549 return nullptr;
1550 }
1551
1552 // 在没有争用的情况下遍历列表时很有用(例如,销毁剩余的节点)
1553 N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1554
1555 private:
1556 inline void add_knowing_refcount_is_zero(N* node)
1557 {
1558 // 因为引用计数为零,并且一旦它为零,除了我们之外没有其他线程可以增加它(因为我们
1559 // 每次处理一个节点时只有一个线程在运行,即单线程情况),所以我们可以安全地改变
1560 // 节点的 next 指针。然而,一旦引用计数回到零以上,其他线程可能会增加它(这种情况
1561 // 发生在重度争用的情况下,当引用计数在 load 和 try_get 的节点的引用计数增加之间变为零,
1562 // 然后又回到非零值时,其他线程会完成引用计数的增加)——因此,如果将节点添加到实际
1563 // 列表的 CAS 操作失败,则减少引用计数,并将添加操作留给下一个将引用计数恢复到零的线程
1564 // (这可能是我们自己,因此循环)。
1565 auto head = freeListHead.load(std::memory_order_relaxed);
1566 while (true) {
1567 node->freeListNext.store(head, std::memory_order_relaxed);
1568 node->freeListRefs.store(1, std::memory_order_release);
1569 if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1570 // 嗯,添加操作失败了,但我们只能在引用计数回到零时再尝试一次
1571 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1572 continue;
1573 }
1574 }
1575 return;
1576 }
1577 }
1578
1579 private:
1580 // 实现方式类似于栈,但节点的顺序不重要(在争用情况下,节点可能会无序插入)
1581 std::atomic<N*> freeListHead;
1582
1583 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1584 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1585
1586#if MCDBGQ_NOLOCKFREE_FREELIST
1587 debug::DebugMutex mutex;
1588#endif
1589 };
1590
1591
1592 ///////////////////////////
1593 // Block
1594 ///////////////////////////
1595
1597
1598 struct Block
1599 {
1602 {
1603#if MCDBGQ_TRACKMEM
1604 owner = nullptr;
1605#endif
1606 }
1607
1608 template<InnerQueueContext context>
1609 inline bool is_empty() const
1610 {
1612 // 检查标志
1613 for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1614 if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1615 return false;
1616 }
1617 }
1618
1619 // 啊,空的;确保我们在设置空标志之前完成了所有其他的内存操作
1620 std::atomic_thread_fence(std::memory_order_acquire);
1621 return true;
1622 }
1623 else {
1624 // Check counter
1625 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1626 std::atomic_thread_fence(std::memory_order_acquire);
1627 return true;
1628 }
1629 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1630 return false;
1631 }
1632 }
1633
1634 // 如果块现在为空,则返回 true(在显式上下文中不适用)
1635 template<InnerQueueContext context>
1636 inline bool set_empty(index_t i)
1637 {
1639 // Set flag
1640 assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
1641 emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
1642 return false;
1643 }
1644 else {
1645 // Increment counter
1646 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1647 assert(prevVal < BLOCK_SIZE);
1648 return prevVal == BLOCK_SIZE - 1;
1649 }
1650 }
1651
1652 // 将多个连续的项状态设置为“空”(假设没有环绕,并且计数 > 0)。
1653 // 如果块现在为空,则返回 true(在显式上下文中不适用)。
1654 template<InnerQueueContext context>
1655 inline bool set_many_empty(index_t i, size_t count)
1656 {
1658 // Set flags
1659 std::atomic_thread_fence(std::memory_order_release);
1660 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1661 for (size_t j = 0; j != count; ++j) {
1662 assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1663 emptyFlags[i + j].store(true, std::memory_order_relaxed);
1664 }
1665 return false;
1666 }
1667 else {
1668 // Increment counter
1669 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1670 assert(prevVal + count <= BLOCK_SIZE);
1671 return prevVal + count == BLOCK_SIZE;
1672 }
1673 }
1674
1675 template<InnerQueueContext context>
1676 inline void set_all_empty()
1677 {
1679 // Set all flags
1680 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1681 emptyFlags[i].store(true, std::memory_order_relaxed);
1682 }
1683 }
1684 else {
1685 // Reset counter
1686 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1687 }
1688 }
1689
1690 template<InnerQueueContext context>
1691 inline void reset_empty()
1692 {
1694 // Reset flags
1695 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1696 emptyFlags[i].store(false, std::memory_order_relaxed);
1697 }
1698 }
1699 else {
1700 // Reset counter
1701 elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1702 }
1703 }
1704
1705 inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1706 inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1707
1708 private:
1709 // 重要提示:这必须是 Block 中的第一个成员,以确保如果 T 依赖于 malloc 返回地址的对齐方式,
1710 // 该对齐方式将被保留。显然,clang 在某些情况下为 AVX 指令生成的代码会利用这一假设。
1711 // 理想情况下,我们还应该将 Block 对齐到 T 的对齐方式,以防 T 的对齐要求高于 malloc 的 16 字节对齐,
1712 // 但在跨平台中很难做到这一点。对此情况进行断言:
1713 static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
1714 // 此外,我们还需要确保 Block 自身的对齐方式是 max_align_t 的倍数,否则在 Block 的末尾将不会添加适当的填充,
1715 // 从而使 Block 数组中的所有元素都能正确对齐(而不仅仅是第一个)。我们使用一个联合体来强制实现这一点。
1716 union {
1717 char elements[sizeof(T) * BLOCK_SIZE];
1719 };
1720 public:
1722 std::atomic<size_t> elementsCompletelyDequeued;
1724 public:
1725 std::atomic<std::uint32_t> freeListRefs;
1726 std::atomic<Block*> freeListNext;
1727 std::atomic<bool> shouldBeOnFreeList;
1728 bool dynamicallyAllocated; // 这个名字可能更好:'isNotPartOfInitialBlockPool'
1729
1730#if MCDBGQ_TRACKMEM
1731 void* owner;
1732#endif
1733 };
1734 static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
1735
1736
1737#if MCDBGQ_TRACKMEM
1738public:
1739 struct MemStats;
1740private:
1741#endif
1742
1743 ///////////////////////////
1744 // Producer base
1745 ///////////////////////////
1746
1748 {
1749 ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
1750 tailIndex(0),
1751 headIndex(0),
1754 tailBlock(nullptr),
1755 isExplicit(isExplicit_),
1756 parent(parent_)
1757 {
1758 }
1759
1760 virtual ~ProducerBase() { };
1761
1762 template<typename U>
1763 inline bool dequeue(U& element)
1764 {
1765 if (isExplicit) {
1766 return static_cast<ExplicitProducer*>(this)->dequeue(element);
1767 }
1768 else {
1769 return static_cast<ImplicitProducer*>(this)->dequeue(element);
1770 }
1771 }
1772
1773 template<typename It>
1774 inline size_t dequeue_bulk(It& itemFirst, size_t max)
1775 {
1776 if (isExplicit) {
1777 return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1778 }
1779 else {
1780 return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1781 }
1782 }
1783
1784 inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1785
1786 inline size_t size_approx() const
1787 {
1788 auto tail = tailIndex.load(std::memory_order_relaxed);
1789 auto head = headIndex.load(std::memory_order_relaxed);
1790 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1791 }
1792
1793 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1794 protected:
1795 std::atomic<index_t> tailIndex; // Where to enqueue to next
1796 std::atomic<index_t> headIndex; // Where to dequeue from next
1797
1798 std::atomic<index_t> dequeueOptimisticCount;
1799 std::atomic<index_t> dequeueOvercommit;
1800
1802
1803 public:
1806
1807 protected:
1808#if MCDBGQ_TRACKMEM
1809 friend struct MemStats;
1810#endif
1811 };
1812
1813
1814 ///////////////////////////
1815 // Explicit queue
1816 ///////////////////////////
1817
1819 {
1822 blockIndex(nullptr),
1826 pr_blockIndexEntries(nullptr),
1827 pr_blockIndexRaw(nullptr)
1828 {
1829 size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
1830 if (poolBasedIndexSize > pr_blockIndexSize) {
1831 pr_blockIndexSize = poolBasedIndexSize;
1832 }
1833
1834 new_block_index(0); // 这将创建一个具有当前条目数量两倍的索引,即 EXPLICIT_INITIAL_INDEX_SIZE
1835
1836 }
1837
1839 {
1840 // 析构任何尚未出队的元素。
1841 // 由于我们在析构函数中,我们可以假设所有元素
1842 // 要么完全出队,要么完全未出队(没有半途而废的情况)。
1843 // 如果 tailBlock 不是空指针,则必须存在一个块索引
1844 // 首先找到部分出队的块(如果有的话)
1845
1846 if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1847 // First find the block that's partially dequeued, if any
1848 Block* halfDequeuedBlock = nullptr;
1849 if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1850 // 头部不在块边界上,意味着某个块部分出队
1851 // (或者头块是尾块且已完全出队,但头部/尾部仍未在边界上)
1853 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
1854 i = (i + 1) & (pr_blockIndexSize - 1);
1855 }
1856 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
1857 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1858 }
1859
1860 // 从头块开始(注意第一次迭代时,循环的第一行给我们提供了头部)
1861 auto block = this->tailBlock;
1862 do {
1863 block = block->next;
1864 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1865 continue;
1866 }
1867
1868 size_t i = 0; // 块中的偏移
1869 if (block == halfDequeuedBlock) {
1870 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1871 }
1872
1873 // 遍历块中的所有项目;如果这是尾块,当达到尾索引时需要停止
1874 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1875 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1876 (*block)[i++]->~T();
1877 }
1878 } while (block != this->tailBlock);
1879 }
1880
1881 // 销毁所有我们拥有的块
1882 if (this->tailBlock != nullptr) {
1883 auto block = this->tailBlock;
1884 do {
1885 auto nextBlock = block->next;
1886 if (block->dynamicallyAllocated) {
1887 destroy(block);
1888 }
1889 else {
1890 this->parent->add_block_to_free_list(block);
1891 }
1892 block = nextBlock;
1893 } while (block != this->tailBlock);
1894 }
1895
1896 // 销毁块索引
1897 auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
1898 while (header != nullptr) {
1899 auto prev = static_cast<BlockIndexHeader*>(header->prev);
1900 header->~BlockIndexHeader();
1901 (Traits::free)(header);
1902 header = prev;
1903 }
1904 }
1905
1906 template<AllocationMode allocMode, typename U>
1907 inline bool enqueue(U&& element)
1908 {
1909 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1910 index_t newTailIndex = 1 + currentTailIndex;
1911 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1912 // 我们到达了一个块的末尾,开始一个新的块
1913 auto startBlock = this->tailBlock;
1914 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1915 if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1916 // 我们可以重用下一个块,它是空的!
1917 this->tailBlock = this->tailBlock->next;
1918 this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1919
1920 // 我们将把块放到块索引中(由于我们先从中移除最后一个块,因此可以保证有空间 —— 除了移除再添加,我们可以直接覆盖)。
1921 // 请注意,这里必须有一个有效的块索引,因为即使在构造函数中分配失败,它也会在将第一个块添加到队列时重新尝试;由于存在这样的块,因此块索引必须成功分配。
1922 }
1923 else {
1924 // 我们看到的头部值大于或等于我们在此处看到的最后一个值(相对而言),并且小于等于其当前值。由于我们有最新的尾部,头部必须小于等于尾部。
1925 auto head = this->headIndex.load(std::memory_order_relaxed);
1926 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1927 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1929 // 我们不能在另一个块中入队,因为没有足够的余地 —— 尾部可能在块填满之前超越头部! (或者,如果第二部分条件为真,我们将超过大小限制。)
1930 return false;
1931 return false;
1932 }
1933 // 我们需要一个新的块;检查块索引是否有空间
1935 // 嗯,圆形块索引已经满了 —— 我们需要分配一个新的索引。请注意,pr_blockIndexRaw 只能为 nullptr,如果初始分配在构造函数中失败的话。
1936 if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
1937 return false;
1938 }
1939 }
1940
1941 // 在圆形链表中插入一个新块
1942 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1943 if (newBlock == nullptr) {
1944 return false;
1945 }
1946#if MCDBGQ_TRACKMEM
1947 newBlock->owner = this;
1948#endif
1949 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1950 if (this->tailBlock == nullptr) {
1951 newBlock->next = newBlock;
1952 }
1953 else {
1954 newBlock->next = this->tailBlock->next;
1955 this->tailBlock->next = newBlock;
1956 }
1957 this->tailBlock = newBlock;
1959 }
1960
1961 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
1962 // 构造函数可能抛出异常。在这种情况下,我们希望元素不出现在队列中(而不会损坏队列):
1964 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1965 }
1966 MOODYCAMEL_CATCH (...) {
1967 // 撤销对当前块的更改,但保留新块以供下次使用
1968 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1969 this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1971 }
1972 }
1973 else {
1974 (void)startBlock;
1975 (void)originalBlockIndexSlotsUsed;
1976 }
1977
1978 // 将块添加到块索引中
1979 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1980 entry.base = currentTailIndex;
1981 entry.block = this->tailBlock;
1982 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
1984
1985 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
1986 this->tailIndex.store(newTailIndex, std::memory_order_release);
1987 return true;
1988 }
1989 }
1990
1991 // 入队
1992 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1993
1994 this->tailIndex.store(newTailIndex, std::memory_order_release);
1995 return true;
1996 }
1997
1998 template<typename U>
1999 bool dequeue(U& element)
2000 {
2001 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2002 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2003 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2004 // 可能有东西要出队,让我们试试看
2005
2006 // 注意,这个if仅仅是为了提高性能,在队列为空且值最终一致的常见情况下
2007 // 我们可能会错误地进入这里。
2008
2009 // 注意,无论overcommit和tail的值如何,它们都不会改变(除非我们改变它们)
2010 // 并且在这里的if内部时,它们的值必须与if条件被评估时相同。
2011
2012 // 在此处插入一个获取屏障,以与下面增加dequeueOvercommit的释放同步。
2013 // 这确保了无论我们加载到overcommit中的值是什么,下面fetch_add中
2014 // 加载的dequeueOptimisticCount的值至少是那个值的最新值(因此至少一样大)。
2015 // 注意,我相信此处的编译器(信号)屏障是足够的,因为fetch_add的性质
2016 // (所有读-修改-写操作都保证在修改顺序中对最新值起作用),但不幸的是,
2017 // 仅使用C++11标准无法证明这一点是正确的。
2018 // 参见 http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
2019 std::atomic_thread_fence(std::memory_order_acquire);
2020
2021 // 增加乐观计数器,然后检查它是否超出了边界
2022 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2023
2024 // 注意,由于dequeueOvercommit必须小于等于dequeueOptimisticCount(因为dequeueOvercommit只会在
2025 // dequeueOptimisticCount之后增加——这是在下面的`else`块中强制执行的),并且由于我们现在有一个
2026 // 至少与overcommit一样新的dequeueOptimisticCount版本(由于增加dequeueOvercommit的释放以及
2027 // 与其同步的获取),所以overcommit <= myDequeueCount。
2028 // 但是我们不能断言这一点,因为dequeueOptimisticCount和dequeueOvercommit都可能(独立地)溢出;
2029 // 在这种情况下,逻辑仍然成立,因为两者之间的差异得以保持。
2030
2031 // 注意我们在这里重新加载tail以防它发生了变化;它将与之前的值相同或更大,因为
2032 // 这个加载是排在(发生在)上面的先前加载之后的。这由读取-读取一致性支持
2033 // (如标准中定义的),详见:http://en.cppreference.com/w/cpp/atomic/memory_order
2034 tail = this->tailIndex.load(std::memory_order_acquire);
2035 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2036 // 保证至少有一个元素要出队!
2037
2038 // 获取索引。注意,由于保证至少有一个元素,这
2039 // 将永远不会超过tail。我们需要在这里做一个获取-释放屏障,
2040 // 因为可能导致我们到达此点的条件是先前入队的元素(我们已经看到它的内存效应),
2041 // 但到我们增加时可能有人已将其增加,我们需要看到*那个*元素的内存效应,
2042 // 在这种情况下,该元素的内存效应在以更当前的条件首先增加它的线程上是可见的
2043 // (他们必须获取一个至少与最近一样新的tail)。
2044 auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2045
2046
2047 // 确定元素在哪个块中
2048
2049 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2050 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2051
2052 // 我们在这里需要小心减法和除法,因为索引的环绕。
2053 // 当索引环绕时,我们在将其除以块大小时需要保持偏移的符号
2054 // (以便在所有情况下获得正确的有符号块计数偏移):
2055 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2056 auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
2057 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
2058 auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
2059
2060 // Dequeue
2061 auto& el = *((*block)[index]);
2062 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2063 // 确保即使赋值操作抛出异常,元素仍能完全出队并被销毁
2064 struct Guard {
2065 Block* block;
2066 index_t index;
2067 // 析构函数,当 Guard 离开作用域时调用
2068 ~Guard()
2069 {
2070 (*block)[index]->~T();
2071 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2072 }
2073 } guard = { block, index };
2074
2075 element = std::move(el);
2076 }
2077 else {
2078 element = std::move(el);
2079 el.~T();
2080 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2081 }
2082
2083 return true;
2084 }
2085 else {
2086 // 如果实际上没有元素可以出队,则更新出队过度计数,使其最终与实际一致
2087 this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
2088 }
2089 }
2090
2091 return false;
2092 }
2093
2094 template<AllocationMode allocMode, typename It>
2095 bool enqueue_bulk(It itemFirst, size_t count)
2096 {
2097 // 首先,我们需要确保有足够的空间来入队所有元素;
2098 // 这意味着需要预先分配块,并将它们放入块索引中(前提是所有分配都成功)。
2099 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2100 auto startBlock = this->tailBlock;
2101 auto originalBlockIndexFront = pr_blockIndexFront;
2102 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2103
2104 Block* firstAllocatedBlock = nullptr;
2105
2106 // 计算需要分配多少块,并进行分配
2107 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2108 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2109 if (blockBaseDiff > 0) {
2110 // 尽可能从现有的块链表中分配块
2111 while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2112 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2113 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2114
2115 this->tailBlock = this->tailBlock->next;
2116 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2117
2118 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2119 entry.base = currentTailIndex;
2120 entry.block = this->tailBlock;
2122 }
2123
2124 // 如果需要,继续从块池中分配新的块
2125 while (blockBaseDiff > 0) {
2126 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2127 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2128
2129 auto head = this->headIndex.load(std::memory_order_relaxed);
2130 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2131 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2132 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2133 if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
2134 // Failed to allocate, undo changes (but keep injected blocks)
2135 pr_blockIndexFront = originalBlockIndexFront;
2136 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2137 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2138 return false;
2139 }
2140 // pr_blockIndexFront 在 new_block_index 内部被更新,因此我们也需要更新备用值(因为即使后来失败,我们仍然保留新的索引)
2141 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2142 }
2143
2144 // 在循环链表中插入新块
2145 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2146 if (newBlock == nullptr) {
2147 pr_blockIndexFront = originalBlockIndexFront;
2148 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2149 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2150 return false;
2151 }
2152// 如果定义了MCDBGQ_TRACKMEM这个宏,则执行下面的代码,将新块的所有者设置为当前对象(this通常指代当前类的实例)
2153#if MCDBGQ_TRACKMEM
2154 newBlock->owner = this;
2155#endif
2156// 调用ConcurrentQueue::Block的模板函数set_all_empty<explicit_context>()来设置新块所有相关内容为空(具体功能取决于该模板函数的实现)
2157 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2158 if (this->tailBlock == nullptr) {
2159 newBlock->next = newBlock;
2160 }
2161// 如果尾块不为空,将新块插入到尾块后面,更新链表指针关系
2162 else {
2163 newBlock->next = this->tailBlock->next;
2164 this->tailBlock->next = newBlock;
2165 }
2166// 更新尾块指针,使其指向新插入的块
2167 this->tailBlock = newBlock;
2168 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2169// 已使用的块索引槽数量加1
2171
2172 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2173// 设置该元素的起始索引(base)为当前尾索引(currentTailIndex)
2174 entry.base = currentTailIndex;
2175// 设置该元素对应的块为当前尾块(this->tailBlock)
2176 entry.block = this->tailBlock;
2177// 更新前面使用的块索引槽位置,通过循环计数的方式(按位与操作保证在一定范围内循环)
2179 }
2180
2181 // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2182 // publish the new block index front
2183 auto block = firstAllocatedBlock;
2184 while (true) {
2185 block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2186// 如果当前块就是尾块,说明遍历完所有需要处理的块了,跳出循环
2187 if (block == this->tailBlock) {
2188 break;
2189 }
2190// 移动到下一个块继续处理
2191 block = block->next;
2192 }
2193
2194 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2195 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2196 }
2197 }
2198
2199 // 一次入队一个块
2200 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2201 currentTailIndex = startTailIndex;
2202 auto endBlock = this->tailBlock;
2203 this->tailBlock = startBlock;
2204// 断言一些条件,比如起始尾索引按位与块大小减1的结果不为0或者第一个已分配块不为空或者入队数量为0,用于保证数据的合理性
2205 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2206 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2207 this->tailBlock = firstAllocatedBlock;
2208 }
2209 while (true) {
2210// 计算当前块的结束索引(按位与操作结合块大小计算)
2211 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2212 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2213 stopIndex = newTailIndex;
2214 }
2215 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2216 while (currentTailIndex != stopIndex) {
2217 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2218 }
2219 }
2220 else {
2222 while (currentTailIndex != stopIndex) {
2223 // 即使存在移动构造函数,也必须使用拷贝构造函数
2224 // 因为如果发生异常,我们可能需要进行恢复。
2225 // 对于下一行模板化代码很抱歉,但这是唯一能在编译时禁用移动构造的方式
2226 // 这很重要,因为一个类型可能只定义了一个(noexcept)移动构造函数,
2227 // 因此即使它们在一个永远不会被执行的 if 分支中,调用拷贝构造函数也无法编译。
2228 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2229 ++currentTailIndex;
2230 ++itemFirst;
2231 }
2232 }
2233 MOODYCAMEL_CATCH (...) {
2234 // 哎呀,抛出了异常——销毁已排队的元素
2235 // 并恢复整个批量操作(不过我们会保留任何已分配的块以备后用)。
2236 auto constructedStopIndex = currentTailIndex;
2237 auto lastBlockEnqueued = this->tailBlock;
2238
2239 pr_blockIndexFront = originalBlockIndexFront;
2240 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2241 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2242
2244 auto block = startBlock;
2245 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2246 block = firstAllocatedBlock;
2247 }
2248 currentTailIndex = startTailIndex;
2249 while (true) {
2250 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2251 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2252 stopIndex = constructedStopIndex;
2253 }
2254 while (currentTailIndex != stopIndex) {
2255 (*block)[currentTailIndex++]->~T();
2256 }
2257 if (block == lastBlockEnqueued) {
2258 break;
2259 }
2260 block = block->next;
2261 }
2262 }
2264 }
2265 }
2266
2267 if (this->tailBlock == endBlock) {
2268 assert(currentTailIndex == newTailIndex);
2269 break;
2270 }
2271 this->tailBlock = this->tailBlock->next;
2272 }
2273
2274 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) {
2275 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2276 }
2277
2278 this->tailIndex.store(newTailIndex, std::memory_order_release);
2279 return true;
2280 }
2281
2282 template<typename It>
2283 size_t dequeue_bulk(It& itemFirst, size_t max)
2284 {
2285 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2286 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2287 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2288 if (details::circular_less_than<size_t>(0, desiredCount)) {
2289 desiredCount = desiredCount < max ? desiredCount : max;
2290 std::atomic_thread_fence(std::memory_order_acquire);
2291
2292 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);;
2293
2294 tail = this->tailIndex.load(std::memory_order_acquire);
2295 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2296 if (details::circular_less_than<size_t>(0, actualCount)) {
2297 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2298 if (actualCount < desiredCount) {
2299 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2300 }
2301
2302 // 获取第一个索引。注意,由于保证至少有 actualCount 个元素,这
2303 // 不会超过 tail。
2304 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2305
2306 // 确定第一个元素所在的块
2307 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2308 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2309
2310 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2311 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2312 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
2313 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2314
2315 // Iterate the blocks and dequeue
2316 auto index = firstIndex;
2317 do {
2318 auto firstIndexInBlock = index;
2319 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2320 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2321 auto block = localBlockIndex->entries[indexIndex].block;
2322 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2323 while (index != endIndex) {
2324 auto& el = *((*block)[index]);
2325 *itemFirst++ = std::move(el);
2326 el.~T();
2327 ++index;
2328 }
2329 }
2330 else {
2332 while (index != endIndex) {
2333 auto& el = *((*block)[index]);
2334 *itemFirst = std::move(el);
2335 ++itemFirst;
2336 el.~T();
2337 ++index;
2338 }
2339 }
2340 MOODYCAMEL_CATCH (...) {
2341 // 由于已为时已晚,无法恢复出队操作,但我们可以确保所有已出队的对象
2342 // 被正确销毁,并且块索引(以及空闲计数)在传播异常之前被正确更新。
2343 do {
2344 block = localBlockIndex->entries[indexIndex].block;
2345 while (index != endIndex) {
2346 (*block)[index++]->~T();
2347 }
2348 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2349 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2350
2351 firstIndexInBlock = index;
2352 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2353 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2354 } while (index != firstIndex + actualCount);
2355
2357 }
2358 }
2359 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2360 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2361 } while (index != firstIndex + actualCount);
2362
2363 return actualCount;
2364 }
2365 else {
2366 // 实际上没有任何东西可以出队;使有效的出队计数最终保持一致
2367 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2368 }
2369 }
2370
2371 return 0;
2372 }
2373
2374 private:
2380
2382 {
2383 size_t size;
2384 std::atomic<size_t> front; // 当前槽(而不是下一个槽,如 pr_blockIndexFront)
2385
2386
2388 void* prev;
2389 };
2390
2391
2392 bool new_block_index(size_t numberOfFilledSlotsToExpose)
2393 {
2394 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2395
2396 // Create the new block
2397 pr_blockIndexSize <<= 1;
2398 auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2399 if (newRawPtr == nullptr) {
2400 pr_blockIndexSize >>= 1; / 重置以允许优雅地重试
2401 return false;
2402 }
2403
2404 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
2405
2406 // 复制所有旧的索引,如果有的话
2407 size_t j = 0;
2408 if (pr_blockIndexSlotsUsed != 0) {
2409 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2410 do {
2411 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2412 i = (i + 1) & prevBlockSizeMask;
2413 } while (i != pr_blockIndexFront);
2414 }
2415
2416 // Update everything
2417 auto header = new (newRawPtr) BlockIndexHeader;
2418 header->size = pr_blockIndexSize;
2419 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2420 header->entries = newBlockIndexEntries;
2421 header->prev = pr_blockIndexRaw; // 将新块链接到旧块,以便后续可以释放旧块
2422
2423 // 更新指向新块索引的指针
2425 pr_blockIndexEntries = newBlockIndexEntries;
2426 pr_blockIndexRaw = newRawPtr;
2427 blockIndex.store(header, std::memory_order_release);
2428
2429 return true;
2430 }
2431
2432 private:
2433 std::atomic<BlockIndexHeader*> blockIndex;
2434
2435 // 仅供生产者使用 —— 消费者必须使用由 blockIndex 引用的那些
2438 size_t pr_blockIndexFront; // 下一个槽位(而非当前槽位)
2441
2442#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2443 public:
2444 ExplicitProducer* nextExplicitProducer;
2445 private:
2446#endif
2447
2448#if MCDBGQ_TRACKMEM
2449 friend struct MemStats;
2450#endif
2451 };
2452
2453
2454 //////////////////////////////////
2455 // Implicit queue
2456 //////////////////////////////////
2457
2459 {
2467
2469 {
2470 // 请注意,由于我们在析构函数中,我们可以假设所有的入队/出队操作已经完成;
2471 // 这意味着所有未出队的元素都被连续地放置在相邻的块中,
2472 // 并且只有第一个和最后一个剩余的块可能是部分空的(所有其他剩余的块必须是完全满的)。
2473
2474#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2475
2476 // 注销我们自己以便接收线程终止通知
2477 if (!this->inactive.load(std::memory_order_relaxed)) {
2478 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2479 }
2480#endif
2481
2482 // 销毁所有剩余的元素!
2483 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2484 auto index = this->headIndex.load(std::memory_order_relaxed);
2485 Block* block = nullptr;
2486 assert(index == tail || details::circular_less_than(index, tail));
2487 bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed
2488 while (index != tail) {
2489 if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2490 if (block != nullptr) {
2491 // Free the old block
2492 this->parent->add_block_to_free_list(block);
2493 }
2494
2495 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2496 }
2497
2498 ((*block)[index])->~T();
2499 ++index;
2500 }
2501 // 即使队列为空,仍然会有一个块不在空闲列表中
2502 // (除非头索引已经到达块的末尾,在这种情况下尾部将准备创建一个新的块)。
2503 if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2504 this->parent->add_block_to_free_list(this->tailBlock);
2505 }
2506
2507 //销毁块索引
2508 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2509 if (localBlockIndex != nullptr) {
2510 for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2511 localBlockIndex->index[i]->~BlockIndexEntry();
2512 }
2513 do {
2514 auto prev = localBlockIndex->prev;
2515 localBlockIndex->~BlockIndexHeader();
2516 (Traits::free)(localBlockIndex);
2517 localBlockIndex = prev;
2518 } while (localBlockIndex != nullptr);
2519 }
2520 }
2521
2522 template<AllocationMode allocMode, typename U>
2523 inline bool enqueue(U&& element)
2524 {
2525 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2526 index_t newTailIndex = 1 + currentTailIndex;
2527 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2528 // 我们到达了一个块的末尾,开始一个新的块
2529 auto head = this->headIndex.load(std::memory_order_relaxed);
2530 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2531 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2532 return false;
2533 }
2534#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2535 debug::DebugLock lock(mutex);
2536#endif
2537 // Find out where we'll be inserting this block in the block index
2538 BlockIndexEntry* idxEntry;
2539 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2540 return false;
2541 }
2542
2543 // Get ahold of a new block
2544 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2545 if (newBlock == nullptr) {
2547 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2548 return false;
2549 }
2550#if MCDBGQ_TRACKMEM
2551 newBlock->owner = this;
2552#endif
2553 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2554
2555 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
2556 // 可能会抛出异常,尝试现在插入数据,先于我们发布新块的事实
2558 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2559 }
2560 MOODYCAMEL_CATCH (...) {
2562 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2563 this->parent->add_block_to_free_list(newBlock);
2565 }
2566 }
2567
2568 // Insert the new block into the index
2569 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2570
2571 this->tailBlock = newBlock;
2572
2573 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
2574 this->tailIndex.store(newTailIndex, std::memory_order_release);
2575 return true;
2576 }
2577 }
2578
2579 // Enqueue
2580 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2581
2582 this->tailIndex.store(newTailIndex, std::memory_order_release);
2583 return true;
2584 }
2585
2586 template<typename U>
2587 bool dequeue(U& element)
2588 {
2589 // 请参阅 ExplicitProducer::dequeue 以了解原因和解释
2590 index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2591 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2592 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2593 std::atomic_thread_fence(std::memory_order_acquire);
2594
2595 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2596 tail = this->tailIndex.load(std::memory_order_acquire);
2597 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2598 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2599
2600 // 确定元素所在的块
2601 auto entry = get_block_index_entry_for_index(index);
2602
2603 // Dequeue
2604 auto block = entry->value.load(std::memory_order_relaxed);
2605 auto& el = *((*block)[index]);
2606
2607 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2608#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2609 // 注意:每次 dequeue 时都获取互斥锁,而不是仅在释放块时获取,这非常不理想,但毕竟这只是纯调试代码。
2610 debug::DebugLock lock(producer->mutex);
2611#endif
2612 struct Guard {
2613 Block* block;
2614 index_t index;
2615 BlockIndexEntry* entry;
2616 ConcurrentQueue* parent;
2617
2618 ~Guard()
2619 {
2620 (*block)[index]->~T();
2621 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2622 entry->value.store(nullptr, std::memory_order_relaxed);
2623 parent->add_block_to_free_list(block);
2624 }
2625 }
2626 } guard = { block, index, entry, this->parent };
2627
2628 element = std::move(el);
2629 }
2630 else {
2631 element = std::move(el);
2632 el.~T();
2633
2634 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2635 {
2636#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2637 debug::DebugLock lock(mutex);
2638#endif
2639 // Add the block back into the global free pool (and remove from block index)
2640 entry->value.store(nullptr, std::memory_order_relaxed);
2641 }
2642 this->parent->add_block_to_free_list(block); // releases the above store
2643 }
2644 }
2645
2646 return true;
2647 }
2648 else {
2649 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2650 }
2651 }
2652
2653 return false;
2654 }
2655
2656 template<AllocationMode allocMode, typename It>
2657 bool enqueue_bulk(It itemFirst, size_t count)
2658 {
2659 // 首先,我们需要确保有足够的空间来入队所有元素;
2660 // 这意味着需要预先分配块,并将它们放入块索引中(但只有在所有分配成功的情况下才这么做)。
2661
2662 // 请注意,我们开始时的 tailBlock 可能不再由我们拥有;
2663 // 这种情况发生在当它被填满至顶部(将 tailIndex 设置为尚未分配的下一个块的第一个索引),
2664 // 然后在我们再次入队之前被完全出队(将其放入空闲列表中)。
2665
2666 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2667 auto startBlock = this->tailBlock;
2668 Block* firstAllocatedBlock = nullptr;
2669 auto endBlock = this->tailBlock;
2670
2671 // 确定我们需要分配多少块,并进行分配
2672 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2673 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2674 if (blockBaseDiff > 0) {
2675#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2676 debug::DebugLock lock(mutex);
2677#endif
2678 do {
2679 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2680 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2681
2682 // 确定我们将在块索引中插入此块的位置
2683 BlockIndexEntry* idxEntry = nullptr; // 这里的初始化是不必要的,但编译器并不总能判断出来
2684 Block* newBlock;
2685 bool indexInserted = false;
2686 auto head = this->headIndex.load(std::memory_order_relaxed);
2687 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2688 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2689 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2690 // 索引分配或块分配失败;撤销目前为止已完成的其他分配
2691 // 和索引插入操作
2692 if (indexInserted) {
2694 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2695 }
2696 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2697 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2698 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2699 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2700 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2702 }
2703 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2704 this->tailBlock = startBlock;
2705
2706 return false;
2707 }
2708
2709#if MCDBGQ_TRACKMEM
2710 newBlock->owner = this;
2711#endif
2712 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2713 newBlock->next = nullptr;
2714
2715 // 将新块插入到索引中
2716 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2717
2718 // 存储块链,以便在后续分配失败时可以撤销,
2719 // 并且在实际入队时可以找到这些块
2720 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2721 assert(this->tailBlock != nullptr);
2722 this->tailBlock->next = newBlock;
2723 }
2724 this->tailBlock = newBlock;
2725 endBlock = newBlock;
2726 firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2727 } while (blockBaseDiff > 0);
2728 }
2729
2730 // Enqueue, one block at a time
2731 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2732 currentTailIndex = startTailIndex;
2733 this->tailBlock = startBlock;
2734 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2735 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2736 this->tailBlock = firstAllocatedBlock;
2737 }
2738 while (true) {
2739 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2740 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2741 stopIndex = newTailIndex;
2742 }
2743 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2744 while (currentTailIndex != stopIndex) {
2745 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2746 }
2747 }
2748 else {
2750 while (currentTailIndex != stopIndex) {
2751 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2752 ++currentTailIndex;
2753 ++itemFirst;
2754 }
2755 }
2756 MOODYCAMEL_CATCH (...) {
2757 auto constructedStopIndex = currentTailIndex;
2758 auto lastBlockEnqueued = this->tailBlock;
2759
2761 auto block = startBlock;
2762 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2763 block = firstAllocatedBlock;
2764 }
2765 currentTailIndex = startTailIndex;
2766 while (true) {
2767 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2768 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2769 stopIndex = constructedStopIndex;
2770 }
2771 while (currentTailIndex != stopIndex) {
2772 (*block)[currentTailIndex++]->~T();
2773 }
2774 if (block == lastBlockEnqueued) {
2775 break;
2776 }
2777 block = block->next;
2778 }
2779 }
2780
2781 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2782 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2783 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2784 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2785 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2787 }
2788 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2789 this->tailBlock = startBlock;
2791 }
2792 }
2793
2794 if (this->tailBlock == endBlock) {
2795 assert(currentTailIndex == newTailIndex);
2796 break;
2797 }
2798 this->tailBlock = this->tailBlock->next;
2799 }
2800 this->tailIndex.store(newTailIndex, std::memory_order_release);
2801 return true;
2802 }
2803
2804 template<typename It>
2805 size_t dequeue_bulk(It& itemFirst, size_t max)
2806 {
2807 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2808 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2809 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2810 if (details::circular_less_than<size_t>(0, desiredCount)) {
2811 desiredCount = desiredCount < max ? desiredCount : max;
2812 std::atomic_thread_fence(std::memory_order_acquire);
2813
2814 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2815
2816 tail = this->tailIndex.load(std::memory_order_acquire);
2817 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2818 if (details::circular_less_than<size_t>(0, actualCount)) {
2819 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2820 if (actualCount < desiredCount) {
2821 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2822 }
2823
2824 // 获取第一个索引。请注意,由于保证至少有 actualCount 个元素,这个值永远不会超过 tail。
2825
2826 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2827
2828 // Iterate the blocks and dequeue
2829 auto index = firstIndex;
2830 BlockIndexHeader* localBlockIndex;
2831 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2832 do {
2833 auto blockStartIndex = index;
2834 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2835 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2836
2837 auto entry = localBlockIndex->index[indexIndex];
2838 auto block = entry->value.load(std::memory_order_relaxed);
2839 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2840 while (index != endIndex) {
2841 auto& el = *((*block)[index]);
2842 *itemFirst++ = std::move(el);
2843 el.~T();
2844 ++index;
2845 }
2846 }
2847 else {
2849 while (index != endIndex) {
2850 auto& el = *((*block)[index]);
2851 *itemFirst = std::move(el);
2852 ++itemFirst;
2853 el.~T();
2854 ++index;
2855 }
2856 }
2857 MOODYCAMEL_CATCH (...) {
2858 do {
2859 entry = localBlockIndex->index[indexIndex];
2860 block = entry->value.load(std::memory_order_relaxed);
2861 while (index != endIndex) {
2862 (*block)[index++]->~T();
2863 }
2864
2865 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2866#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2867 debug::DebugLock lock(mutex);
2868#endif
2869 entry->value.store(nullptr, std::memory_order_relaxed);
2870 this->parent->add_block_to_free_list(block);
2871 }
2872 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2873
2874 blockStartIndex = index;
2875 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2876 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2877 } while (index != firstIndex + actualCount);
2878
2880 }
2881 }
2882 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2883 {
2884#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2885 debug::DebugLock lock(mutex);
2886#endif
2887 // 请注意,上面的 set_many_empty 执行了释放操作,这意味着任何获得我们即将释放的块的人
2888 // 都可以安全地使用它,因为我们的写操作(和读取操作!)在此之前已经完成。
2889 entry->value.store(nullptr, std::memory_order_relaxed);
2890 }
2891 this->parent->add_block_to_free_list(block); // releases the above store
2892 }
2893 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2894 } while (index != firstIndex + actualCount);
2895
2896 return actualCount;
2897 }
2898 else {
2899 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2900 }
2901 }
2902
2903 return 0;
2904 }
2905
2906 private:
2907 // 块大小必须大于 1,因此任何低位比特被设置的数字都是无效的块基索引
2908 static const index_t INVALID_BLOCK_BASE = 1;
2909
2911 {
2912 std::atomic<index_t> key;
2913 std::atomic<Block*> value;
2914 };
2915
2924
2925 template<AllocationMode allocMode>
2926 inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
2927 {
2928 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
2929 if (localBlockIndex == nullptr) {
2930 return false; // this can happen if new_block_index failed in the constructor
2931 }
2932 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2933 idxEntry = localBlockIndex->index[newTail];
2934 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2935 idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2936
2937 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2938 localBlockIndex->tail.store(newTail, std::memory_order_release);
2939 return true;
2940 }
2941
2942 // 旧的块索引没有空间,尝试分配另一个块索引!
2943 if (allocMode == CannotAlloc || !new_block_index()) {
2944 return false;
2945 }
2946 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2947 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2948 idxEntry = localBlockIndex->index[newTail];
2949 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2950 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2951 localBlockIndex->tail.store(newTail, std::memory_order_release);
2952 return true;
2953 }
2954
2956 {
2957 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2958 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2959 }
2960
2962 {
2963 BlockIndexHeader* localBlockIndex;
2964 auto idx = get_block_index_index_for_index(index, localBlockIndex);
2965 return localBlockIndex->index[idx];
2966 }
2967
2968 inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
2969 {
2970#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2971 debug::DebugLock lock(mutex);
2972#endif
2973 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2974 localBlockIndex = blockIndex.load(std::memory_order_acquire);
2975 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2976 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2977 assert(tailBase != INVALID_BLOCK_BASE);
2978 // 注意:必须使用除法而不是位移,因为索引可能会回绕,导致负偏移,我们希望保留这个负值
2979 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2980 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2981 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2982 return idx;
2983 }
2984
2986 {
2987 auto prev = blockIndex.load(std::memory_order_relaxed);
2988 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2989 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2990 auto raw = static_cast<char*>((Traits::malloc)(
2991 sizeof(BlockIndexHeader) +
2992 std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2993 std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
2994 if (raw == nullptr) {
2995 return false;
2996 }
2997
2998 auto header = new (raw) BlockIndexHeader;
2999 auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
3000 auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
3001 if (prev != nullptr) {
3002 auto prevTail = prev->tail.load(std::memory_order_relaxed);
3003 auto prevPos = prevTail;
3004 size_t i = 0;
3005 do {
3006 prevPos = (prevPos + 1) & (prev->capacity - 1);
3007 index[i++] = prev->index[prevPos];
3008 } while (prevPos != prevTail);
3009 assert(i == prevCapacity);
3010 }
3011 for (size_t i = 0; i != entryCount; ++i) {
3012 new (entries + i) BlockIndexEntry;
3013 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
3014 index[prevCapacity + i] = entries + i;
3015 }
3016 header->prev = prev;
3017 header->entries = entries;
3018 header->index = index;
3019 header->capacity = nextBlockIndexCapacity;
3020 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
3021
3022 blockIndex.store(header, std::memory_order_release);
3023
3025
3026 return true;
3027 }
3028
3029 private:
3031 std::atomic<BlockIndexHeader*> blockIndex;
3032
3033#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3034 public:
3035 details::ThreadExitListener threadExitListener;
3036 private:
3037#endif
3038
3039#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3040 public:
3041 ImplicitProducer* nextImplicitProducer;
3042 private:
3043#endif
3044
3045#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3046 mutable debug::DebugMutex mutex;
3047#endif
3048#if MCDBGQ_TRACKMEM
3049 friend struct MemStats;
3050#endif
3051 };
3052
3053
3054 //////////////////////////////////
3055 // Block pool manipulation
3056 //////////////////////////////////
3057
3058 void populate_initial_block_list(size_t blockCount)
3059 {
3060 initialBlockPoolSize = blockCount;
3061 if (initialBlockPoolSize == 0) {
3062 initialBlockPool = nullptr;
3063 return;
3064 }
3065
3066 initialBlockPool = create_array<Block>(blockCount);
3067 if (initialBlockPool == nullptr) {
3069 }
3070 for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3072 }
3073 }
3074
3076 {
3077 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3078 return nullptr;
3079 }
3080
3081 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3082
3083 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3084 }
3085
3086 inline void add_block_to_free_list(Block* block)
3087 {
3088#if MCDBGQ_TRACKMEM
3089 block->owner = nullptr;
3090#endif
3091 freeList.add(block);
3092 }
3093
3094 inline void add_blocks_to_free_list(Block* block)
3095 {
3096 while (block != nullptr) {
3097 auto next = block->next;
3099 block = next;
3100 }
3101 }
3102
3104 {
3105 return freeList.try_get();
3106 }
3107
3108 // 从内存池中获取一个空闲块,或分配一个新块(如果适用)
3109 template<AllocationMode canAlloc>
3111 {
3112 auto block = try_get_block_from_initial_pool();
3113 if (block != nullptr) {
3114 return block;
3115 }
3116
3118 if (block != nullptr) {
3119 return block;
3120 }
3121
3122 if (canAlloc == CanAlloc) {
3123 return create<Block>();
3124 }
3125
3126 return nullptr;
3127 }
3128
3129
3130#if MCDBGQ_TRACKMEM
3131 public:
3132// 定义一个名为MemStats的结构体,用于统计内存相关的各种信息
3133 struct MemStats {
3134 // 已分配的块数量
3135 size_t allocatedBlocks;
3136 // 已分配的块数量
3137 size_t usedBlocks;
3138 // 空闲的块数量
3139 size_t freeBlocks;
3140 // 显式拥有的块数量
3141 size_t ownedBlocksExplicit;
3142 // 显式拥有的块数量
3143 size_t ownedBlocksImplicit;
3144// 隐式生产者的数量
3145 size_t implicitProducers;
3146// 隐式生产者的数量
3147 size_t explicitProducers;
3148// 入队元素的数量
3149 size_t elementsEnqueued;
3150 // 块类所占用的字节数
3151 size_t blockClassBytes;
3152 // 队列类所占用的字节数
3153 size_t queueClassBytes;
3154 // 隐式块索引所占用的字节数
3155 size_t implicitBlockIndexBytes;
3156// 显式块索引所占用的字节数
3157 size_t explicitBlockIndexBytes;
3158 // 声明ConcurrentQueue类为友元类,意味着ConcurrentQueue类可以访问MemStats的私有成员
3159 friend class ConcurrentQueue;
3160
3161 private:
3162 // 静态成员函数,用于获取给定ConcurrentQueue对象的内存统计信息,参数为指向ConcurrentQueue的指针
3163 static MemStats getFor(ConcurrentQueue* q)
3164 {
3165// 创建一个MemStats结构体实例,并将所有成员初始化为0
3166 MemStats stats = { 0 };
3167 // 获取队列中大约的元素数量,并赋值给stats的elementsEnqueued成员,这里size_approx()应该是ConcurrentQueue类提供的用于估算队列元素个数的函数
3168 stats.elementsEnqueued = q->size_approx();
3169 // 获取队列空闲链表的头节点,这里假设freeList是ConcurrentQueue类中用于管理空闲块链表的数据成员,head_unsafe()用于获取头节点(可能是一种非线程安全的获取方式,具体取决于实现)
3170 auto block = q->freeList.head_unsafe();
3171 // 循环遍历空闲链表,直到遍历到链表末尾(节点为nullptr表示链表结束)
3172 while (block != nullptr) {
3173 // 已分配块数量加1,因为当前遍历到的是一个已分配的空闲块
3174 ++stats.allocatedBlocks;
3175 // 空闲块数量加1,当前块处于空闲状态
3176 ++stats.freeBlocks;
3177 // 获取下一个空闲块节点,通过原子加载操作(memory_order_relaxed表示一种较宽松的内存顺序要求,常用于性能优先的场景)获取下一个节点指针
3178 block = block->freeListNext.load(std::memory_order_relaxed);
3179 }
3180 // 加载队列生产者链表的尾节点,使用memory_order_acquire内存顺序保证获取到的是其他线程已完成写入的最新值,用于后续遍历生产者链表
3181 for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3182 // 通过动态类型转换判断当前生产者指针指向的是否是隐式生产者(ImplicitProducer类型),如果转换成功(不为nullptr)则表示是隐式生产者
3183 bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3184// 如果是隐式生产者,隐式生产者数量加1
3185 stats.implicitProducers += implicit ? 1 : 0;
3186 // 如果不是隐式生产者(即显式生产者),显式生产者数量加1
3187 stats.explicitProducers += implicit ? 0 : 1;
3188 // 如果是隐式生产者,进入以下逻辑进行相关统计信息的更新
3189 if (implicit) {
3190 // 将ptr指针转换为ImplicitProducer*类型,以便后续访问ImplicitProducer类相关的成员变量和函数
3191 auto prod = static_cast<ImplicitProducer*>(ptr);
3192// 累加ImplicitProducer类型对象所占用的字节数到queueClassBytes成员,用于统计队列类相关的内存占用情况
3193 stats.queueClassBytes += sizeof(ImplicitProducer);
3194// 原子加载隐式生产者的头索引,同样使用memory_order_relaxed内存顺序
3195 auto head = prod->headIndex.load(std::memory_order_relaxed);
3196 // 原子加载隐式生产者的尾索引
3197 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3198// 原子加载隐式生产者的块索引(这里假设是一个指向某种数据结构用于管理块索引的指针)
3199 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3200 // 如果块索引指针不为nullptr,说明存在块索引相关的数据结构,进入以下循环处理逻辑
3201 if (hash != nullptr) {
3202// 循环遍历块索引数据结构中每个索引位置(假设index是一个数组或者类似可遍历的数据结构)
3203 for (size_t i = 0; i != hash->capacity; ++i) {
3204 // 检查当前索引位置对应的块索引条目的键是否不等于无效块基值(这里INVALID_BLOCK_BASE应该是ImplicitProducer类中定义的表示无效块的一个常量之类的),并且对应的值指针不为nullptr,表示该块是有效的已分配块
3205 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3206 // 已分配块数量加1,因为找到了一个有效的已分配块
3207 ++stats.allocatedBlocks;
3208 // 隐式拥有的块数量加1,因为这是隐式生产者拥有的有效块
3209 ++stats.ownedBlocksImplicit;
3210 }
3211 }
3212 // 累加隐式块索引所占用的字节数,计算方式为索引容量乘以每个索引条目的字节大小(这里假设BlockIndexEntry是用于表示块索引条目的结构体之类的)
3213 stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3214 // 循环遍历块索引数据结构的链表(假设通过prev指针连接),用于统计整个链表结构所占用的字节数,包括头部和每个节点相关的字节数
3215 for (; hash != nullptr; hash = hash->prev) {
3216 stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3217 }
3218 }
3219// 根据头索引和尾索引循环处理已使用块的统计,这里假设circular_less_than是用于比较循环索引大小的函数,BLOCK_SIZE是块大小相关的常量之类的
3220 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3221// 已使用块数量加1,说明对应位置的块正在被使用
3222 //auto block = prod->get_block_index_entry_for_index(head);
3223 ++stats.usedBlocks;
3224 }
3225 }
3226 // 如果不是隐式生产者(即显式生产者),进入以下逻辑进行相关统计信息的更新
3227 else {
3228 auto prod = static_cast<ExplicitProducer*>(ptr);
3229 stats.queueClassBytes += sizeof(ExplicitProducer);
3230 auto tailBlock = prod->tailBlock;
3231 bool wasNonEmpty = false;
3232 if (tailBlock != nullptr) {
3233 auto block = tailBlock;
3234 do {
3235 ++stats.allocatedBlocks;
3236 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3237 ++stats.usedBlocks;
3238 wasNonEmpty = wasNonEmpty || block != tailBlock;
3239 }
3240 ++stats.ownedBlocksExplicit;
3241 block = block->next;
3242 } while (block != tailBlock);
3243 }
3244 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3245 while (index != nullptr) {
3246 stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3247 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3248 }
3249 }
3250 }
3251
3252 auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3253 stats.allocatedBlocks += freeOnInitialPool;
3254 stats.freeBlocks += freeOnInitialPool;
3255
3256 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3257 stats.queueClassBytes += sizeof(ConcurrentQueue);
3258
3259 return stats;
3260 }
3261 };
3262
3263 // 仅用于调试。不是线程安全的。
3264 MemStats getMemStats()
3265 {
3266 return MemStats::getFor(this);
3267 }
3268 private:
3269 friend struct MemStats;
3270#endif
3271
3272
3273 //////////////////////////////////
3274 // 生产者列表操作
3275 //////////////////////////////////
3276
3278 {
3279 bool recycled;
3280 return recycle_or_create_producer(isExplicit, recycled);
3281 }
3282
3283 ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled)
3284 {
3285#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3286 debug::DebugLock lock(implicitProdMutex);
3287#endif
3288 // 先尝试重用一个
3289 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3290 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3291 bool expected = true;
3292 if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
3293 // 我们抓到一个了!它已被标记为激活,调用者可以使用它
3294 recycled = true;
3295 return ptr;
3296 }
3297 }
3298 }
3299
3300 recycled = false;
3301 return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
3302 }
3303
3305 {
3306 // 处理内存分配失败
3307 if (producer == nullptr) {
3308 return nullptr;
3309 }
3310
3311 producerCount.fetch_add(1, std::memory_order_relaxed);
3312
3313 // 将其添加到无锁列表中
3314 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3315 do {
3316 producer->next = prevTail;
3317 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3318
3319#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3320 if (producer->isExplicit) {
3321 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3322 do {
3323 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3324 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3325 }
3326 else {
3327 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3328 do {
3329 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3330 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3331 }
3332#endif
3333
3334 return producer;
3335 }
3336
3338 {
3339 // 在另一个实例移动到/与此实例交换之后,我们偷来的所有生产者仍然认为它们的父队列是另一个队列。
3340 // 所以需要修复它们!
3341 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3342 ptr->parent = this;
3343 }
3344 }
3345
3346
3347 //////////////////////////////////
3348 // 隐式生产者哈希
3349 //////////////////////////////////
3350
3352 {
3353 std::atomic<details::thread_id_t> key;
3354 ImplicitProducer* value; // 由于只有设置它的线程会读取它,因此不需要原子性
3355
3357
3359 {
3360 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3361 value = other.value;
3362 }
3363
3365 {
3366 swap(other);
3367 return *this;
3368 }
3369
3371 {
3372 if (this != &other) {
3373 details::swap_relaxed(key, other.key);
3374 std::swap(value, other.value);
3375 }
3376 }
3377 };
3378
3379 template<typename XT, typename XTraits>
3380 friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
3381
3388
3390 {
3391 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3392
3393 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3394 auto hash = &initialImplicitProducerHash;
3396 hash->entries = &initialImplicitProducerHashEntries[0];
3397 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3398 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3399 }
3400 hash->prev = nullptr;
3401 implicitProducerHash.store(hash, std::memory_order_relaxed);
3402 }
3403
3405 {
3406 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3407
3408 // 交换(假设我们的隐式生产者哈希已初始化)
3409 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3411 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3412
3413 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3414
3415 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3416 if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3417 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3418 }
3419 else {
3421 for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3422 continue;
3423 }
3425 }
3426 if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3427 other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3428 }
3429 else {
3431 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3432 continue;
3433 }
3434 hash->prev = &other.initialImplicitProducerHash;
3435 }
3436 }
3437
3438 // 仅在内存分配失败时才会失败(返回 nullptr)
3440 {
3441 // 注意,由于数据本质上是线程本地的(键是线程 ID),
3442 // 所以对内存屏障的需求减少(每个线程的内存顺序本身已一致),
3443 // 除了当前的表本身外。
3444
3445 // 首先在当前表和所有先前的哈希表中查找线程 ID。
3446 // 如果未找到,它一定不在其中,因为这个线程之前会将其
3447 // 添加到我们遍历过的表中的某一个表里。
3448
3449 // 代码和算法改编自 http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3450
3451#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3452 debug::DebugLock lock(implicitProdMutex);
3453#endif
3454
3455 auto id = details::thread_id();
3456 auto hashedId = details::hash_thread_id(id);
3457
3458 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3459 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3460 // 在这个哈希表中查找 ID
3461 auto index = hashedId;
3462 while (true) { // 不是无限循环,因为哈希表中至少有一个槽位是空闲的
3463 index &= hash->capacity - 1;
3464
3465 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3466 if (probedKey == id) {
3467 // 找到了!不过,如果我们不得不在多个哈希表中进行深度搜索,我们应该懒惰地将其添加到
3468 // 当前的主哈希表中,以避免下次的扩展搜索。
3469 // 注意,当前的哈希表中保证有空间,因为每个后续的哈希表隐式地为所有先前的表保留了空间
3470 // (只有一个 implicitProducerHashCount)。
3471 auto value = hash->entries[index].value;
3472 if (hash != mainHash) {
3473 index = hashedId;
3474 while (true) {
3475 index &= mainHash->capacity - 1;
3476 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3477 auto empty = details::invalid_thread_id;
3478#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3479 auto reusable = details::invalid_thread_id2;
3480 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3481 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3482#else
3483 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3484#endif
3485 mainHash->entries[index].value = value;
3486 break;
3487 }
3488 ++index;
3489 }
3490 }
3491
3492 return value;
3493 }
3494 if (probedKey == details::invalid_thread_id) {
3495 break; // 不在这个哈希表中
3496 }
3497 ++index;
3498 }
3499 }
3500
3501 // Insert!
3502 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3503 while (true) {
3504 if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3505 // 我们已获得了调整大小的锁,尝试分配一个更大的哈希表。
3506 // 注意,获取屏障与此块末尾的释放屏障同步,因此当我们重新加载 implicitProducerHash 时,它
3507 // 必须是最新版本(它只在这个锁定的块内被更改)。
3508 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3509 if (newCount >= (mainHash->capacity >> 1)) {
3510 auto newCapacity = mainHash->capacity << 1;
3511 while (newCount >= (newCapacity >> 1)) {
3512 newCapacity <<= 1;
3513 }
3514 auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
3515 if (raw == nullptr) {
3516 // 分配失败
3517 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3518 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3519 return nullptr;
3520 }
3521
3522 auto newHash = new (raw) ImplicitProducerHash;
3523 newHash->capacity = newCapacity;
3524 newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3525 for (size_t i = 0; i != newCapacity; ++i) {
3526 new (newHash->entries + i) ImplicitProducerKVP;
3527 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3528 }
3529 newHash->prev = mainHash;
3530 implicitProducerHash.store(newHash, std::memory_order_release);
3531 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3532 mainHash = newHash;
3533 }
3534 else {
3535 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3536 }
3537 }
3538
3539 // 如果当前表的填充度低于三分之四,即使如此也将其添加到旧表中,以避免等待下一个表
3540 // 被另一个线程分配(如果我们刚刚完成了上面的分配,条件将总是为真)。
3541 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3542 bool recycled;
3543 auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
3544 if (producer == nullptr) {
3545 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3546 return nullptr;
3547 }
3548 if (recycled) {
3549 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3550 }
3551
3552#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3553 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3554 producer->threadExitListener.userData = producer;
3555 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3556#endif
3557
3558 auto index = hashedId;
3559 while (true) {
3560 index &= mainHash->capacity - 1;
3561 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3562
3563 auto empty = details::invalid_thread_id;
3564#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3565 auto reusable = details::invalid_thread_id2;
3566 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3567 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3568#else
3569 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3570#endif
3571 mainHash->entries[index].value = producer;
3572 break;
3573 }
3574 ++index;
3575 }
3576 return producer;
3577 }
3578
3579 // 嗯,旧的哈希表已经很满了,而其他线程正在忙于分配一个新的哈希表。
3580 // 我们需要等待正在分配的新表的线程完成(如果分配成功,我们添加到新表中;如果不成功,
3581 // 我们自己尝试分配)。
3582 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3583 }
3584 }
3585
3586#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3587 void implicit_producer_thread_exited(ImplicitProducer* producer)
3588 {
3589 // 从线程退出监听器中移除
3590 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3591
3592 // 从哈希表中移除
3593#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3594 debug::DebugLock lock(implicitProdMutex);
3595#endif
3596 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3597 assert(hash != nullptr); // 线程退出监听器仅在我们最初被添加到哈希表时注册
3598
3599 auto id = details::thread_id();
3600 auto hashedId = details::hash_thread_id(id);
3601 details::thread_id_t probedKey;
3602
3603 // 我们需要遍历所有的哈希表,以防其他线程还没有在当前哈希表上,
3604 // 并且它们正在尝试添加一个条目,认为还有空位(因为它们重用了一个生产者)
3605 for (; hash != nullptr; hash = hash->prev) {
3606 auto index = hashedId;
3607 do {
3608 index &= hash->capacity - 1;
3609 probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3610 if (probedKey == id) {
3611 hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3612 break;
3613 }
3614 ++index;
3615 } while (probedKey != details::invalid_thread_id); // 可能发生在哈希表已改变但我们尚未被重新添加,或者我们最初根本没有被添加到这个哈希表
3616
3617 }
3618
3619 // 将队列标记为可回收
3620 producer->inactive.store(true, std::memory_order_release);
3621 }
3622
3623 static void implicit_producer_thread_exited_callback(void* userData)
3624 {
3625 auto producer = static_cast<ImplicitProducer*>(userData);
3626 auto queue = producer->parent;
3627 queue->implicit_producer_thread_exited(producer);
3628 }
3629#endif
3630
3631 //////////////////////////////////
3632 // 工具函数
3633 //////////////////////////////////
3634
3635 template<typename U>
3636 static inline U* create_array(size_t count)
3637 {
3638 assert(count > 0);
3639 auto p = static_cast<U*>((Traits::malloc)(sizeof(U) * count));
3640 if (p == nullptr) {
3641 return nullptr;
3642 }
3643
3644 for (size_t i = 0; i != count; ++i) {
3645 new (p + i) U();
3646 }
3647 return p;
3648 }
3649
3650 template<typename U>
3651 static inline void destroy_array(U* p, size_t count)
3652 {
3653 if (p != nullptr) {
3654 assert(count > 0);
3655 for (size_t i = count; i != 0; ) {
3656 (p + --i)->~U();
3657 }
3658 (Traits::free)(p);
3659 }
3660 }
3661
3662 template<typename U>
3663 static inline U* create()
3664 {
3665 auto p = (Traits::malloc)(sizeof(U));
3666 return p != nullptr ? new (p) U : nullptr;
3667 }
3668
3669 template<typename U, typename A1>
3670 static inline U* create(A1&& a1)
3671 {
3672 auto p = (Traits::malloc)(sizeof(U));
3673 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3674 }
3675
3676 template<typename U>
3677 static inline void destroy(U* p)
3678 {
3679 if (p != nullptr) {
3680 p->~U();
3681 }
3682 (Traits::free)(p);
3683 }
3684
3685private:
3686 std::atomic<ProducerBase*> producerListTail;
3687 std::atomic<std::uint32_t> producerCount;
3688
3689 std::atomic<size_t> initialBlockPoolIndex;
3692
3693#if !MCDBGQ_USEDEBUGFREELIST
3695#else
3696 debug::DebugFreeList<Block> freeList;
3697#endif
3698
3699 std::atomic<ImplicitProducerHash*> implicitProducerHash;
3700 std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3702 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3704
3705 std::atomic<std::uint32_t> nextExplicitConsumerId;
3706 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3707
3708#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3709 debug::DebugMutex implicitProdMutex;
3710#endif
3711
3712#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3713 std::atomic<ExplicitProducer*> explicitProducers;
3714 std::atomic<ImplicitProducer*> implicitProducers;
3715#endif
3716};
3717
3718
3719template<typename T, typename Traits>
3721 : producer(queue.recycle_or_create_producer(true))
3722{
3723 if (producer != nullptr) {
3724 producer->token = this;
3725 }
3726}
3727
3728template<typename T, typename Traits>
3730 : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3731{
3732 if (producer != nullptr) {
3733 producer->token = this;
3734 }
3735}
3736
3737template<typename T, typename Traits>
3739 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3740{
3741 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3743}
3744
3745template<typename T, typename Traits>
3747 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3748{
3749 initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3751}
3752
3753template<typename T, typename Traits>
3754inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
3755{
3756 a.swap(b);
3757}
3758
3760{
3761 a.swap(b);
3762}
3763
3765{
3766 a.swap(b);
3767}
3768
3769template<typename T, typename Traits>
3770inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
3771{
3772 a.swap(b);
3773}
3774
3775}
3776
3777#if defined(__GNUC__)
3778#pragma GCC diagnostic pop
3779#endif
return true
#define MOODYCAMEL_NOEXCEPT
ConcurrentQueue(size_t capacity=6 *BLOCK_SIZE)
Traits::index_t index_t
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)
#define MOODYCAMEL_THREADLOCAL
static const size_t BLOCK_SIZE
#define MOODYCAMEL_DELETE_FUNCTION
ConcurrentQueue & operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
#define MOODYCAMEL_CATCH(...)
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr)
#define MOODYCAMEL_RETHROW
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
static const size_t MAX_SUBQUEUE_SIZE
#define MOODYCAMEL_TRY
bool try_enqueue(producer_token_t const &token, T &&item)
ConcurrentQueue & swap_internal(ConcurrentQueue &other)
bool enqueue(producer_token_t const &token, T const &item)
bool enqueue(producer_token_t const &token, T &&item)
ConcurrentQueue(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
static const size_t MAX_SUBQUEUE_SIZE
ConcurrentQueue(size_t capacity=6 *BLOCK_SIZE)
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
ConcurrentQueue & operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
bool enqueue_bulk(It itemFirst, size_t count)
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
bool try_dequeue_from_producer(producer_token_t const &producer, U &item)
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
bool try_dequeue(consumer_token_t &token, U &item)
::moodycamel::ProducerToken producer_token_t
void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
size_t try_dequeue_bulk(It itemFirst, size_t max)
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
bool try_enqueue(T const &item)
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
ConcurrentQueue & operator=(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
::moodycamel::ConsumerToken consumer_token_t
bool try_enqueue(producer_token_t const &token, T const &item)
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
bool try_dequeue_non_interleaved(U &item)
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
static const size_t BLOCK_SIZE
ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
bool try_enqueue_bulk(It itemFirst, size_t count)
static const thread_id_t invalid_thread_id2
static bool unlikely(bool x)
static bool circular_less_than(T a, T b)
static void swap_relaxed(std::atomic< T > &left, std::atomic< T > &right)
std::max_align_t std_max_align_t
static auto deref_noexcept(It &it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
static const thread_id_t invalid_thread_id
static size_t hash_thread_id(thread_id_t id)
static thread_id_t thread_id()
static T const & nomove(T const &x)
static bool likely(bool x)
static char * align_for(char *ptr)
std::uintptr_t thread_id_t
static T ceil_to_pow_2(T x)
ImplicitProducer * get_or_add_implicit_producer()
friend struct ProducerToken
friend struct ConsumerToken
std::atomic< std::uint32_t > nextExplicitConsumerId
void swap_implicit_producer_hashes(ConcurrentQueue &other)
FreeList< Block > freeList
static void destroy(U *p)
enum moodycamel::AllocationMode try_dequeue_bulk_from_producer
bool inner_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
ImplicitProducerHash initialImplicitProducerHash
ProducerBase * recycle_or_create_producer(bool isExplicit)
std::atomic_flag implicitProducerHashResizeInProgress
friend struct ImplicitProducer
std::atomic< size_t > implicitProducerHashCount
friend class ConcurrentQueueTests
static U * create()
std::atomic< ProducerBase * > producerListTail
void reown_producers()
std::atomic< size_t > initialBlockPoolIndex
void swap(typename ConcurrentQueue< T, Traits >::ImplicitProducerKVP &a, typename ConcurrentQueue< T, Traits >::ImplicitProducerKVP &b) MOODYCAMEL_NOEXCEPT
Block * requisition_block()
void populate_initial_implicit_producer_hash()
ProducerBase * add_producer(ProducerBase *producer)
bool inner_enqueue(producer_token_t const &token, U &&element)
void populate_initial_block_list(size_t blockCount)
Block * try_get_block_from_free_list()
std::atomic< ImplicitProducerHash * > implicitProducerHash
std::atomic< std::uint32_t > globalExplicitConsumerOffset
void add_block_to_free_list(Block *block)
static U * create_array(size_t count)
std::array< ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE > initialImplicitProducerHashEntries
void add_blocks_to_free_list(Block *block)
friend struct ExplicitProducer
bool update_current_producer_after_rotation(consumer_token_t &token)
static void destroy_array(U *p, size_t count)
Block * try_get_block_from_initial_pool()
std::atomic< std::uint32_t > producerCount
Block * initialBlockPool
static bool is_lock_free()
size_t initialBlockPoolSize
size_t size_approx() const
std::atomic< size_t > elementsCompletelyDequeued
details::max_align_t dummy
std::atomic< Block * > freeListNext
char elements[sizeof(T) *BLOCK_SIZE]
bool set_empty(index_t i)
std::atomic< bool > shouldBeOnFreeList
T const * operator[](index_t idx) const MOODYCAMEL_NOEXCEPT
std::atomic< std::uint32_t > freeListRefs
T * operator[](index_t idx) MOODYCAMEL_NOEXCEPT
std::atomic< bool > emptyFlags[BLOCK_SIZE<=EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE :1]
bool set_many_empty(index_t i, size_t count)
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
std::uint32_t itemsConsumedFromCurrent
ConsumerToken(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION
ConsumerToken(ConcurrentQueue< T, Traits > &q)
ConsumerToken & operator=(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
details::ConcurrentQueueProducerTypelessBase * desiredProducer
std::uint32_t lastKnownGlobalOffset
ConsumerToken & operator=(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION
details::ConcurrentQueueProducerTypelessBase * currentProducer
ConsumerToken(BlockingConcurrentQueue< T, Traits > &q)
ConsumerToken(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
void swap(ConsumerToken &other) MOODYCAMEL_NOEXCEPT
bool enqueue_bulk(It itemFirst, size_t count)
BlockIndexEntry * pr_blockIndexEntries
ExplicitProducer(ConcurrentQueue *parent)
size_t dequeue_bulk(It &itemFirst, size_t max)
std::atomic< BlockIndexHeader * > blockIndex
bool new_block_index(size_t numberOfFilledSlotsToExpose)
std::atomic< N * > freeListNext
std::atomic< std::uint32_t > freeListRefs
void add_knowing_refcount_is_zero(N *node)
FreeList(FreeList &&other)
void swap(FreeList &other)
std::atomic< N * > freeListHead
static const std::uint32_t SHOULD_BE_ON_FREELIST
FreeList & operator=(FreeList const &) MOODYCAMEL_DELETE_FUNCTION
FreeList(FreeList const &) MOODYCAMEL_DELETE_FUNCTION
static const std::uint32_t REFS_MASK
std::atomic< details::thread_id_t > key
ImplicitProducerKVP(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT
ImplicitProducerKVP & operator=(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT
void swap(ImplicitProducerKVP &other) MOODYCAMEL_NOEXCEPT
std::atomic< BlockIndexHeader * > blockIndex
static const index_t INVALID_BLOCK_BASE
size_t get_block_index_index_for_index(index_t index, BlockIndexHeader *&localBlockIndex) const
bool enqueue_bulk(It itemFirst, size_t count)
bool insert_block_index_entry(BlockIndexEntry *&idxEntry, index_t blockStartIndex)
size_t dequeue_bulk(It &itemFirst, size_t max)
BlockIndexEntry * get_block_index_entry_for_index(index_t index) const
ImplicitProducer(ConcurrentQueue *parent)
ProducerBase * next_prod() const
std::atomic< index_t > headIndex
std::atomic< index_t > dequeueOvercommit
std::atomic< index_t > dequeueOptimisticCount
size_t dequeue_bulk(It &itemFirst, size_t max)
std::atomic< index_t > tailIndex
ProducerBase(ConcurrentQueue *parent_, bool isExplicit_)
ProducerToken(BlockingConcurrentQueue< T, Traits > &queue)
ProducerToken & operator=(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION
ProducerToken(ConcurrentQueue< T, Traits > &queue)
details::ConcurrentQueueProducerTypelessBase * producer
ProducerToken(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION
void swap(ProducerToken &other) MOODYCAMEL_NOEXCEPT
ProducerToken(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
ProducerToken & operator=(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
static std::uint64_t hash(std::uint64_t h)
static std::uint32_t hash(std::uint32_t h)
static auto eval(U &&x) -> decltype(std::forward< U >(x))
static T const & eval(T const &x)
static thread_id_hash_t prehash(thread_id_t const &x)