From 5136e4e0797f3c3dfcae0cdb43c1b1d5d92617e3 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 13:23:29 +0300 Subject: [PATCH 01/15] clang-format: tune some options We always break line after template declarations - let's enable corresponding clang-format option. Also, we usually try to fit constructor initializer list on a single line - let's set corresponding option as well. --- .clang-format | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.clang-format b/.clang-format index c6304c770..49fbe83f3 100644 --- a/.clang-format +++ b/.clang-format @@ -8,9 +8,11 @@ UseTab: Always AccessModifierOffset: -8 AlignAfterOpenBracket: Align AlwaysBreakAfterReturnType: TopLevel +AlwaysBreakTemplateDeclarations: Yes ColumnLimit: 120 ContinuationIndentWidth: 8 Cpp11BracedListStyle: true SpaceInEmptyBlock: false +PackConstructorInitializers: CurrentLine PointerAlignment: Right ReferenceAlignment: Right From b53d6e153494bb538bbe56585eb7d7ce511c1504 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Tue, 8 Apr 2025 19:25:53 +0300 Subject: [PATCH 02/15] logger: make logger thread-safe Current logger implementation is not thread-safe, so the commit rewrites it. Firstly, `localtime` is not thread-safe since it uses a static buffer under the hood. We don't use its result anyway (we somewhy obtain current time but don't print it), so let's simply drop it. Secondly, despite the fact `std::cout` and `std::cerr` are thread-safe according to C++11 standard, some compilers don't stick to this contract so they shouldn't be actually used from multiple threads without synchronization. Let's simply use `write` instead - it is guaranteed to be thread-safe. Part of #110 --- src/Utils/Logger.hpp | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/Utils/Logger.hpp b/src/Utils/Logger.hpp index 4f82cc412..aac420cfc 100644 --- a/src/Utils/Logger.hpp +++ b/src/Utils/Logger.hpp @@ -31,8 +31,9 @@ */ #include +#include -#include +#include #include enum LogLevel { @@ -66,23 +67,32 @@ class Logger { Logger(LogLevel lvl) : m_LogLvl(lvl) {}; template - void log(std::ostream& strm, LogLevel log_lvl, - const char *file, int line, ARGS&& ...args) + void log(int fd, LogLevel log_lvl, const char *file, int line, ARGS &&...args) { if (!isLogPossible(log_lvl)) return; - time_t rawTime; - time(&rawTime); - struct tm *timeInfo = localtime(&rawTime); - char timeString[10]; - strftime(timeString, sizeof(timeString), "%H:%M:%S", timeInfo); - // The line below is commented for compatibility with previous - // version. I'm not sure it was bug or feature, but the time, - // filename and line was not printed. + /* File and line were never printed (by mistake, I guess). */ (void)file; (void)line; - //strm << timeString << " " << file << ':' << line << ' '; + /* + * Standard streams (`cout` and `cerr`) are thread-safe + * according to C++11 or more modern standards, but it turns + * out that some compilers do not stick to this contract. + * + * Let's use `stringstream` to build a string and then write + * it manually with `write` since it is guaranteed to be + * thread-safe. Yes, that's slower because of unnnecessary + * allocations and copies, but the log is used generally for + * debug or logging exceptional cases (errors) anyway, so + * that's not a big deal. + * + * Related: https://github.com/llvm/llvm-project/issues/51851 + */ + std::stringstream strm; strm << log_lvl << ": "; (strm << ... << std::forward(args)) << '\n'; + std::string str = strm.str(); + ssize_t rc = write(fd, std::data(str), std::size(str)); + (void)rc; } void setLogLevel(LogLevel lvl) { @@ -106,8 +116,8 @@ template void log(LogLevel level, const char *file, int line, ARGS&& ...args) { - gLogger.log(level == ERROR ? std::cerr : std::cout, - level, file, line, std::forward(args)...); + int fd = level == ERROR ? STDERR_FILENO : STDOUT_FILENO; + gLogger.log(fd, level, file, line, std::forward(args)...); } #define LOG_DEBUG(...) log(DEBUG, __FILE__, __LINE__, __VA_ARGS__) From 02409d8388dc1f8d7fdbd226366f10deeadefbe9 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 14:49:55 +0300 Subject: [PATCH 03/15] client: store sync for request encoders separately Each connection has an underying request encoder, and currently all encoders share the same `sync` field (it is marked as `static`). It breaks multithreaded scenarios because connections can be used concurrently from several threads. The commit makes the field non-static. Part of #110 --- src/Client/Connection.hpp | 22 +++++++++++----------- src/Client/RequestEncoder.hpp | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Client/Connection.hpp b/src/Client/Connection.hpp index d9d745f4f..fd31230dd 100644 --- a/src/Client/Connection.hpp +++ b/src/Client/Connection.hpp @@ -612,7 +612,7 @@ Connection::execute(const std::string& statement, const T& { impl->enc.encodeExecute(statement, parameters); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -622,7 +622,7 @@ Connection::execute(unsigned int stmt_id, const T& paramete { impl->enc.encodeExecute(stmt_id, parameters); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -631,7 +631,7 @@ Connection::prepare(const std::string& statement) { impl->enc.encodePrepare(statement); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -641,7 +641,7 @@ Connection::call(const std::string &func, const T &args) { impl->enc.encodeCall(func, args); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -650,7 +650,7 @@ Connection::ping() { impl->enc.encodePing(); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -660,7 +660,7 @@ Connection::insert(const T &tuple, uint32_t space_id) { impl->enc.encodeInsert(tuple, space_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -670,7 +670,7 @@ Connection::replace(const T &tuple, uint32_t space_id) { impl->enc.encodeReplace(tuple, space_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -681,7 +681,7 @@ Connection::delete_(const T &key, uint32_t space_id, { impl->enc.encodeDelete(key, space_id, index_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -692,7 +692,7 @@ Connection::update(const K &key, const T &tuple, { impl->enc.encodeUpdate(key, tuple, space_id, index_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -703,7 +703,7 @@ Connection::upsert(const T &tuple, const O &ops, { impl->enc.encodeUpsert(tuple, ops, space_id, index_base); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -716,7 +716,7 @@ Connection::select(const T &key, uint32_t space_id, impl->enc.encodeSelect(key, space_id, index_id, limit, offset, iterator); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template diff --git a/src/Client/RequestEncoder.hpp b/src/Client/RequestEncoder.hpp index 2752d3b18..b4862615f 100644 --- a/src/Client/RequestEncoder.hpp +++ b/src/Client/RequestEncoder.hpp @@ -97,12 +97,12 @@ class RequestEncoder { const Greeting &greet); /** Sync value is used as request id. */ - static size_t getSync() { return sync; } + size_t getSync() { return sync; } static constexpr size_t PREHEADER_SIZE = 5; private: void encodeHeader(int request); BUFFER &m_Buf; - inline static ssize_t sync = 0; + ssize_t sync = 0; }; template From 9b0811600369b2cd35cf6c6e37ec11676e23f0b6 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 14:56:32 +0300 Subject: [PATCH 04/15] client: make gc step non-static All connections has GC of input buffer, and step counter is static now. It breaks multithreaded scenarios since several connections can be use concurrently from several threads, so let's move the GC step counter right to `Connection` class. Part of #110 --- src/Client/Connection.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Client/Connection.hpp b/src/Client/Connection.hpp index fd31230dd..ef5cd35e3 100644 --- a/src/Client/Connection.hpp +++ b/src/Client/Connection.hpp @@ -250,6 +250,7 @@ class Connection private: ConnectionImpl *impl; static constexpr size_t GC_STEP_CNT = 100; + size_t gc_step = 0; template rid_t insert(const T &tuple, uint32_t space_id); @@ -521,8 +522,7 @@ template static void inputBufGC(Connection &conn) { - static int gc_step = 0; - if ((gc_step++ % Connection::GC_STEP_CNT) == 0) { + if ((conn.gc_step++ % Connection::GC_STEP_CNT) == 0) { LOG_DEBUG("Flushed input buffer of the connection %p", &conn); conn.impl->inBuf.flush(); } From a95e4c5a81904ce66bd9316642a6701707ab075b Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 14:58:51 +0300 Subject: [PATCH 05/15] LibevNetProvider: use different ev loops in different providers Currently, the provider uses default loop, which is static. Let's create a new loop for each provider to allow to use them in different threads. Part of #110 --- src/Client/LibevNetProvider.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Client/LibevNetProvider.hpp b/src/Client/LibevNetProvider.hpp index 97577c9d7..fa4a81280 100644 --- a/src/Client/LibevNetProvider.hpp +++ b/src/Client/LibevNetProvider.hpp @@ -270,7 +270,7 @@ LibevNetProvider::LibevNetProvider(Connector_t &connector, m_Connector(connector), m_Loop(loop), m_IsOwnLoop(false) { if (m_Loop == nullptr) { - m_Loop = ev_default_loop(0); + m_Loop = ev_loop_new(0); m_IsOwnLoop = true; } assert(m_Loop != nullptr); From fa759785f1abfa515f342c0cf7fa550c7ea20180 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 16:29:25 +0300 Subject: [PATCH 06/15] mempool: implement move semantics for MempoolInstance Mempool uses default implementations of move and copy semantics and it's not great - it owns resources, hence, trivial implementations are not appropriate. Let's implement convenient move constructor and operator. What's about copying ones, let's explicitly delete them - I can hardly imagine a case when one needs to copy an allocator instead of moving it. The move semantics in MempoolInstance is implemented as a simple swap so that the destructor of moved-from object will release resources that moved-to object owned before the move. All the newly introduced constructors and operators are explicitly marked as `noexcept` because thay actually are. Part of #110 --- src/Utils/Mempool.hpp | 36 +++++++++++++++++++++++++++++++++++ test/MempoolUnitTest.cpp | 41 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/src/Utils/Mempool.hpp b/src/Utils/Mempool.hpp index 76bd61723..0ef49773a 100644 --- a/src/Utils/Mempool.hpp +++ b/src/Utils/Mempool.hpp @@ -34,6 +34,7 @@ #include #include #include +#include namespace tnt { @@ -44,6 +45,18 @@ class MempoolStats { void statAddBlock() { ++m_BlockCount; } void statDelBlock() { --m_BlockCount; } public: + MempoolStats() = default; + MempoolStats(MempoolStats &&other) noexcept + { + /* Call move assignment operator. */ + *this = std::forward(other); + } + MempoolStats &operator=(MempoolStats &&other) noexcept + { + std::swap(m_SlabCount, other.m_SlabCount); + std::swap(m_BlockCount, other.m_BlockCount); + } + /** Count of allocated (used) blocks. */ size_t statBlockCount() const { return m_BlockCount; } /** Count of allocated (total) slabs. */ @@ -60,6 +73,10 @@ class MempoolStats { void statAddBlock() { } void statDelBlock() { } public: + MempoolStats() = default; + MempoolStats(MempoolStats &&other) noexcept = default; + MempoolStats &operator=(MempoolStats &&other) noexcept = default; + /** Disabled. return SIZE_MAX. */ size_t statBlockCount() const { return SIZE_MAX; } /** Disabled. return SIZE_MAX. */ @@ -115,6 +132,25 @@ class MempoolInstance : public MempoolStats { static constexpr size_t SLAB_ALIGN = SA; MempoolInstance() = default; + MempoolInstance(const MempoolInstance &other) = delete; + MempoolInstance &operator=(const MempoolInstance &other) = delete; + MempoolInstance(MempoolInstance &&other) noexcept + { + /* Call move assignment operator. */ + *this = std::forward(other); + } + MempoolInstance &operator=(MempoolInstance &&other) noexcept + { + if (this == &other) + return *this; + /* Move base class. */ + MempoolStats::operator=(std::forward(other)); + std::swap(m_SlabList, other.m_SlabList); + std::swap(m_FreeList, other.m_FreeList); + std::swap(m_SlabDataBeg, other.m_SlabDataBeg); + std::swap(m_SlabDataEnd, other.m_SlabDataEnd); + return *this; + } ~MempoolInstance() noexcept { while (m_SlabList != nullptr) { diff --git a/test/MempoolUnitTest.cpp b/test/MempoolUnitTest.cpp index 95a73ec90..cbb6e5d29 100644 --- a/test/MempoolUnitTest.cpp +++ b/test/MempoolUnitTest.cpp @@ -32,6 +32,7 @@ #include "../src/Utils/Mempool.hpp" #include "Utils/Helpers.hpp" #include +#include template struct Allocation { @@ -247,6 +248,40 @@ test_alignment() } } +template +void +test_move() +{ + TEST_INIT(2, S, 0); + constexpr size_t N = 1024; + tnt::MempoolInstance mp; + + Allocations all; + for (size_t i = 0; i < N; i++) + all.add(mp.allocate()); + fail_unless(all.are_valid()); + + tnt::MempoolInstance mp_move_constructed(std::move(mp)); + for (size_t i = 0; i < N; i++) { + all.add(mp_move_constructed.allocate()); + all.add(mp.allocate()); + } + fail_unless(all.are_valid()); + + tnt::MempoolInstance mp_move_copied; + /* Allocate some memory and let ASAN check if it won't be leaked. */ + for (size_t i = 0; i < N; i++) { + char *ptr = mp_move_copied.allocate(); + (void)ptr; + } + mp_move_copied = std::move(mp); + for (size_t i = 0; i < N; i++) { + all.add(mp_move_copied.allocate()); + all.add(mp.allocate()); + } + fail_unless(all.are_valid()); +} + int main() { test_default<8>(); @@ -275,4 +310,10 @@ int main() test_alignment<120, 2>(); test_alignment<120, 13>(); test_alignment<120, 64>(); + + test_move<8>(); + test_move<64>(); + test_move<14>(); + test_move<72>(); + test_move<80>(); } From 60700f536baeab6cd2febb2ba4919e8ce520f26a Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Tue, 15 Apr 2025 16:05:38 +0300 Subject: [PATCH 07/15] buffer: consume allocator in constructor Allocator can be not copyable, so it's better to consume it in constructor. It is needed because we are going to use non-copyable allocator for `Buffer` by default. Part of #110 --- src/Buffer/Buffer.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Buffer/Buffer.hpp b/src/Buffer/Buffer.hpp index 68f405578..6acce4072 100644 --- a/src/Buffer/Buffer.hpp +++ b/src/Buffer/Buffer.hpp @@ -37,6 +37,7 @@ #include #include #include +#include #include "../Utils/Mempool.hpp" #include "../Utils/List.hpp" @@ -265,7 +266,7 @@ class Buffer /** =============== Buffer definition =============== */ /** Copy of any kind is disabled. Move is allowed. */ - Buffer(const allocator& all = allocator()); + Buffer(allocator &&all = allocator()); Buffer(const Buffer& buf) = delete; Buffer& operator = (const Buffer& buf) = delete; Buffer(Buffer &&buf) noexcept = default; @@ -648,7 +649,7 @@ Buffer::iterator_common::moveBackward(size_t step) } template -Buffer::Buffer(const allocator &all) : m_all(all) +Buffer::Buffer(allocator &&all) : m_all(std::forward(all)) { static_assert((N & (N - 1)) == 0, "N must be power of 2"); static_assert(allocator::REAL_SIZE % alignof(Block) == 0, From 494712e72787a0add31a8d0ddef40aafac412301 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Thu, 10 Apr 2025 11:35:54 +0300 Subject: [PATCH 08/15] buffer: implement move semantics properly Default implementations of move semantics are not suitable because buffer owns resources - let's implement proper ones. Note that the move cannot be implemented as a swap here because allocator, that is stored in the buffer, shouldn't be used after it is moved. Part of #110 --- src/Buffer/Buffer.hpp | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/src/Buffer/Buffer.hpp b/src/Buffer/Buffer.hpp index 6acce4072..6fb8caf55 100644 --- a/src/Buffer/Buffer.hpp +++ b/src/Buffer/Buffer.hpp @@ -116,6 +116,12 @@ class Buffer * to the next byte after the last valid byte in block. * */ bool isEndOfBlock(const char *ptr); + /** Delete blocks and release occupied memory. */ + void releaseBlocks(void) + { + while (!m_blocks.isEmpty()) + delBlock(&m_blocks.first()); + } public: /** =============== Convenient wrappers =============== */ @@ -269,8 +275,32 @@ class Buffer Buffer(allocator &&all = allocator()); Buffer(const Buffer& buf) = delete; Buffer& operator = (const Buffer& buf) = delete; - Buffer(Buffer &&buf) noexcept = default; - Buffer &operator=(Buffer &&buf) noexcept = default; + Buffer(Buffer &&other) noexcept + { + /* Call move assignment operator. */ + *this = std::forward(other); + } + Buffer &operator=(Buffer &&other) noexcept + { + if (this == &other) + return *this; + + m_blocks = std::move(other.m_blocks); + /* + * Release blocks of `other` right on the move because + * we are going to move its allocator as well and we + * must not use it after it is moved. + */ + other.releaseBlocks(); + m_all = std::move(other.m_all); + + m_iterators = std::move(other.m_iterators); + m_begin = other.m_begin; + other.m_begin = nullptr; + m_end = other.m_end; + other.m_end = nullptr; + return *this; + } ~Buffer() noexcept; /** @@ -667,9 +697,7 @@ Buffer::Buffer(allocator &&all) : m_all(std::forward(al template Buffer::~Buffer() noexcept { - /* Delete blocks and release occupied memory. */ - while (!m_blocks.isEmpty()) - delBlock(&m_blocks.first()); + releaseBlocks(); } template From 31943ba4b98f837d8fe8108fe09dfc670604b59e Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 15:53:29 +0300 Subject: [PATCH 09/15] buffer: use MempoolInstance as default allocator Currently we use MempoolHolder as default allocator - it makes all buffers use the same MempoolInstance. This buffer cannot be used in Multi-Threaded scenarios, so let's simply use `MempoolInstance` instead - now each connection will use its own allocator. To reduce memory footprint of each connection, let's set default parameter `M = slab_size / block_size` to 16 to use 256KB slabs by default. Along the way, move a brace to the same line with `Buffer` class name to make it conform to our clang format. Part of #110 --- src/Buffer/Buffer.hpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Buffer/Buffer.hpp b/src/Buffer/Buffer.hpp index 6fb8caf55..cf73dd9af 100644 --- a/src/Buffer/Buffer.hpp +++ b/src/Buffer/Buffer.hpp @@ -59,9 +59,8 @@ namespace tnt { * REAL_SIZE - constant determines real size of allocated chunk (excluding * overhead taken by allocator). */ -template > -class Buffer -{ +template > +class Buffer { private: /** =============== Block definition =============== */ /** Blocks are organized into linked list. */ From 0f0f3894ba631f94ff34180595594da3343ea338 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 14:47:03 +0300 Subject: [PATCH 10/15] test: add a test for multithreaded client usage The previous commits adapted Tntcxx for multithreaded execution - this adds a test for the scenario. Part of #110 --- CMakeLists.txt | 7 ++ test/ClientMultithreadTest.cpp | 187 +++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 test/ClientMultithreadTest.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9df8e0571..ab727c1d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,6 +86,8 @@ IF (TNTCXX_BUILD_TESTING) # Retrieve the source directory to later get the header path. FETCHCONTENT_GETPROPERTIES(msgpuck) FETCHCONTENT_MAKEAVAILABLE(msgpuck) + + find_package(Threads REQUIRED) ENDIF() OPTION(TNTCXX_ENABLE_SANITIZERS @@ -209,6 +211,11 @@ TNTCXX_TEST(NAME Client.test TYPE ctest LIBRARIES ${COMMON_LIB} ) +TNTCXX_TEST(NAME ClientMultithread.test TYPE ctest + SOURCES src/Client/Connector.hpp test/ClientMultithreadTest.cpp + LIBRARIES ${COMMON_LIB} Threads::Threads +) + IF (TNTCXX_ENABLE_SSL) TNTCXX_TEST(NAME ClientSSL.test TYPE ctest SOURCES src/Client/Connector.hpp test/ClientTest.cpp diff --git a/test/ClientMultithreadTest.cpp b/test/ClientMultithreadTest.cpp new file mode 100644 index 000000000..5c4c068d3 --- /dev/null +++ b/test/ClientMultithreadTest.cpp @@ -0,0 +1,187 @@ +/* + * SPDX-License-Identifier: BSD-2-Clause + * + * Copyright 2010-2025, Tarantool AUTHORS, please see AUTHORS file. + */ +#include "Utils/Helpers.hpp" +#include "Utils/System.hpp" +#include "Utils/UserTuple.hpp" + +#include "Client/Connector.hpp" +#include "Client/LibevNetProvider.hpp" + +#include +#include +#include + +const char *localhost = "127.0.0.1"; +int port = 3301; +int dummy_server_port = 3302; +const char *unixsocket = "./tnt.sock"; +int WAIT_TIMEOUT = 1000; // milliseconds + +using Buf_t = tnt::Buffer<16 * 1024>; + +#ifdef TNTCXX_ENABLE_SSL +constexpr bool enable_ssl = true; +constexpr StreamTransport transport = STREAM_SSL; +#else +constexpr bool enable_ssl = false; +constexpr StreamTransport transport = STREAM_PLAIN; +#endif + +#ifdef __linux__ +using NetProvider = EpollNetProvider; +#else +using NetProvider = LibevNetProvider; +#endif + +template +static int +test_connect(Connector &client, Connection &conn, const std::string &addr, unsigned port, const std::string user = {}, + const std::string passwd = {}) +{ + std::string service = port == 0 ? std::string {} : std::to_string(port); + return client.connect(conn, + { + .address = addr, + .service = service, + .transport = transport, + .user = user, + .passwd = passwd, + }); +} + +class PingRequestProcessor { +public: + static rid_t sendRequest(Connection &conn, size_t thread_id, size_t iter) + { + (void)thread_id; + (void)iter; + rid_t f = conn.ping(); + fail_unless(!conn.futureIsReady(f)); + return f; + } + + static void processResponse(std::optional> &response, size_t thread_id, size_t iter) + { + (void)thread_id; + (void)iter; + fail_unless(response != std::nullopt); + fail_unless(response->header.code == 0); + } +}; + +class ReplaceRequestProcessor { +public: + static rid_t sendRequest(Connection &conn, size_t thread_id, size_t iter) + { + const size_t space_id = 512; + std::tuple data = std::make_tuple(iter, "a", double(iter * thread_id)); + rid_t f = conn.space[space_id].replace(data); + fail_unless(!conn.futureIsReady(f)); + return f; + } + + static void processResponse(std::optional> &response, size_t thread_id, size_t iter) + { + fail_unless(response != std::nullopt); + fail_unless(response->header.code == 0); + + fail_unless(response != std::nullopt); + fail_unless(response->body.data != std::nullopt); + fail_unless(response->body.error_stack == std::nullopt); + + std::vector> response_data; + fail_unless(response->body.data->decode(response_data)); + fail_unless(response_data.size() == 1); + fail_unless(std::get<0>(response_data[0]) == iter); + fail_unless(std::get<1>(response_data[0]) == std::string("a")); + fail_unless(std::fabs(std::get<2>(response_data[0]) - iter * thread_id) + <= std::numeric_limits::epsilon()); + } +}; + +template +static void +multithread_test(void) +{ + TEST_INIT(0); + static constexpr int ITER_NUM = 1000; + static constexpr int THREAD_NUM = 24; + std::vector threads; + threads.reserve(THREAD_NUM); + for (int t = 0; t < THREAD_NUM; t++) { + threads.emplace_back([]() { + Connector client; + std::vector> conns; + for (size_t i = 0; i < ConnPerThread; i++) + conns.emplace_back(client); + for (auto &conn : conns) { + int rc = test_connect(client, conn, localhost, port); + fail_unless(rc == 0); + } + + for (int iter = 0; iter < ITER_NUM; iter++) { + std::array fs; + + for (size_t t = 0; t < ConnPerThread; t++) + fs[t] = RequestProcessor::sendRequest(conns[t], t, iter); + + for (size_t t = 0; t < ConnPerThread; t++) { + client.wait(conns[t], fs[t], WAIT_TIMEOUT); + fail_unless(conns[t].futureIsReady(fs[t])); + std::optional> response = conns[t].getResponse(fs[t]); + RequestProcessor::processResponse(response, t, iter); + } + } + }); + } + for (auto &thread : threads) + thread.join(); +} + +int +main() +{ + /* + * Send STDOUT to /dev/null - otherwise, there will be a ton of debug + * logs and it will be impossible to inspect them on failure. + * + * Note that one can disable debug logs by setting logging level of + * `Logger`. But that's not the case here because we want to cover + * the logger with multi-threading test as well to make sure that + * it is thread-safe, so we shouldn't disable them, but we don't want + * to read them either. + */ + if (freopen("/dev/null", "w", stdout) == NULL) { + std::cerr << "Cannot send STDOUT to /dev/null" << std::endl; + abort(); + } +#ifdef TNTCXX_ENABLE_SSL +#ifndef __FreeBSD__ + // There's no way to disable SIGPIPE for SSL on non-FreeBSD platforms, + // so it is needed to disable signal handling. + signal(SIGPIPE, SIG_IGN); +#endif +#endif + + if (cleanDir() != 0) + return -1; + +#ifdef TNTCXX_ENABLE_SSL + if (genSSLCert() != 0) + return -1; +#endif + + if (launchTarantool(enable_ssl) != 0) + return -1; + + sleep(1); + + multithread_test(); + multithread_test(); + multithread_test(); + multithread_test(); + return 0; +} From 60e6ecf08f915c699d5a1afdeedc2d4ea05ae62d Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Tue, 8 Apr 2025 10:59:58 +0300 Subject: [PATCH 11/15] build/ci: add thread sanitizer workflow We have a cmake option to build Tntcxx with sanitizers. Currently, only memory and UB sanitizers are used - let's populate the list with thread sanitizer since we have a test for a multithreaded scenario and use it on CI. Since address and thread sanitizers cannot be used together, existing cmake option `TNTCXX_ENABLE_SANITIZERS` is extended - now it accepts "address" and "thread" arguments to choose a sanitizer. Part of #110 --- .github/actions/build-tntcxx/action.yml | 2 +- .github/workflows/reusable-testing.yml | 4 ++-- .github/workflows/testing.yml | 7 +++++-- CMakeLists.txt | 12 ++++++++++-- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/.github/actions/build-tntcxx/action.yml b/.github/actions/build-tntcxx/action.yml index dba8f8cef..61f8b6d6a 100644 --- a/.github/actions/build-tntcxx/action.yml +++ b/.github/actions/build-tntcxx/action.yml @@ -6,7 +6,7 @@ inputs: required: true enable-sanitizers: description: 'Corresponds to TNTCXX_ENABLE_SANITIZERS option of CMake' - default: 'false' + default: '' cxx-standard: description: 'Corresponds to CMAKE_CXX_STANDARD option of CMake' cxx-compiler: diff --git a/.github/workflows/reusable-testing.yml b/.github/workflows/reusable-testing.yml index fd7e5db2d..118644df3 100644 --- a/.github/workflows/reusable-testing.yml +++ b/.github/workflows/reusable-testing.yml @@ -22,8 +22,8 @@ on: default: false type: boolean enable-sanitizers: - default: false - type: boolean + default: '' + type: string build-only: default: false type: boolean diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index fbbe11654..35c77eb11 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -100,10 +100,13 @@ jobs: exclude: - runs-on: macos-14 compiler: {c: gcc, cxx: g++} - name: sanitizers (${{ matrix.runs-on }}, ${{ matrix.build-type }}, ${{ matrix.compiler.c }}) + sanitizer: + - thread + - address + name: sanitizers (${{ matrix.sanitizer }}, ${{ matrix.runs-on }}, ${{ matrix.build-type }}, ${{ matrix.compiler.c }}) with: runs-on: ${{ matrix.runs-on }} build-type: ${{ matrix.build-type }} c-compiler: ${{ matrix.compiler.c }} cxx-compiler: ${{ matrix.compiler.cxx }} - enable-sanitizers: true + enable-sanitizers: ${{ matrix.sanitizer }} diff --git a/CMakeLists.txt b/CMakeLists.txt index ab727c1d5..053086d7f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -91,10 +91,12 @@ IF (TNTCXX_BUILD_TESTING) ENDIF() OPTION(TNTCXX_ENABLE_SANITIZERS - "If ON, tntcxx will be instrumented with sanitizers." + "If passed, tntcxx will be instrumented with sanitizers. Possible values: \"address\", \"thread\"" OFF) -IF (TNTCXX_ENABLE_SANITIZERS) +IF (NOT TNTCXX_ENABLE_SANITIZERS) + # Sanitizers are disabled - do nothing. +ELSEIF (TNTCXX_ENABLE_SANITIZERS STREQUAL "address") SET(SANITIZER_FLAGS "-fsanitize=address") # FIXME(gh-62) IF (NOT CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang") @@ -114,6 +116,12 @@ IF (TNTCXX_ENABLE_SANITIZERS) # for details). ADD_COMPILE_OPTIONS(${SANITIZER_FLAGS}) ADD_LINK_OPTIONS(${SANITIZER_FLAGS}) +ELSEIF (TNTCXX_ENABLE_SANITIZERS STREQUAL "thread") + SET(SANITIZER_FLAGS "-fsanitize=thread") + ADD_COMPILE_OPTIONS(${SANITIZER_FLAGS}) + ADD_LINK_OPTIONS(${SANITIZER_FLAGS}) +ELSE () + MESSAGE(FATAL_ERROR "Unknown TNTCXX_ENABLE_SANITIZERS value") ENDIF() # Common function for building tests. From dc539480ff41f4e62cffb10ba025c970e235cc24 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Tue, 15 Apr 2025 18:01:43 +0300 Subject: [PATCH 12/15] readme: remove trailing whitespaces Trailing whitespaces are never appreciated, so let's remove them all. --- README.md | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 14ce5bc22..0c9e61f8b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # tntcxx — Tarantool C++ Connector -This repository contains the tntcxx Tarantool C++ connector code. tntcxx is an -open-source Tarantool C++ connector (compliant to C++17) designed with high +This repository contains the tntcxx Tarantool C++ connector code. tntcxx is an +open-source Tarantool C++ connector (compliant to C++17) designed with high efficiency in mind. ## Building tntcxx @@ -25,21 +25,21 @@ subdirectory of your project or as an embedded dependency. 1. Make tntcxx's source code available to the main build. This can be done a few different ways: - * Download the tntcxx source code manually and place it at a known location. + * Download the tntcxx source code manually and place it at a known location. This is the least flexible approach and can make it more difficult to use with continuous integration systems, etc. * Embed the tntcxx source code as a direct copy in the main project's source - tree. This is often the simplest approach, but is also the hardest to keep + tree. This is often the simplest approach, but is also the hardest to keep up to date. Some organizations may not permit this method. * Add tntcxx as a [git submodule](https://git-scm.com/docs/git-submodule) or equivalent. This may not always be possible or appropriate. Git submodules, for example, have their own set of advantages and drawbacks. * Use the CMake [`FetchContent`](https://cmake.org/cmake/help/latest/module/FetchContent.html) - commands to download tntcxx as part of the build's configure step. This + commands to download tntcxx as part of the build's configure step. This approach doesn't have the limitations of the other methods. -The last of the above methods is implemented with a small piece of CMake code -that downloads and pulls the tntcxx code into the main build. Just add the +The last of the above methods is implemented with a small piece of CMake code +that downloads and pulls the tntcxx code into the main build. Just add the following snippet to your CMakeLists.txt: ```cmake include(FetchContent) @@ -50,7 +50,7 @@ FetchContent_Declare( FetchContent_MakeAvailable(tntcxx) ``` -After obtaining tntcxx sources using the rest of the methods, you can use the +After obtaining tntcxx sources using the rest of the methods, you can use the following CMake command to incorporate tntcxx into your CMake project: ```cmake add_subdirectory(${TNTCXX_SOURCE_DIR}) @@ -64,8 +64,8 @@ target_link_libraries(example tntcxx::tntcxx) ##### Running tntcxx Tests with CMake -Use the `-DTNTCXX_BUILD_TESTING=ON` option to run the tntcxx tests. This option -is enabled by default if the tntcxx project is determined to be the top level +Use the `-DTNTCXX_BUILD_TESTING=ON` option to run the tntcxx tests. This option +is enabled by default if the tntcxx project is determined to be the top level project. Note that `BUILD_TESTING` must also be on (the default). For example, to run the tntcxx tests, you could use this script: @@ -80,8 +80,8 @@ ctest ### CMake Option Synopsis -- `-DTNTCXX_BUILD_TESTING=ON` must be set to enable testing. This option is -enabled by default if the tntcxx project is determined to be the top level +- `-DTNTCXX_BUILD_TESTING=ON` must be set to enable testing. This option is +enabled by default if the tntcxx project is determined to be the top level project. ## Internals @@ -139,7 +139,7 @@ Connection conn(client); Now assume Tarantool instance is listening `3301` port on localhost. To connect to the server we should invoke `Connector::connect()` method of client object and -pass three arguments: connection instance, address and port. +pass three arguments: connection instance, address and port. ```c++ int rc = client.connect(conn, address, port); ``` @@ -165,7 +165,7 @@ To execute simplest request (i.e. ping), one can invoke corresponding method of connection object: ```c++ rid_t ping = conn.ping(); -``` +``` Each request method returns request id, which is sort of future. It can be used to get the result of request execution once it is ready (i.e. response). Requests are queued in the input buffer of connection until `Connector::wait()` is called. @@ -204,7 +204,7 @@ msgpacks. See section below to understand how to decode tuples. Now let's consider a bit more sophisticated requests. Assume we have space with `id = 512` and following format on the server: -`CREATE TABLE t(id INT PRIMARY KEY, a TEXT, b DOUBLE);` +`CREATE TABLE t(id INT PRIMARY KEY, a TEXT, b DOUBLE);` Preparing analogue of `t:replace(1, "111", 1.01);` request can be done this way: ```c++ From 534f69b8b9b58400f2eba69125fae9f63783255d Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 15:05:27 +0300 Subject: [PATCH 13/15] readme: start words with capital letters in headers Just prettifying the readme. --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0c9e61f8b..cf477ba18 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,7 @@ as template parameter of buffer. Connector can be embedded in any C++ application with including main header: `#include "/src/Client/Connector.hpp"` -### Objects instantiation +### Objects Instantiation To create client one should specify buffer's and network provider's implementations as template parameters. Connector's main class has the following signature: @@ -144,7 +144,7 @@ pass three arguments: connection instance, address and port. int rc = client.connect(conn, address, port); ``` -### Error handling +### Error Handling Implementation of connector is exception free, so we rely on return codes: in case of fail, `connect()` will return `rc < 0`. @@ -159,7 +159,7 @@ if (rc != 0) { To reset connection after errors (clean up error message and connection status), one can use `Connection::reset()`. -### Preparing requests +### Preparing Requests To execute simplest request (i.e. ping), one can invoke corresponding method of connection object: @@ -170,7 +170,7 @@ Each request method returns request id, which is sort of future. It can be used to get the result of request execution once it is ready (i.e. response). Requests are queued in the input buffer of connection until `Connector::wait()` is called. -### Sending requests +### Sending Requests That said, to send requests to the server side, we should invoke `client.wait()`: ```c++ @@ -182,7 +182,7 @@ request is ready, `wait()` terminates. It also provides negative return code in case of system related fails (e.g. broken or time outed connection). If `wait()` returns 0, then response is received and expected to be parsed. -### Receiving responses +### Receiving Responses To get the response when it is ready, we can use `Connection::getResponse()`. It takes request id and returns optional object containing response (`nullptr` @@ -200,7 +200,7 @@ either runtime error(s) (accessible by `response.body.error_stack`) or data tuples are not decoded and come in form of pointers to the start and end of msgpacks. See section below to understand how to decode tuples. -### Data manipulation +### Data Manipulation Now let's consider a bit more sophisticated requests. Assume we have space with `id = 512` and following format on the server: @@ -218,7 +218,7 @@ auto i = conn.space[512].index[1]; rid_t select = i.select(std::make_tuple(1), 1, 0 /*offset*/, IteratorType::EQ); ``` -### Data readers +### Data Readers Responses from server contain raw data (i.e. encoded into MsgPack tuples). Let's define structure describing data stored in space `t`: From 9d87d4734251d5b9ad43f47c0ec7397973eb1b98 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Wed, 9 Apr 2025 15:37:57 +0300 Subject: [PATCH 14/15] readme: add a section about multi-threading model The section describes how the connector should be used from several threads. Closes #110 Co-authored-by: Georgiy Lebedev --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index cf477ba18..d7cb62c05 100644 --- a/README.md +++ b/README.md @@ -263,3 +263,9 @@ an array of tuples as value in response to `select`. So, in order to successfully decode them, we should pass an array of tuples to decoder - that's why `std::vector` is needed. If decoding was successful, `results` will contain all decoded `UserTuples`. + +## Multi-Threading Model + +A `Connector` object and all its instances of `Connection` objects must be used in a single thread. For multi-threaded usage, create one or several `Connector` instances for each thread. Each `Connection` object must be used only with the `Connector` object that it was created from. + +If custom `Buffer` or `NetProvider` implementations are used for `Connector` objects, the custom implementations must not share any state (e.g., `static` fields). From bc0984a07fbd7213ea279975449f96f81ea82305 Mon Sep 17 00:00:00 2001 From: Andrey Saranchin Date: Thu, 17 Apr 2025 10:21:18 +0300 Subject: [PATCH 15/15] ci: use the newest version of setup-tarantool We are using the first version of setup-tarantool action and it causes some problems - sometimes it just fails because the NodeJS version installed on the runner is too new for this old action. Let's simply use the third (the newest) version of this action - it behaves in the same way, but it is more stable. --- .github/actions/setup-tarantool/action.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/actions/setup-tarantool/action.yml b/.github/actions/setup-tarantool/action.yml index 63487dafb..3fe10ad3c 100644 --- a/.github/actions/setup-tarantool/action.yml +++ b/.github/actions/setup-tarantool/action.yml @@ -10,7 +10,7 @@ runs: steps: - name: Setup Tarantool 2.11 if: startsWith(inputs.runs-on, 'ubuntu') - uses: tarantool/setup-tarantool@v1 + uses: tarantool/setup-tarantool@v3 with: tarantool-version: '2.11'