diff --git a/Makefile.am b/Makefile.am
index 4c4abc4415..e81a4ca7a9 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -22,6 +22,8 @@ include_HEADERS = \
src_libzmq_la_SOURCES = \
src/address.cpp \
src/address.hpp \
+ src/allocator.cpp \
+ src/allocator.hpp \
src/array.hpp \
src/atomic_counter.hpp \
src/atomic_ptr.hpp \
diff --git a/include/zmq.h b/include/zmq.h
index edf28efd2b..d0174c5a1a 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -268,6 +268,8 @@ typedef void(zmq_free_fn) (void *data_, void *hint_);
ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg_);
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_);
+ZMQ_EXPORT int
+zmq_msg_init_allocator (zmq_msg_t *msg_, size_t size_, void *allocator_);
ZMQ_EXPORT int zmq_msg_init_data (
zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_);
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_);
@@ -669,6 +671,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
+//#define ZMQ_MSG_ALLOCATOR 11
/* DRAFT Context methods. */
ZMQ_EXPORT int zmq_ctx_set_ext (void *context_,
@@ -680,6 +683,17 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_,
void *optval_,
size_t *optvallen_);
+/* ZMQ-provided message-pool implementations. */
+/* Default allocator, using malloc/free. */
+#define ZMQ_MSG_ALLOCATOR_DEFAULT 0
+/* Per-thread pool; internally uses a SPSC queue (may not work with inproc;
+   an MPMC queue may be needed anyway). */
+#define ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL 1
+/* Global pool; internally uses an MPMC queue. */
+#define ZMQ_MSG_ALLOCATOR_GLOBAL_POOL 2
+
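+/* Intended usage (see the perf/remote_thr.cpp changes in this patch): create
+   an allocator with zmq_msg_allocator_new (), pass it to
+   zmq_msg_init_allocator () for each message, and destroy it with
+   zmq_msg_allocator_destroy () only after zmq_ctx_term () has returned. */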
+ZMQ_EXPORT void *zmq_msg_allocator_new (int type_);
+ZMQ_EXPORT int zmq_msg_allocator_destroy (void **allocator_);
+
/* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group);
ZMQ_EXPORT int zmq_leave (void *s, const char *group);
diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp
index 8378950cd9..3f47234622 100644
--- a/perf/remote_thr.cpp
+++ b/perf/remote_thr.cpp
@@ -27,6 +27,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "../src/platform.hpp"
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
@@ -67,6 +68,11 @@ int main (int argc, char *argv[])
return -1;
}
+#ifdef ZMQ_BUILD_DRAFT_API
+ // EXPERIMENTAL ALLOCATOR FOR MSG_T
+ void *allocator = zmq_msg_allocator_new (ZMQ_MSG_ALLOCATOR_GLOBAL_POOL);
+#endif
+
s = zmq_socket (ctx, ZMQ_PUSH);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
@@ -105,7 +111,11 @@ int main (int argc, char *argv[])
}
for (i = 0; i != message_count; i++) {
+#ifdef ZMQ_BUILD_DRAFT_API
+ rc = zmq_msg_init_allocator (&msg, message_size, allocator);
+#else
rc = zmq_msg_init_size (&msg, message_size);
+#endif
if (rc != 0) {
printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
return -1;
@@ -134,5 +144,11 @@ int main (int argc, char *argv[])
return -1;
}
+#ifdef ZMQ_BUILD_DRAFT_API
+ // IMPORTANT: destroy the allocator only after zmq_ctx_term(); otherwise
+ // some zmq_msg_t may still be "in flight"
+ zmq_msg_allocator_destroy (&allocator);
+#endif
+
return 0;
}
diff --git a/src/allocator.cpp b/src/allocator.cpp
new file mode 100644
index 0000000000..ff6b6320fa
--- /dev/null
+++ b/src/allocator.cpp
@@ -0,0 +1,97 @@
+/*
+ Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
+
+ This file is part of libzmq, the ZeroMQ core engine in C++.
+
+ libzmq is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License (LGPL) as published
+ by the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ As a special exception, the Contributors give you permission to link
+ this library with independent modules to produce an executable,
+ regardless of the license terms of these independent modules, and to
+ copy and distribute the resulting executable under terms of your choice,
+ provided that you also meet, for each linked independent module, the
+ terms and conditions of the license of that module. An independent
+ module is a module which is not derived from or based on this library.
+ If you modify this library, you must extend this exception to your
+ version of the library.
+
+ libzmq is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "precompiled.hpp"
+#include "allocator.hpp"
+
+
+zmq::allocator_t::allocator_t ()
+{
+ _type = ZMQ_MSG_ALLOCATOR_DEFAULT;
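+ // Magic tag checked by check_tag (); the destructor overwrites it with
+ // 0xdeadbeef so that stale allocator pointers can be detected.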
+ _tag = 0xCAFEEBEB;
+}
+
+size_t zmq::allocator_t::size () const
+{
+ switch (_type) {
+ case ZMQ_MSG_ALLOCATOR_DEFAULT:
+ return 0;
+
+ // per-thread pool: internally uses a SPSC queue (may not work with inproc; an MPMC queue may be needed anyway)
+ case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL:
+ return 0;
+
+ // global pool: internally uses an MPMC queue
+ case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL:
+ return _global_pool.size ();
+
+ default:
+ return 0;
+ }
+}
+
+
+void *zmq::allocator_t::allocate (size_t len)
+{
+ switch (_type) {
+ case ZMQ_MSG_ALLOCATOR_DEFAULT:
+ return malloc (len);
+
+ // per-thread pool: internally uses a SPSC queue (may not work with inproc; an MPMC queue may be needed anyway)
+ case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL:
+ // FIXME
+ return NULL;
+
+ // global pool: internally uses an MPMC queue
+ case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL:
+ return _global_pool.allocate_msg (len);
+ }
+ return NULL;
+}
+
+void zmq::allocator_t::deallocate_msg (void *data_, void *hint_)
+{
+ allocator_t *alloc = reinterpret_cast<allocator_t *> (hint_);
+ switch (alloc->_type) {
+ case ZMQ_MSG_ALLOCATOR_DEFAULT:
+ free (data_);
+ return;
+
+ // per-thread pool: internally uses a SPSC queue (may not work with inproc; an MPMC queue may be needed anyway)
+ case ZMQ_MSG_ALLOCATOR_PER_THREAD_POOL:
+ // FIXME
+ return;
+
+ // global pool: internally uses an MPMC queue
+ case ZMQ_MSG_ALLOCATOR_GLOBAL_POOL:
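+ // data_ is expected to point at the msg_t::content_t header of the
+ // pooled buffer; its size field selects the size class to return to.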
+ zmq::msg_t::content_t *msg_content =
+ (zmq::msg_t::content_t *) data_;
+ alloc->_global_pool.deallocate_msg (msg_content, msg_content->size);
+ }
+}
diff --git a/src/allocator.hpp b/src/allocator.hpp
new file mode 100644
index 0000000000..3e1352e9e9
--- /dev/null
+++ b/src/allocator.hpp
@@ -0,0 +1,197 @@
+/*
+ Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
+
+ This file is part of libzmq, the ZeroMQ core engine in C++.
+
+ libzmq is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License (LGPL) as published
+ by the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ As a special exception, the Contributors give you permission to link
+ this library with independent modules to produce an executable,
+ regardless of the license terms of these independent modules, and to
+ copy and distribute the resulting executable under terms of your choice,
+ provided that you also meet, for each linked independent module, the
+ terms and conditions of the license of that module. An independent
+ module is a module which is not derived from or based on this library.
+ If you modify this library, you must extend this exception to your
+ version of the library.
+
+ libzmq is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_MEMORYPOOL_HPP_INCLUDED__
+#define __ZMQ_MEMORYPOOL_HPP_INCLUDED__
+
+#include <stdlib.h>
+#include "msg.hpp"
+#include "concurrentqueue.h"
+
+// FIXME: the mempool needs to be able to grow dynamically
+#define MAX_ACTIVE_MESSAGES (16384)
+
+namespace zmq
+{
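+// Fixed-capacity pool: for each size class (256 B .. 8 KB) it pre-allocates
+// MAX_ACTIVE_MESSAGES buffers in one contiguous block and hands them out
+// through a per-class lock-free MPMC free list (moodycamel::ConcurrentQueue).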
+class global_memory_pool_t
+{
+ typedef struct
+ {
+ size_t num_msgs;
+ // actual user data
+ uint8_t *raw_data;
+ } msg_block_t;
+
+ typedef enum
+ {
+ MsgBlock_SizeClass_256 = 0, // for messages up to 256B long
+ MsgBlock_SizeClass_512,
+ MsgBlock_SizeClass_1024,
+ MsgBlock_SizeClass_2048,
+ MsgBlock_SizeClass_4096,
+ MsgBlock_SizeClass_8192,
+
+ MsgBlock_NumSizeClasses
+ } MsgBlock_e;
+
+ inline size_t MsgBlockToBytes (MsgBlock_e block_class)
+ {
+ switch (block_class) {
+ case MsgBlock_SizeClass_256:
+ return 256;
+ case MsgBlock_SizeClass_512:
+ return 512;
+ case MsgBlock_SizeClass_1024:
+ return 1024;
+ case MsgBlock_SizeClass_2048:
+ return 2048;
+ case MsgBlock_SizeClass_4096:
+ return 4096;
+ case MsgBlock_SizeClass_8192:
+ return 8192;
+ default:
+ return 0;
+ }
+ }
+ inline MsgBlock_e BytesToMsgBlock (size_t n)
+ {
+ if (n <= 256)
+ return MsgBlock_SizeClass_256;
+ else if (n <= 512)
+ return MsgBlock_SizeClass_512;
+ else if (n <= 1024)
+ return MsgBlock_SizeClass_1024;
+ else if (n <= 2048)
+ return MsgBlock_SizeClass_2048;
+ else if (n <= 4096)
+ return MsgBlock_SizeClass_4096;
+ else if (n <= 8192)
+ return MsgBlock_SizeClass_8192;
+
+ // size too big
+ return MsgBlock_NumSizeClasses;
+ }
+
+ public:
+ global_memory_pool_t ()
+ {
+ // enqueue all available blocks in the free list:
+ for (int i = 0; i < MsgBlock_NumSizeClasses; i++) {
+ size_t msg_size = MsgBlockToBytes ((MsgBlock_e) i);
+
+ m_storage[i].num_msgs = MAX_ACTIVE_MESSAGES;
+ m_storage[i].raw_data =
+ (uint8_t *) malloc (MAX_ACTIVE_MESSAGES * msg_size);
+
+ uint8_t *msg_memory = m_storage[i].raw_data;
+ for (int j = 0; j < MAX_ACTIVE_MESSAGES; j++) {
+ m_free_list[i].enqueue (msg_memory);
+ msg_memory += msg_size;
+ }
+ }
+ }
+ ~global_memory_pool_t ()
+ {
+ // deallocate all message classes
+ for (int i = 0; i < MsgBlock_NumSizeClasses; i++) {
+ free (m_storage[i].raw_data);
+ m_storage[i].raw_data = NULL;
+ }
+ }
+
+ void *allocate_msg (size_t len) // consumer thread: user app thread
+ {
+ MsgBlock_e bl = BytesToMsgBlock (len);
+ assert (bl != MsgBlock_NumSizeClasses);
+
+ // consume 1 block from the list of free msg
+ uint8_t *next_avail = nullptr;
+ if (!m_free_list[bl].try_dequeue (next_avail)) {
+ assert (0); // I want to find out if this ever happens
+ return NULL;
+ }
+
+ assert (next_avail);
+ return next_avail;
+ }
+
+ void
+ deallocate_msg (void *data_,
+ size_t len) // producer thread: ZMQ background IO thread
+ {
+ MsgBlock_e bl = BytesToMsgBlock (len);
+ assert (bl != MsgBlock_NumSizeClasses);
+
+ // produce a new free msg:
+ m_free_list[bl].enqueue ((uint8_t *) data_);
+ }
+
+ size_t size () const
+ {
+ size_t acc = 0;
+ for (int i = 0; i < MsgBlock_NumSizeClasses; i++)
+ acc += m_free_list[i].size_approx ();
+ return acc;
+ }
+
+ private:
+ msg_block_t m_storage[MsgBlock_NumSizeClasses];
+ moodycamel::ConcurrentQueue<uint8_t *> m_free_list[MsgBlock_NumSizeClasses];
+};
+
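+// Thin wrapper that selects one of the ZMQ_MSG_ALLOCATOR_* strategies.
+// deallocate_msg () matches the zmq_free_fn signature, so the allocator
+// itself can be passed as the hint when a message is released.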
+class allocator_t
+{
+ public:
+ allocator_t ();
+ ~allocator_t ()
+ {
+ // Mark this instance as dead
+ _tag = 0xdeadbeef;
+ }
+
+ void init (int type_) { _type = type_; }
+
+ // allocate() typically gets called by the consumer thread: the user app thread(s)
+ void *allocate (size_t len);
+
+ // deallocate_msg() typically gets called by the producer thread: the ZMQ background IO thread(s)
+ static void deallocate_msg (void *data_, void *hint_);
+
+ size_t size () const;
+ bool check_tag () const { return _tag == 0xCAFEEBEB; }
+
+
+ private:
+ int _type;
+ uint32_t _tag;
+ global_memory_pool_t _global_pool;
+};
+}
+
+#endif
diff --git a/src/concurrentqueue.h b/src/concurrentqueue.h
new file mode 100644
index 0000000000..21cb9375aa
--- /dev/null
+++ b/src/concurrentqueue.h
@@ -0,0 +1,3636 @@
+// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
+// An overview, including benchmark results, is provided here:
+// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
+// The full design is also described in excruciating detail at:
+// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
+
+// Simplified BSD license:
+// Copyright (c) 2013-2016, Cameron Desrochers.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification,
+// are permitted provided that the following conditions are met:
+//
+// - Redistributions of source code must retain the above copyright notice, this list of
+// conditions and the following disclaimer.
+// - Redistributions in binary form must reproduce the above copyright notice, this list of
+// conditions and the following disclaimer in the documentation and/or other materials
+// provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
+// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+#pragma once
+
+#if defined(__GNUC__)
+// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
+// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
+// upon assigning any computed values)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wconversion"
+
+#ifdef MCDBGQ_USE_RELACY
+#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
+#endif
+#endif
+
+#if defined(__APPLE__)
+#include "TargetConditionals.h"
+#endif
+
+#ifdef MCDBGQ_USE_RELACY
+#include "relacy/relacy_std.hpp"
+#include "relacy_shims.h"
+// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
+// We'll override the default trait malloc ourselves without a macro.
+#undef new
+#undef delete
+#undef malloc
+#undef free
+#else
+#include // Requires C++11. Sorry VS2010.
+#include
+#endif
+#include // for max_align_t
+#include
+#include
+#include
+#include
+#include
+#include
+#include // for CHAR_BIT
+#include
+#include // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
+
+// Platform-specific definitions of a numeric thread ID type and an invalid value
+namespace moodycamel { namespace details {
+ template struct thread_id_converter {
+ typedef thread_id_t thread_id_numeric_size_t;
+ typedef thread_id_t thread_id_hash_t;
+ static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
+ };
+} }
+#if defined(MCDBGQ_USE_RELACY)
+namespace moodycamel { namespace details {
+ typedef std::uint32_t thread_id_t;
+ static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
+ static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
+ static inline thread_id_t thread_id() { return rl::thread_index(); }
+} }
+#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
+// No sense pulling in windows.h in a header, we'll manually declare the function
+// we use and rely on backwards-compatibility for this not to break
+extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
+namespace moodycamel { namespace details {
+ static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
+ typedef std::uint32_t thread_id_t;
+ static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
+ static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
+ static inline thread_id_t thread_id() { return static_cast(::GetCurrentThreadId()); }
+} }
+#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
+namespace moodycamel { namespace details {
+ static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
+
+ typedef std::thread::id thread_id_t;
+ static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
+
+ // Note we don't define a invalid_thread_id2 since std::thread::id doesn't have one; it's
+ // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
+ // be.
+ static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
+
+ template struct thread_id_size { };
+ template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
+ template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
+
+ template<> struct thread_id_converter {
+ typedef thread_id_size::numeric_t thread_id_numeric_size_t;
+#ifndef __APPLE__
+ typedef std::size_t thread_id_hash_t;
+#else
+ typedef thread_id_numeric_size_t thread_id_hash_t;
+#endif
+
+ static thread_id_hash_t prehash(thread_id_t const& x)
+ {
+#ifndef __APPLE__
+ return std::hash()(x);
+#else
+ return *reinterpret_cast(&x);
+#endif
+ }
+ };
+} }
+#else
+// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
+// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
+// static variable's address as a thread identifier :-)
+#if defined(__GNUC__) || defined(__INTEL_COMPILER)
+#define MOODYCAMEL_THREADLOCAL __thread
+#elif defined(_MSC_VER)
+#define MOODYCAMEL_THREADLOCAL __declspec(thread)
+#else
+// Assume C++11 compliant compiler
+#define MOODYCAMEL_THREADLOCAL thread_local
+#endif
+namespace moodycamel { namespace details {
+ typedef std::uintptr_t thread_id_t;
+ static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
+ static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
+ static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast(&x); }
+} }
+#endif
+
+// Exceptions
+#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
+#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
+#define MOODYCAMEL_EXCEPTIONS_ENABLED
+#endif
+#endif
+#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
+#define MOODYCAMEL_TRY try
+#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
+#define MOODYCAMEL_RETHROW throw
+#define MOODYCAMEL_THROW(expr) throw (expr)
+#else
+#define MOODYCAMEL_TRY if (true)
+#define MOODYCAMEL_CATCH(...) else if (false)
+#define MOODYCAMEL_RETHROW
+#define MOODYCAMEL_THROW(expr)
+#endif
+
+#ifndef MOODYCAMEL_NOEXCEPT
+#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
+#define MOODYCAMEL_NOEXCEPT
+#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
+#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
+#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
+// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
+// We have to assume *all* non-trivial constructors may throw on VS2012!
+#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
+#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference::value && std::is_move_constructible::value ? std::is_trivially_move_constructible::value : std::is_trivially_copy_constructible::value)
+#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference::value && std::is_move_assignable::value ? std::is_trivially_move_assignable::value || std::is_nothrow_move_assignable::value : std::is_trivially_copy_assignable::value || std::is_nothrow_copy_assignable::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
+#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
+#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
+#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference::value && std::is_move_constructible::value ? std::is_trivially_move_constructible::value || std::is_nothrow_move_constructible::value : std::is_trivially_copy_constructible::value || std::is_nothrow_copy_constructible::value)
+#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference::value && std::is_move_assignable::value ? std::is_trivially_move_assignable::value || std::is_nothrow_move_assignable::value : std::is_trivially_copy_assignable::value || std::is_nothrow_copy_assignable::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
+#else
+#define MOODYCAMEL_NOEXCEPT noexcept
+#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
+#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
+#endif
+#endif
+
+#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
+#ifdef MCDBGQ_USE_RELACY
+#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
+#else
+// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
+// g++ <=4.7 doesn't support thread_local either.
+// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
+#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
+// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
+//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
+#endif
+#endif
+#endif
+
+// VS2012 doesn't support deleted functions.
+// In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
+#ifndef MOODYCAMEL_DELETE_FUNCTION
+#if defined(_MSC_VER) && _MSC_VER < 1800
+#define MOODYCAMEL_DELETE_FUNCTION
+#else
+#define MOODYCAMEL_DELETE_FUNCTION = delete
+#endif
+#endif
+
+// Compiler-specific likely/unlikely hints
+namespace moodycamel { namespace details {
+#if defined(__GNUC__)
+ static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
+ static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
+#else
+ static inline bool (likely)(bool x) { return x; }
+ static inline bool (unlikely)(bool x) { return x; }
+#endif
+} }
+
+#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
+#include "internal/concurrentqueue_internal_debug.h"
+#endif
+
+namespace moodycamel {
+namespace details {
+ template
+ struct const_numeric_max {
+ static_assert(std::is_integral::value, "const_numeric_max can only be used with integers");
+ static const T value = std::numeric_limits::is_signed
+ ? (static_cast(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast(1)
+ : static_cast(-1);
+ };
+
+#if defined(__GLIBCXX__)
+ typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
+#else
+ typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
+#endif
+
+ // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
+ // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
+ typedef union {
+ std_max_align_t x;
+ long long y;
+ void* z;
+ } max_align_t;
+}
+
+// Default traits for the ConcurrentQueue. To change some of the
+// traits without re-implementing all of them, inherit from this
+// struct and shadow the declarations you wish to be different;
+// since the traits are used as a template type parameter, the
+// shadowed declarations will be used where defined, and the defaults
+// otherwise.
+struct ConcurrentQueueDefaultTraits
+{
+ // General-purpose size type. std::size_t is strongly recommended.
+ typedef std::size_t size_t;
+
+ // The type used for the enqueue and dequeue indices. Must be at least as
+ // large as size_t. Should be significantly larger than the number of elements
+ // you expect to hold at once, especially if you have a high turnover rate;
+ // for example, on 32-bit x86, if you expect to have over a hundred million
+ // elements or pump several million elements through your queue in a very
+ // short space of time, using a 32-bit type *may* trigger a race condition.
+ // A 64-bit int type is recommended in that case, and in practice will
+ // prevent a race condition no matter the usage of the queue. Note that
+ // whether the queue is lock-free with a 64-int type depends on the whether
+ // std::atomic is lock-free, which is platform-specific.
+ typedef std::size_t index_t;
+
+ // Internally, all elements are enqueued and dequeued from multi-element
+ // blocks; this is the smallest controllable unit. If you expect few elements
+ // but many producers, a smaller block size should be favoured. For few producers
+ // and/or many elements, a larger block size is preferred. A sane default
+ // is provided. Must be a power of 2.
+ static const size_t BLOCK_SIZE = 32;
+
+ // For explicit producers (i.e. when using a producer token), the block is
+ // checked for being empty by iterating through a list of flags, one per element.
+ // For large block sizes, this is too inefficient, and switching to an atomic
+ // counter-based approach is faster. The switch is made for block sizes strictly
+ // larger than this threshold.
+ static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
+
+ // How many full blocks can be expected for a single explicit producer? This should
+ // reflect that number's maximum for optimal performance. Must be a power of 2.
+ static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
+
+ // How many full blocks can be expected for a single implicit producer? This should
+ // reflect that number's maximum for optimal performance. Must be a power of 2.
+ static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
+
+ // The initial size of the hash table mapping thread IDs to implicit producers.
+ // Note that the hash is resized every time it becomes half full.
+ // Must be a power of two, and either 0 or at least 1. If 0, implicit production
+ // (using the enqueue methods without an explicit producer token) is disabled.
+ static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
+
+ // Controls the number of items that an explicit consumer (i.e. one with a token)
+ // must consume before it causes all consumers to rotate and move on to the next
+ // internal queue.
+ static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
+
+ // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
+ // Enqueue operations that would cause this limit to be surpassed will fail. Note
+ // that this limit is enforced at the block level (for performance reasons), i.e.
+ // it's rounded up to the nearest block size.
+ static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max::value;
+
+
+#ifndef MCDBGQ_USE_RELACY
+ // Memory allocation can be customized if needed.
+ // malloc should return nullptr on failure, and handle alignment like std::malloc.
+#if defined(malloc) || defined(free)
+ // Gah, this is 2015, stop defining macros that break standard code already!
+ // Work around malloc/free being special macros:
+ static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
+ static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
+ static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
+ static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
+#else
+ static inline void* malloc(size_t size) { return std::malloc(size); }
+ static inline void free(void* ptr) { return std::free(ptr); }
+#endif
+#else
+ // Debug versions when running under the Relacy race detector (ignore
+ // these in user code)
+ static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
+ static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
+#endif
+};
+
+
+// When producing or consuming many elements, the most efficient way is to:
+// 1) Use one of the bulk-operation methods of the queue with a token
+// 2) Failing that, use the bulk-operation methods without a token
+// 3) Failing that, create a token and use that with the single-item methods
+// 4) Failing that, use the single-parameter methods of the queue
+// Having said that, don't create tokens willy-nilly -- ideally there should be
+// a maximum of one token per thread (of each kind).
+struct ProducerToken;
+struct ConsumerToken;
+
+template class ConcurrentQueue;
+template class BlockingConcurrentQueue;
+class ConcurrentQueueTests;
+
+
+namespace details
+{
+ struct ConcurrentQueueProducerTypelessBase
+ {
+ ConcurrentQueueProducerTypelessBase* next;
+ std::atomic inactive;
+ ProducerToken* token;
+
+ ConcurrentQueueProducerTypelessBase()
+ : next(nullptr), inactive(false), token(nullptr)
+ {
+ }
+ };
+
+ template struct _hash_32_or_64 {
+ static inline std::uint32_t hash(std::uint32_t h)
+ {
+ // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
+ // Since the thread ID is already unique, all we really want to do is propagate that
+ // uniqueness evenly across all the bits, so that we can use a subset of the bits while
+ // reducing collisions significantly
+ h ^= h >> 16;
+ h *= 0x85ebca6b;
+ h ^= h >> 13;
+ h *= 0xc2b2ae35;
+ return h ^ (h >> 16);
+ }
+ };
+ template<> struct _hash_32_or_64<1> {
+ static inline std::uint64_t hash(std::uint64_t h)
+ {
+ h ^= h >> 33;
+ h *= 0xff51afd7ed558ccd;
+ h ^= h >> 33;
+ h *= 0xc4ceb9fe1a85ec53;
+ return h ^ (h >> 33);
+ }
+ };
+ template struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
+
+ static inline size_t hash_thread_id(thread_id_t id)
+ {
+ static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
+ return static_cast(hash_32_or_64::thread_id_hash_t)>::hash(
+ thread_id_converter::prehash(id)));
+ }
+
+ template
+ static inline bool circular_less_than(T a, T b)
+ {
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable: 4554)
+#endif
+ static_assert(std::is_integral::value && !std::numeric_limits::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
+ return static_cast(a - b) > static_cast(static_cast(1) << static_cast(sizeof(T) * CHAR_BIT - 1));
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+ }
+
+ template
+ static inline char* align_for(char* ptr)
+ {
+ const std::size_t alignment = std::alignment_of::value;
+ return ptr + (alignment - (reinterpret_cast(ptr) % alignment)) % alignment;
+ }
+
+ template
+ static inline T ceil_to_pow_2(T x)
+ {
+ static_assert(std::is_integral::value && !std::numeric_limits::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
+
+ // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
+ --x;
+ x |= x >> 1;
+ x |= x >> 2;
+ x |= x >> 4;
+ for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
+ x |= x >> (i << 3);
+ }
+ ++x;
+ return x;
+ }
+
+ template
+ static inline void swap_relaxed(std::atomic& left, std::atomic& right)
+ {
+ T temp = std::move(left.load(std::memory_order_relaxed));
+ left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
+ right.store(std::move(temp), std::memory_order_relaxed);
+ }
+
+ template
+ static inline T const& nomove(T const& x)
+ {
+ return x;
+ }
+
+ template
+ struct nomove_if
+ {
+ template
+ static inline T const& eval(T const& x)
+ {
+ return x;
+ }
+ };
+
+ template<>
+ struct nomove_if
+ {
+ template
+ static inline auto eval(U&& x)
+ -> decltype(std::forward(x))
+ {
+ return std::forward(x);
+ }
+ };
+
+ template
+ static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
+ {
+ return *it;
+ }
+
+#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
+ template struct is_trivially_destructible : std::is_trivially_destructible { };
+#else
+ template struct is_trivially_destructible : std::has_trivial_destructor { };
+#endif
+
+#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
+#ifdef MCDBGQ_USE_RELACY
+ typedef RelacyThreadExitListener ThreadExitListener;
+ typedef RelacyThreadExitNotifier ThreadExitNotifier;
+#else
+ struct ThreadExitListener
+ {
+ typedef void (*callback_t)(void*);
+ callback_t callback;
+ void* userData;
+
+ ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
+ };
+
+
+ class ThreadExitNotifier
+ {
+ public:
+ static void subscribe(ThreadExitListener* listener)
+ {
+ auto& tlsInst = instance();
+ listener->next = tlsInst.tail;
+ tlsInst.tail = listener;
+ }
+
+ static void unsubscribe(ThreadExitListener* listener)
+ {
+ auto& tlsInst = instance();
+ ThreadExitListener** prev = &tlsInst.tail;
+ for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
+ if (ptr == listener) {
+ *prev = ptr->next;
+ break;
+ }
+ prev = &ptr->next;
+ }
+ }
+
+ private:
+ ThreadExitNotifier() : tail(nullptr) { }
+ ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
+ ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
+
+ ~ThreadExitNotifier()
+ {
+ // This thread is about to exit, let everyone know!
+ assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
+ for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
+ ptr->callback(ptr->userData);
+ }
+ }
+
+ // Thread-local
+ static inline ThreadExitNotifier& instance()
+ {
+ static thread_local ThreadExitNotifier notifier;
+ return notifier;
+ }
+
+ private:
+ ThreadExitListener* tail;
+ };
+#endif
+#endif
+
+ template struct static_is_lock_free_num { enum { value = 0 }; };
+ template<> struct static_is_lock_free_num { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
+ template<> struct static_is_lock_free_num { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
+ template<> struct static_is_lock_free_num { enum { value = ATOMIC_INT_LOCK_FREE }; };
+ template<> struct static_is_lock_free_num { enum { value = ATOMIC_LONG_LOCK_FREE }; };
+ template<> struct static_is_lock_free_num { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
+ template struct static_is_lock_free : static_is_lock_free_num::type> { };
+ template<> struct static_is_lock_free { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
+ template struct static_is_lock_free { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
+}
+
+
+struct ProducerToken
+{
+ template
+ explicit ProducerToken(ConcurrentQueue& queue);
+
+ template
+ explicit ProducerToken(BlockingConcurrentQueue& queue);
+
+ ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
+ : producer(other.producer)
+ {
+ other.producer = nullptr;
+ if (producer != nullptr) {
+ producer->token = this;
+ }
+ }
+
+ inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
+ {
+ swap(other);
+ return *this;
+ }
+
+ void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
+ {
+ std::swap(producer, other.producer);
+ if (producer != nullptr) {
+ producer->token = this;
+ }
+ if (other.producer != nullptr) {
+ other.producer->token = &other;
+ }
+ }
+
+ // A token is always valid unless:
+ // 1) Memory allocation failed during construction
+ // 2) It was moved via the move constructor
+ // (Note: assignment does a swap, leaving both potentially valid)
+ // 3) The associated queue was destroyed
+ // Note that if valid() returns true, that only indicates
+ // that the token is valid for use with a specific queue,
+ // but not which one; that's up to the user to track.
+ inline bool valid() const { return producer != nullptr; }
+
+ ~ProducerToken()
+ {
+ if (producer != nullptr) {
+ producer->token = nullptr;
+ producer->inactive.store(true, std::memory_order_release);
+ }
+ }
+
+ // Disable copying and assignment
+ ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
+ ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
+
+private:
+ template friend class ConcurrentQueue;
+ friend class ConcurrentQueueTests;
+
+protected:
+ details::ConcurrentQueueProducerTypelessBase* producer;
+};
+
+
+struct ConsumerToken
+{
+ template
+ explicit ConsumerToken(ConcurrentQueue& q);
+
+ template
+ explicit ConsumerToken(BlockingConcurrentQueue& q);
+
+ ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
+ : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
+ {
+ }
+
+ inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
+ {
+ swap(other);
+ return *this;
+ }
+
+ void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
+ {
+ std::swap(initialOffset, other.initialOffset);
+ std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
+ std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
+ std::swap(currentProducer, other.currentProducer);
+ std::swap(desiredProducer, other.desiredProducer);
+ }
+
+ // Disable copying and assignment
+ ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
+ ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
+
+private:
+ template friend class ConcurrentQueue;
+ friend class ConcurrentQueueTests;
+
+private: // but shared with ConcurrentQueue
+ std::uint32_t initialOffset;
+ std::uint32_t lastKnownGlobalOffset;
+ std::uint32_t itemsConsumedFromCurrent;
+ details::ConcurrentQueueProducerTypelessBase* currentProducer;
+ details::ConcurrentQueueProducerTypelessBase* desiredProducer;
+};
+
+// Need to forward-declare this swap because it's in a namespace.
+// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
+template
+inline void swap(typename ConcurrentQueue::ImplicitProducerKVP& a, typename ConcurrentQueue::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
+
+
+template
+class ConcurrentQueue
+{
+public:
+ typedef ::moodycamel::ProducerToken producer_token_t;
+ typedef ::moodycamel::ConsumerToken consumer_token_t;
+
+ typedef typename Traits::index_t index_t;
+ typedef typename Traits::size_t size_t;
+
+ static const size_t BLOCK_SIZE = static_cast(Traits::BLOCK_SIZE);
+ static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
+ static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
+ static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
+ static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
+ static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
+#pragma warning(disable: 4309) // static_cast: Truncation of constant value
+#endif
+ static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max::value - static_cast(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max::value : ((static_cast(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+ static_assert(!std::numeric_limits::is_signed && std::is_integral::value, "Traits::size_t must be an unsigned integral type");
+ static_assert(!std::numeric_limits::is_signed && std::is_integral::value, "Traits::index_t must be an unsigned integral type");
+ static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
+ static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
+ static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
+ static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
+ static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
+ static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
+ static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
+
+public:
+ // Creates a queue with at least `capacity` element slots; note that the
+ // actual number of elements that can be inserted without additional memory
+ // allocation depends on the number of producers and the block size (e.g. if
+ // the block size is equal to `capacity`, only a single block will be allocated
+ // up-front, which means only a single producer will be able to enqueue elements
+ // without an extra allocation -- blocks aren't shared between producers).
+ // This method is not thread safe -- it is up to the user to ensure that the
+ // queue is fully constructed before it starts being used by other threads (this
+ // includes making the memory effects of construction visible, possibly with a
+ // memory barrier).
+ explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
+ : producerListTail(nullptr),
+ producerCount(0),
+ initialBlockPoolIndex(0),
+ nextExplicitConsumerId(0),
+ globalExplicitConsumerOffset(0)
+ {
+ implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
+ populate_initial_implicit_producer_hash();
+ populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
+
+#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
+ // Track all the producers using a fully-resolved typed list for
+ // each kind; this makes it possible to debug them starting from
+ // the root queue object (otherwise wacky casts are needed that
+ // don't compile in the debugger's expression evaluator).
+ explicitProducers.store(nullptr, std::memory_order_relaxed);
+ implicitProducers.store(nullptr, std::memory_order_relaxed);
+#endif
+ }
+
+ // Computes the correct amount of pre-allocated blocks for you based
+ // on the minimum number of elements you want available at any given
+ // time, and the maximum concurrent number of each type of producer.
+ ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
+ : producerListTail(nullptr),
+ producerCount(0),
+ initialBlockPoolIndex(0),
+ nextExplicitConsumerId(0),
+ globalExplicitConsumerOffset(0)
+ {
+ implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
+ populate_initial_implicit_producer_hash();
+ size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
+ populate_initial_block_list(blocks);
+
+#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
+ explicitProducers.store(nullptr, std::memory_order_relaxed);
+ implicitProducers.store(nullptr, std::memory_order_relaxed);
+#endif
+ }
+
+ // Note: The queue should not be accessed concurrently while it's
+ // being deleted. It's up to the user to synchronize this.
+ // This method is not thread safe.
+ ~ConcurrentQueue()
+ {
+ // Destroy producers
+ auto ptr = producerListTail.load(std::memory_order_relaxed);
+ while (ptr != nullptr) {
+ auto next = ptr->next_prod();
+ if (ptr->token != nullptr) {
+ ptr->token->producer = nullptr;
+ }
+ destroy(ptr);
+ ptr = next;
+ }
+
+ // Destroy implicit producer hash tables
+ if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
+ auto hash = implicitProducerHash.load(std::memory_order_relaxed);
+ while (hash != nullptr) {
+ auto prev = hash->prev;
+ if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
+ for (size_t i = 0; i != hash->capacity; ++i) {
+ hash->entries[i].~ImplicitProducerKVP();
+ }
+ hash->~ImplicitProducerHash();
+ (Traits::free)(hash);
+ }
+ hash = prev;
+ }
+ }
+
+ // Destroy global free list
+ auto block = freeList.head_unsafe();
+ while (block != nullptr) {
+ auto next = block->freeListNext.load(std::memory_order_relaxed);
+ if (block->dynamicallyAllocated) {
+ destroy(block);
+ }
+ block = next;
+ }
+
+ // Destroy initial free list
+ destroy_array(initialBlockPool, initialBlockPoolSize);
+ }
+
+ // Disable copying and copy assignment
+ ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
+ ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
+
+ // Moving is supported, but note that it is *not* a thread-safe operation.
+ // Nobody can use the queue while it's being moved, and the memory effects
+ // of that move must be propagated to other threads before they can use it.
+ // Note: When a queue is moved, its tokens are still valid but can only be
+ // used with the destination queue (i.e. semantically they are moved along
+ // with the queue itself).
+ ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
+ : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
+ producerCount(other.producerCount.load(std::memory_order_relaxed)),
+ initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
+ initialBlockPool(other.initialBlockPool),
+ initialBlockPoolSize(other.initialBlockPoolSize),
+ freeList(std::move(other.freeList)),
+ nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
+ globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
+ {
+ // Move the other one into this, and leave the other one as an empty queue
+ implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
+ populate_initial_implicit_producer_hash();
+ swap_implicit_producer_hashes(other);
+
+ other.producerListTail.store(nullptr, std::memory_order_relaxed);
+ other.producerCount.store(0, std::memory_order_relaxed);
+ other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
+ other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
+
+#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
+ explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
+ other.explicitProducers.store(nullptr, std::memory_order_relaxed);
+ implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
+ other.implicitProducers.store(nullptr, std::memory_order_relaxed);
+#endif
+
+ other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
+ other.initialBlockPoolSize = 0;
+ other.initialBlockPool = nullptr;
+
+ reown_producers();
+ }
+
+ inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
+ {
+ return swap_internal(other);
+ }
+
+ // Swaps this queue's state with the other's. Not thread-safe.
+ // Swapping two queues does not invalidate their tokens, however
+ // the tokens that were created for one queue must be used with
+ // only the swapped queue (i.e. the tokens are tied to the
+ // queue's movable state, not the object itself).
+ inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
+ {
+ swap_internal(other);
+ }
+
+private:
+ ConcurrentQueue& swap_internal(ConcurrentQueue& other)
+ {
+ if (this == &other) {
+ return *this;
+ }
+
+ details::swap_relaxed(producerListTail, other.producerListTail);
+ details::swap_relaxed(producerCount, other.producerCount);
+ details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
+ std::swap(initialBlockPool, other.initialBlockPool);
+ std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
+ freeList.swap(other.freeList);
+ details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
+ details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
+
+ swap_implicit_producer_hashes(other);
+
+ reown_producers();
+ other.reown_producers();
+
+#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
+ details::swap_relaxed(explicitProducers, other.explicitProducers);
+ details::swap_relaxed(implicitProducers, other.implicitProducers);
+#endif
+
+ return *this;
+ }
+
+public:
+ // Enqueues a single item (by copying it).
+ // Allocates memory if required. Only fails if memory allocation fails (or implicit
+ // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
+ // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
+ // Thread-safe.
+ inline bool enqueue(T const& item)
+ {
+ if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
+ return inner_enqueue(item);
+ }
+
+ // Enqueues a single item (by moving it, if possible).
+ // Allocates memory if required. Only fails if memory allocation fails (or implicit
+ // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
+ // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
+ // Thread-safe.
+ inline bool enqueue(T&& item)
+ {
+ if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
+ return inner_enqueue(std::move(item));
+ }
+
+ // Enqueues a single item (by copying it) using an explicit producer token.
+ // Allocates memory if required. Only fails if memory allocation fails (or
+ // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
+ // Thread-safe.
+ inline bool enqueue(producer_token_t const& token, T const& item)
+ {
+ return inner_enqueue(token, item);
+ }
+
+ // Enqueues a single item (by moving it, if possible) using an explicit producer token.
+ // Allocates memory if required. Only fails if memory allocation fails (or
+ // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
+ // Thread-safe.
+ inline bool enqueue(producer_token_t const& token, T&& item)
+ {
+ return inner_enqueue(token, std::move(item));
+ }
+
+ // Enqueues several items.
+ // Allocates memory if required. Only fails if memory allocation fails (or
+ // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
+ // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
+ // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
+ // Thread-safe.
+ template
+ bool enqueue_bulk(It itemFirst, size_t count)
+ {
+ if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
+ return inner_enqueue_bulk(itemFirst, count);
+ }
+
+ // Enqueues several items using an explicit producer token.
+ // Allocates memory if required. Only fails if memory allocation fails
+ // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
+ // Note: Use std::make_move_iterator if the elements should be moved
+ // instead of copied.
+ // Thread-safe.
+ template
+ bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
+ {
+ return inner_enqueue_bulk(token, itemFirst, count);
+ }
+
+ // Enqueues a single item (by copying it).
+ // Does not allocate memory. Fails if not enough room to enqueue (or implicit
+ // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
+ // is 0).
+ // Thread-safe.
+ inline bool try_enqueue(T const& item)
+ {
+ if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
+ return inner_enqueue(item);
+ }
+
+ // Enqueues a single item (by moving it, if possible).
+ // Does not allocate memory (except for one-time implicit producer).
+ // Fails if not enough room to enqueue (or implicit production is
+ // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
+ // Thread-safe.
+ inline bool try_enqueue(T&& item)
+ {
+ if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
+ return inner_enqueue(std::move(item));
+ }
+
+ // Enqueues a single item (by copying it) using an explicit producer token.
+ // Does not allocate memory. Fails if not enough room to enqueue.
+ // Thread-safe.
+ inline bool try_enqueue(producer_token_t const& token, T const& item)
+ {
+ return inner_enqueue(token, item);
+ }
+
+ // Enqueues a single item (by moving it, if possible) using an explicit producer token.
+ // Does not allocate memory. Fails if not enough room to enqueue.
+ // Thread-safe.
+ inline bool try_enqueue(producer_token_t const& token, T&& item)
+ {
+ return inner_enqueue(token, std::move(item));
+ }
+
+ // Enqueues several items.
+ // Does not allocate memory (except for one-time implicit producer).
+ // Fails if not enough room to enqueue (or implicit production is
+ // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
+ // Note: Use std::make_move_iterator if the elements should be moved
+ // instead of copied.
+ // Thread-safe.
+ template
+ bool try_enqueue_bulk(It itemFirst, size_t count)
+ {
+ if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
+ return inner_enqueue_bulk(itemFirst, count);
+ }
+
+ // Enqueues several items using an explicit producer token.
+ // Does not allocate memory. Fails if not enough room to enqueue.
+ // Note: Use std::make_move_iterator if the elements should be moved
+ // instead of copied.
+ // Thread-safe.
+ template
+ bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
+ {
+ return inner_enqueue_bulk(token, itemFirst, count);
+ }
+
+
+
+ // Attempts to dequeue from the queue.
+ // Returns false if all producer streams appeared empty at the time they
+ // were checked (so, the queue is likely but not guaranteed to be empty).
+ // Never allocates. Thread-safe.
+ template
+ bool try_dequeue(U& item)
+ {
+ // Instead of simply trying each producer in turn (which could cause needless contention on the first
+ // producer), we score them heuristically.
+ size_t nonEmptyCount = 0;
+ ProducerBase* best = nullptr;
+ size_t bestSize = 0;
+ for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
+ auto size = ptr->size_approx();
+ if (size > 0) {
+ if (size > bestSize) {
+ bestSize = size;
+ best = ptr;
+ }
+ ++nonEmptyCount;
+ }
+ }
+
+ // If there was at least one non-empty queue but it appears empty at the time
+ // we try to dequeue from it, we need to make sure every queue's been tried
+ if (nonEmptyCount > 0) {
+ if ((details::likely)(best->dequeue(item))) {
+ return true;
+ }
+ for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
+ if (ptr != best && ptr->dequeue(item)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ // Attempts to dequeue from the queue.
+ // Returns false if all producer streams appeared empty at the time they
+ // were checked (so, the queue is likely but not guaranteed to be empty).
+ // This differs from the try_dequeue(item) method in that this one does
+ // not attempt to reduce contention by interleaving the order that producer
+ // streams are dequeued from. So, using this method can reduce overall throughput
+ // under contention, but will give more predictable results in single-threaded
+ // consumer scenarios. This is mostly only useful for internal unit tests.
+ // Never allocates. Thread-safe.
+ template
+ bool try_dequeue_non_interleaved(U& item)
+ {
+ for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
+ if (ptr->dequeue(item)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Attempts to dequeue from the queue using an explicit consumer token.
+ // Returns false if all producer streams appeared empty at the time they
+ // were checked (so, the queue is likely but not guaranteed to be empty).
+ // Never allocates. Thread-safe.
+ template
+ bool try_dequeue(consumer_token_t& token, U& item)
+ {
+ // The idea is roughly as follows:
+ // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
+ // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
+ // If there's no items where you're supposed to be, keep moving until you find a producer with some items
+ // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
+
+ if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
+ if (!update_current_producer_after_rotation(token)) {
+ return false;
+ }
+ }
+
+ // If there was at least one non-empty queue but it appears empty at the time
+ // we try to dequeue from it, we need to make sure every queue's been tried
+ if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
+ if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
+ globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
+ }
+ return true;
+ }
+
+ auto tail = producerListTail.load(std::memory_order_acquire);
+ auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
+ if (ptr == nullptr) {
+ ptr = tail;
+ }
+ while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
+ if (ptr->dequeue(item)) {
+ token.currentProducer = ptr;
+ token.itemsConsumedFromCurrent = 1;
+ return true;
+ }
+ ptr = ptr->next_prod();
+ if (ptr == nullptr) {
+ ptr = tail;
+ }
+ }
+ return false;
+ }
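+
+ // Example (illustrative sketch only, assuming one long-lived consumer thread,
+ // the same q/process() assumptions as above, and the stock moodycamel
+ // namespace; ConsumerToken is the public alias for consumer_token_t):
+ //
+ //   moodycamel::ConsumerToken tok(q);
+ //   int item;
+ //   while (q.try_dequeue(tok, item)) {
+ //       process(item);   // the token caches the producer-rotation state
+ //   }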
+
+ // Attempts to dequeue several elements from the queue.
+ // Returns the number of items actually dequeued.
+ // Returns 0 if all producer streams appeared empty at the time they
+ // were checked (so, the queue is likely but not guaranteed to be empty).
+ // Never allocates. Thread-safe.
+ template<typename It>
+ size_t try_dequeue_bulk(It itemFirst, size_t max)
+ {
+ size_t count = 0;
+ for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
+ count += ptr->dequeue_bulk(itemFirst, max - count);
+ if (count == max) {
+ break;
+ }
+ }
+ return count;
+ }
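+
+ // Example (illustrative sketch only, same q/process() assumptions as above):
+ //
+ //   int buffer[32];
+ //   size_t n = q.try_dequeue_bulk(buffer, 32);
+ //   for (size_t i = 0; i != n; ++i) {
+ //       process(buffer[i]);
+ //   }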
+
+ // Attempts to dequeue several elements from the queue using an explicit consumer token.
+ // Returns the number of items actually dequeued.
+ // Returns 0 if all producer streams appeared empty at the time they
+ // were checked (so, the queue is likely but not guaranteed to be empty).
+ // Never allocates. Thread-safe.
+ template<typename It>
+ size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
+ {
+ if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
+ if (!update_current_producer_after_rotation(token)) {
+ return 0;
+ }
+ }
+
+ size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
+ if (count == max) {
+ if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
+ globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
+ }
+ return max;
+ }
+ token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
+ max -= count;
+
+ auto tail = producerListTail.load(std::memory_order_acquire);
+ auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
+ if (ptr == nullptr) {
+ ptr = tail;
+ }
+ while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
+ auto dequeued = ptr->dequeue_bulk(itemFirst, max);
+ count += dequeued;
+ if (dequeued != 0) {
+ token.currentProducer = ptr;
+ token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
+ }
+ if (dequeued == max) {
+ break;
+ }
+ max -= dequeued;
+ ptr = ptr->next_prod();
+ if (ptr == nullptr) {
+ ptr = tail;
+ }
+ }
+ return count;
+ }
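+
+ // Example (illustrative sketch only, same q/tok/process() assumptions as above):
+ //
+ //   int buffer[64];
+ //   while (size_t n = q.try_dequeue_bulk(tok, buffer, 64)) {
+ //       for (size_t i = 0; i != n; ++i) {
+ //           process(buffer[i]);
+ //       }
+ //   }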
+
+
+
+ // Attempts to dequeue from a specific producer's inner queue.
+ // If you happen to know which producer you want to dequeue from, this
+ // is significantly faster than using the general-case try_dequeue methods.
+ // Returns false if the producer's queue appeared empty at the time it
+ // was checked (so, the queue is likely but not guaranteed to be empty).
+ // Never allocates. Thread-safe.
+ template<typename U>
+ inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
+ {
+ return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
+ }
+
+ // Attempts to dequeue several elements from a specific producer's inner queue.
+ // Returns the number of items actually dequeued.
+ // If you happen to know which producer you want to dequeue from, this
+ // is significantly faster than using the general-case try_dequeue methods.
+ // Returns 0 if the producer's queue appeared empty at the time it
+ // was checked (so, the queue is likely but not guaranteed to be empty).
+ // Never allocates. Thread-safe.
+ template<typename It>
+ inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
+ {
+ return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
+ }
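+
+ // Example (illustrative sketch only, assuming a ConcurrentQueue<int> named q
+ // fed through an explicit ProducerToken, the public alias for producer_token_t):
+ //
+ //   moodycamel::ProducerToken ptok(q);
+ //   q.enqueue(ptok, 42);
+ //
+ //   int item;
+ //   if (q.try_dequeue_from_producer(ptok, item)) {
+ //       // item came from ptok's sub-queue only
+ //   }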
+
+
+ // Returns an estimate of the total number of elements currently in the queue. This
+ // estimate is only accurate if the queue has completely stabilized before it is called
+ // (i.e. all enqueue and dequeue operations have completed and their memory effects are
+ // visible on the calling thread, and no further operations start while this method is
+ // being called).
+ // Thread-safe.
+ size_t size_approx() const
+ {
+ size_t size = 0;
+ for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
+ size += ptr->size_approx();
+ }
+ return size;
+ }
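+
+ // Example (illustrative sketch only): treat the estimate as a hint, e.g. for
+ // metrics or back-pressure, never as a synchronization primitive; the
+ // watermark below is an application-defined placeholder:
+ //
+ //   if (q.size_approx() > kHighWatermark) {
+ //       // log back-pressure / throttle the producers
+ //   }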
+
+
+ // Returns true if the underlying atomic variables used by
+ // the queue are lock-free (they should be on most platforms).
+ // Thread-safe.
+ static bool is_lock_free()
+ {
+ return
+ details::static_is_lock_free<bool>::value == 2 &&
+ details::static_is_lock_free<size_t>::value == 2 &&
+ details::static_is_lock_free<std::uint32_t>::value == 2 &&
+ details::static_is_lock_free<index_t>::value == 2 &&
+ details::static_is_lock_free<void*>::value == 2 &&
+ details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
+ }
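+
+ // Example (illustrative sketch only, assuming the stock moodycamel namespace):
+ // the query is a plain run-time check, e.g.
+ //
+ //   assert(moodycamel::ConcurrentQueue<int>::is_lock_free());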
+
+
+private:
+ friend struct ProducerToken;
+ friend struct ConsumerToken;
+ struct ExplicitProducer;
+ friend struct ExplicitProducer;
+ struct ImplicitProducer;
+ friend struct ImplicitProducer;
+ friend class ConcurrentQueueTests;
+
+ enum AllocationMode { CanAlloc, CannotAlloc };
+
+
+ ///////////////////////////////
+ // Queue methods
+ ///////////////////////////////
+
+ template<AllocationMode canAlloc, typename U>
+ inline bool inner_enqueue(producer_token_t const& token, U&& element)
+ {
+ return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
+ }
+
+ template<AllocationMode canAlloc, typename U>
+ inline bool inner_enqueue(U&& element)
+ {
+ auto producer = get_or_add_implicit_producer();
+ return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
+ }
+
+ template<AllocationMode canAlloc, typename It>
+ inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
+ {
+ return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
+ }
+
+ template<AllocationMode canAlloc, typename It>
+ inline bool inner_enqueue_bulk(It itemFirst, size_t count)
+ {
+ auto producer = get_or_add_implicit_producer();
+ return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
+ }
+
+ inline bool update_current_producer_after_rotation(consumer_token_t& token)
+ {
+ // Ah, there's been a rotation, figure out where we should be!
+ auto tail = producerListTail.load(std::memory_order_acquire);
+ if (token.desiredProducer == nullptr && tail == nullptr) {
+ return false;
+ }
+ auto prodCount = producerCount.load(std::memory_order_relaxed);
+ auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
+ if ((details::unlikely)(token.desiredProducer == nullptr)) {
+ // Aha, first time we're dequeueing anything.
+ // Figure out our local position
+ // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
+ std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
+ token.desiredProducer = tail;
+ for (std::uint32_t i = 0; i != offset; ++i) {
+ token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
+ if (token.desiredProducer == nullptr) {
+ token.desiredProducer = tail;
+ }
+ }
+ }
+
+ std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
+ if (delta >= prodCount) {
+ delta = delta % prodCount;
+ }
+ for (std::uint32_t i = 0; i != delta; ++i) {
+ token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
+ if (token.desiredProducer == nullptr) {
+ token.desiredProducer = tail;
+ }
+ }
+
+ token.lastKnownGlobalOffset = globalOffset;
+ token.currentProducer = token.desiredProducer;
+ token.itemsConsumedFromCurrent = 0;
+ return true;
+ }
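+
+ // Worked example (illustrative numbers only): with prodCount == 3 and
+ // token.initialOffset == 5, the first call computes
+ // offset = 3 - 1 - (5 % 3) == 0, so desiredProducer starts at the list tail;
+ // every later increment of globalExplicitConsumerOffset then advances the
+ // token by one producer, wrapping back to the tail at the end of the list.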
+
+
+ ///////////////////////////
+ // Free list
+ ///////////////////////////
+
+ template<typename N>
+ struct FreeListNode
+ {
+ FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
+
+ std::atomic<std::uint32_t> freeListRefs;
+ std::atomic<N*> freeListNext;
+ };
+
+ // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
+ // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
+ // speedy under low contention.
+ template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
+ struct FreeList
+ {
+ FreeList() : freeListHead(nullptr) { }
+ FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
+ void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
+
+ FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
+ FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
+
+ inline void add(N* node)
+ {
+#ifdef MCDBGQ_NOLOCKFREE_FREELIST
+ debug::DebugLock lock(mutex);
+#endif
+ // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
+ // set it using a fetch_add
+ if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
+ // Oh look! We were the last ones referencing this node, and we know
+ // we want to add it to the free list, so let's do it!
+ add_knowing_refcount_is_zero(node);
+ }
+ }
+
+ inline N* try_get()
+ {
+#ifdef MCDBGQ_NOLOCKFREE_FREELIST
+ debug::DebugLock lock(mutex);
+#endif
+ auto head = freeListHead.load(std::memory_order_acquire);
+ while (head != nullptr) {
+ auto prevHead = head;
+ auto refs = head->freeListRefs.load(std::memory_order_relaxed);
+ if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
+ head = freeListHead.load(std::memory_order_acquire);
+ continue;
+ }
+
+ // Good, reference count has been incremented (it wasn't at zero), which means we can read the
+ // next and not worry about it changing between now and the time we do the CAS
+ auto next = head->freeListNext.load(std::memory_order_relaxed);
+ if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
+ // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
+ // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
+ assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
+
+ // Decrease refcount twice, once for our ref, and once for the list's ref
+ head->freeListRefs.fetch_sub(2, std::memory_order_release);
+ return head;
+ }
+
+ // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
+ // Note that we don't need to release any memory effects, but we do need to ensure that the reference
+ // count decrement happens-after the CAS on the head.
+ refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
+ if (refs == SHOULD_BE_ON_FREELIST + 1) {
+ add_knowing_refcount_is_zero(prevHead);
+ }
+ }
+
+ return nullptr;
+ }
+
+ // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
+ N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
+
+ private:
+ inline void add_knowing_refcount_is_zero(N* node)
+ {
+ // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
+ // only one copy of this method per node at a time, i.e. the single thread case), then we know
+ // we can safely change the next pointer of the node; however, once the refcount is back above
+ // zero, then other threads could increase it (happens under heavy contention, when the refcount
+ // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
+ // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
+ // to add the node to the actual list fails, decrease the refcount and leave the add operation to
+ // the next thread who puts the refcount back at zero (which could be us, hence the loop).
+ auto head = freeListHead.load(std::memory_order_relaxed);
+ while (true) {
+ node->freeListNext.store(head, std::memory_order_relaxed);
+ node->freeListRefs.store(1, std::memory_order_release);
+ if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
+ // Hmm, the add failed, but we can only try again when the refcount goes back to zero
+ if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
+ continue;
+ }
+ }
+ return;
+ }
+ }
+
+ private:
+ // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
+ std::atomic<N*> freeListHead;
+
+ static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
+ static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
+
+#ifdef MCDBGQ_NOLOCKFREE_FREELIST
+ debug::DebugMutex mutex;
+#endif
+ };
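+
+ // Example (illustrative sketch only; FreeList is an internal detail, and Block
+ // below is the node type actually pooled -- spentBlock is a placeholder pointer):
+ //
+ //   FreeList<Block> pool;
+ //   pool.add(spentBlock);              // hand a no-longer-used node back
+ //   if (Block* b = pool.try_get()) {   // nodes come back in arbitrary order
+ //       // reuse b
+ //   }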
+
+
+ ///////////////////////////
+ // Block
+ ///////////////////////////
+
+ enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
+
+ struct Block
+ {
+ Block()
+ : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
+ {
+#ifdef MCDBGQ_TRACKMEM
+ owner = nullptr;
+#endif
+ }
+
+ template<InnerQueueContext context>
+ inline bool is_empty() const
+ {
+ if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
+ // Check flags
+ for (size_t i = 0; i < BLOCK_SIZE; ++i) {
+ if (!emptyFlags[i].load(std::memory_order_relaxed)) {
+ return false;
+ }
+ }
+
+ // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return true;
+ }
+ else {
+ // Check counter
+ if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return true;
+ }
+ assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
+ return false;
+ }
+ }
+
+ // Returns true if the block is now empty (does not apply in explicit context)
+ template<InnerQueueContext context>
+ inline bool set_empty(index_t i)
+ {
+ if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
+ // Set flag
+ assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
+ emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
+ return false;
+ }
+ else {
+ // Increment counter
+ auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
+ assert(prevVal < BLOCK_SIZE);
+ return prevVal == BLOCK_SIZE - 1;
+ }
+ }
+
+ // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
+ // Returns true if the block is now empty (does not apply in explicit context).
+ template<InnerQueueContext context>
+ inline bool set_many_empty(index_t i, size_t count)
+ {
+ if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
+ // Set flags
+ std::atomic_thread_fence(std::memory_order_release);
+ i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
+ for (size_t j = 0; j != count; ++j) {
+ assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
+ emptyFlags[i + j].store(true, std::memory_order_relaxed);
+ }
+ return false;
+ }
+ else {
+ // Increment counter
+ auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
+ assert(prevVal + count <= BLOCK_SIZE);
+ return prevVal + count == BLOCK_SIZE;
+ }
+ }
+
+ template<InnerQueueContext context>
+ inline void set_all_empty()
+ {
+ if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
+ // Set all flags
+ for (size_t i = 0; i != BLOCK_SIZE; ++i) {
+ emptyFlags[i].store(true, std::memory_order_relaxed);
+ }
+ }
+ else {
+ // Reset counter
+ elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
+ }
+ }
+
+ template<InnerQueueContext context>
+ inline void reset_empty()
+ {
+ if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
+ // Reset flags
+ for (size_t i = 0; i != BLOCK_SIZE; ++i) {
+ emptyFlags[i].store(false, std::memory_order_relaxed);
+ }
+ }
+ else {
+ // Reset counter
+ elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
+ }
+ }
+
+ inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
+ inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
+
+ private:
+ // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
+ // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
+ // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
+ // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
+ // alignment, but this is hard to do in a cross-platform way. Assert for this case:
+ static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
+ // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
+ // otherwise the appropriate padding will not be added at the end of Block in order to make
+ // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
+ // this.
+ union {
+ char elements[sizeof(T) * BLOCK_SIZE];
+ details::max_align_t dummy;
+ };
+ public:
+ Block* next;
+ std::atomic<size_t> elementsCompletelyDequeued;
+ std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
+ public:
+ std::atomic<std::uint32_t> freeListRefs;
+ std::atomic<Block*> freeListNext;
+ std::atomic<bool> shouldBeOnFreeList;
+ bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
+
+#ifdef MCDBGQ_TRACKMEM
+ void* owner;
+#endif
+ };
+ static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
+
+
+#ifdef MCDBGQ_TRACKMEM
+public:
+ struct MemStats;
+private:
+#endif
+
+ ///////////////////////////
+ // Producer base
+ ///////////////////////////
+
+ struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
+ {
+ ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
+ tailIndex(0),
+ headIndex(0),
+ dequeueOptimisticCount(0),
+ dequeueOvercommit(0),
+ tailBlock(nullptr),
+ isExplicit(isExplicit_),
+ parent(parent_)
+ {
+ }
+
+ virtual ~ProducerBase() { };
+
+ template<typename U>
+ inline bool dequeue(U& element)
+ {
+ if (isExplicit) {
+ return static_cast<ExplicitProducer*>(this)->dequeue(element);
+ }
+ else {
+ return static_cast<ImplicitProducer*>(this)->dequeue(element);
+ }
+ }
+
+ template<typename It>
+ inline size_t dequeue_bulk(It& itemFirst, size_t max)
+ {
+ if (isExplicit) {
+ return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
+ }
+ else {
+ return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
+ }
+ }
+
+ inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
+
+ inline size_t size_approx() const
+ {
+ auto tail = tailIndex.load(std::memory_order_relaxed);
+ auto head = headIndex.load(std::memory_order_relaxed);
+ return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
+ }
+
+ inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
+ protected:
+ std::atomic<index_t> tailIndex; // Where to enqueue to next
+ std::atomic<index_t> headIndex; // Where to dequeue from next
+
+ std::atomic<index_t> dequeueOptimisticCount;
+ std::atomic<index_t> dequeueOvercommit;
+
+ Block* tailBlock;
+
+ public:
+ bool isExplicit;
+ ConcurrentQueue* parent;
+
+ protected:
+#ifdef MCDBGQ_TRACKMEM
+ friend struct MemStats;
+#endif
+ };
+
+
+ ///////////////////////////
+ // Explicit queue
+ ///////////////////////////
+
+ struct ExplicitProducer : public ProducerBase
+ {
+ explicit ExplicitProducer(ConcurrentQueue* parent) :
+ ProducerBase(parent, true),
+ blockIndex(nullptr),
+ pr_blockIndexSlotsUsed(0),
+ pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
+ pr_blockIndexFront(0),
+ pr_blockIndexEntries(nullptr),
+ pr_blockIndexRaw(nullptr)
+ {
+ size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
+ if (poolBasedIndexSize > pr_blockIndexSize) {
+ pr_blockIndexSize = poolBasedIndexSize;
+ }
+
+ new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
+ }
+
+ ~ExplicitProducer()
+ {
+ // Destruct any elements not yet dequeued.
+ // Since we're in the destructor, we can assume all elements
+ // are either completely dequeued or completely not (no halfways).
+ if (this->tailBlock != nullptr) { // Note this means there must be a block index too
+ // First find the block that's partially dequeued, if any
+ Block* halfDequeuedBlock = nullptr;
+ if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
+ // The head's not on a block boundary, meaning a block somewhere is partially dequeued
+ // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
+ size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
+ while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
+ i = (i + 1) & (pr_blockIndexSize - 1);
+ }
+ assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
+ halfDequeuedBlock = pr_blockIndexEntries[i].block;
+ }
+
+ // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
+ auto block = this->tailBlock;
+ do {
+ block = block->next;
+ if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
+ continue;
+ }
+
+ size_t i = 0; // Offset into block
+ if (block == halfDequeuedBlock) {
+ i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
+ }
+
+ // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
+ auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
+ while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
+ (*block)[i++]->~T();
+ }
+ } while (block != this->tailBlock);
+ }
+
+ // Destroy all blocks that we own
+ if (this->tailBlock != nullptr) {
+ auto block = this->tailBlock;
+ do {
+ auto nextBlock = block->next;
+ if (block->dynamicallyAllocated) {
+ destroy(block);
+ }
+ else {
+ this->parent->add_block_to_free_list(block);
+ }
+ block = nextBlock;
+ } while (block != this->tailBlock);
+ }
+
+ // Destroy the block indices
+ auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
+ while (header != nullptr) {
+ auto prev = static_cast<BlockIndexHeader*>(header->prev);
+ header->~BlockIndexHeader();
+ (Traits::free)(header);
+ header = prev;
+ }
+ }
+
+ template<AllocationMode allocMode, typename U>
+ inline bool enqueue(U&& element)
+ {
+ index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
+ index_t newTailIndex = 1 + currentTailIndex;
+ if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
+ // We reached the end of a block, start a new one
+ auto startBlock = this->tailBlock;
+ auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
+ if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
+ // We can re-use the block ahead of us, it's empty!
+ this->tailBlock = this->tailBlock->next;
+ this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
+
+ // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
+ // last block from it first -- except instead of removing then adding, we can just overwrite).
+ // Note that there must be a valid block index here, since even if allocation failed in the ctor,
+ // it would have been re-attempted when adding the first block to the queue; since there is such
+ // a block, a block index must have been successfully allocated.
+ }
+ else {
+ // Whatever head value we see here is >= the last value we saw here (relatively),
+ // and <= its current value. Since we have the most recent tail, the head must be
+ // <= to it.
+ auto head = this->headIndex.load(std::memory_order_relaxed);
+ assert(!details::circular_less_than<index_t>(currentTailIndex, head));
+ if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
+ || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
+ // We can't enqueue in another block because there's not enough leeway -- the
+ // tail could surpass the head by the time the block fills up! (Or we'll exceed
+ // the size limit, if the second part of the condition was true.)
+ return false;
+ }
+ // We're going to need a new block; check that the block index has room
+ if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
+ // Hmm, the circular block index is already full -- we'll need
+ // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
+ // the initial allocation failed in the constructor.
+
+ if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
+ return false;
+ }
+ }
+
+ // Insert a new block in the circular linked list
+ auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
+ if (newBlock == nullptr) {
+ return false;
+ }
+#ifdef MCDBGQ_TRACKMEM
+ newBlock->owner = this;
+#endif
+ newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
+ if (this->tailBlock == nullptr) {
+ newBlock->next = newBlock;
+ }
+ else {
+ newBlock->next = this->tailBlock->next;
+ this->tailBlock->next = newBlock;
+ }
+ this->tailBlock = newBlock;
+ ++pr_blockIndexSlotsUsed;
+ }
+
+ if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
+ // The constructor may throw. We want the element not to appear in the queue in
+ // that case (without corrupting the queue):
+ MOODYCAMEL_TRY {
+ new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
+ }
+ MOODYCAMEL_CATCH (...) {
+ // Revert change to the current block, but leave the new block available
+ // for next time
+ pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
+ this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
+ MOODYCAMEL_RETHROW;
+ }
+ }
+ else {
+ (void)startBlock;
+ (void)originalBlockIndexSlotsUsed;
+ }
+
+ // Add block to block index
+ auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
+ entry.base = currentTailIndex;
+ entry.block = this->tailBlock;
+ blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
+ pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
+
+ if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
+ this->tailIndex.store(newTailIndex, std::memory_order_release);
+ return true;
+ }
+ }
+
+ // Enqueue
+ new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
+
+ this->tailIndex.store(newTailIndex, std::memory_order_release);
+ return true;
+ }
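+
+ // Worked example (illustrative numbers, stock default BLOCK_SIZE == 32):
+ // currentTailIndex & 31 is zero once every 32 enqueues, so the slow path
+ // above -- reusing the next block if it is empty, otherwise requisitioning a
+ // fresh one and registering it in the block index -- runs only on block
+ // boundaries; the other 31 enqueues are just a placement-new plus a release
+ // store of tailIndex.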
+
+ template<typename U>
+ bool dequeue(U& element)
+ {
+ auto tail = this->tailIndex.load(std::memory_order_relaxed);
+ auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
+ if (details::circular_less_than