diff --git a/.clang-format b/.clang-format index c6304c770..49fbe83f3 100644 --- a/.clang-format +++ b/.clang-format @@ -8,9 +8,11 @@ UseTab: Always AccessModifierOffset: -8 AlignAfterOpenBracket: Align AlwaysBreakAfterReturnType: TopLevel +AlwaysBreakTemplateDeclarations: Yes ColumnLimit: 120 ContinuationIndentWidth: 8 Cpp11BracedListStyle: true SpaceInEmptyBlock: false +PackConstructorInitializers: CurrentLine PointerAlignment: Right ReferenceAlignment: Right diff --git a/.github/actions/build-tntcxx/action.yml b/.github/actions/build-tntcxx/action.yml index dba8f8cef..61f8b6d6a 100644 --- a/.github/actions/build-tntcxx/action.yml +++ b/.github/actions/build-tntcxx/action.yml @@ -6,7 +6,7 @@ inputs: required: true enable-sanitizers: description: 'Corresponds to TNTCXX_ENABLE_SANITIZERS option of CMake' - default: 'false' + default: '' cxx-standard: description: 'Corresponds to CMAKE_CXX_STANDARD option of CMake' cxx-compiler: diff --git a/.github/actions/setup-tarantool/action.yml b/.github/actions/setup-tarantool/action.yml index 63487dafb..3fe10ad3c 100644 --- a/.github/actions/setup-tarantool/action.yml +++ b/.github/actions/setup-tarantool/action.yml @@ -10,7 +10,7 @@ runs: steps: - name: Setup Tarantool 2.11 if: startsWith(inputs.runs-on, 'ubuntu') - uses: tarantool/setup-tarantool@v1 + uses: tarantool/setup-tarantool@v3 with: tarantool-version: '2.11' diff --git a/.github/workflows/reusable-testing.yml b/.github/workflows/reusable-testing.yml index fd7e5db2d..118644df3 100644 --- a/.github/workflows/reusable-testing.yml +++ b/.github/workflows/reusable-testing.yml @@ -22,8 +22,8 @@ on: default: false type: boolean enable-sanitizers: - default: false - type: boolean + default: '' + type: string build-only: default: false type: boolean diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index fbbe11654..35c77eb11 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -100,10 +100,13 @@ jobs: exclude: - runs-on: macos-14 compiler: {c: gcc, cxx: g++} - name: sanitizers (${{ matrix.runs-on }}, ${{ matrix.build-type }}, ${{ matrix.compiler.c }}) + sanitizer: + - thread + - address + name: sanitizers (${{ matrix.sanitizer }}, ${{ matrix.runs-on }}, ${{ matrix.build-type }}, ${{ matrix.compiler.c }}) with: runs-on: ${{ matrix.runs-on }} build-type: ${{ matrix.build-type }} c-compiler: ${{ matrix.compiler.c }} cxx-compiler: ${{ matrix.compiler.cxx }} - enable-sanitizers: true + enable-sanitizers: ${{ matrix.sanitizer }} diff --git a/CMakeLists.txt b/CMakeLists.txt index 9df8e0571..053086d7f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,13 +86,17 @@ IF (TNTCXX_BUILD_TESTING) # Retrieve the source directory to later get the header path. FETCHCONTENT_GETPROPERTIES(msgpuck) FETCHCONTENT_MAKEAVAILABLE(msgpuck) + + find_package(Threads REQUIRED) ENDIF() OPTION(TNTCXX_ENABLE_SANITIZERS - "If ON, tntcxx will be instrumented with sanitizers." + "If passed, tntcxx will be instrumented with sanitizers. Possible values: \"address\", \"thread\"" OFF) -IF (TNTCXX_ENABLE_SANITIZERS) +IF (NOT TNTCXX_ENABLE_SANITIZERS) + # Sanitizers are disabled - do nothing. +ELSEIF (TNTCXX_ENABLE_SANITIZERS STREQUAL "address") SET(SANITIZER_FLAGS "-fsanitize=address") # FIXME(gh-62) IF (NOT CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang") @@ -112,6 +116,12 @@ IF (TNTCXX_ENABLE_SANITIZERS) # for details). ADD_COMPILE_OPTIONS(${SANITIZER_FLAGS}) ADD_LINK_OPTIONS(${SANITIZER_FLAGS}) +ELSEIF (TNTCXX_ENABLE_SANITIZERS STREQUAL "thread") + SET(SANITIZER_FLAGS "-fsanitize=thread") + ADD_COMPILE_OPTIONS(${SANITIZER_FLAGS}) + ADD_LINK_OPTIONS(${SANITIZER_FLAGS}) +ELSE () + MESSAGE(FATAL_ERROR "Unknown TNTCXX_ENABLE_SANITIZERS value") ENDIF() # Common function for building tests. @@ -209,6 +219,11 @@ TNTCXX_TEST(NAME Client.test TYPE ctest LIBRARIES ${COMMON_LIB} ) +TNTCXX_TEST(NAME ClientMultithread.test TYPE ctest + SOURCES src/Client/Connector.hpp test/ClientMultithreadTest.cpp + LIBRARIES ${COMMON_LIB} Threads::Threads +) + IF (TNTCXX_ENABLE_SSL) TNTCXX_TEST(NAME ClientSSL.test TYPE ctest SOURCES src/Client/Connector.hpp test/ClientTest.cpp diff --git a/README.md b/README.md index 14ce5bc22..d7cb62c05 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # tntcxx — Tarantool C++ Connector -This repository contains the tntcxx Tarantool C++ connector code. tntcxx is an -open-source Tarantool C++ connector (compliant to C++17) designed with high +This repository contains the tntcxx Tarantool C++ connector code. tntcxx is an +open-source Tarantool C++ connector (compliant to C++17) designed with high efficiency in mind. ## Building tntcxx @@ -25,21 +25,21 @@ subdirectory of your project or as an embedded dependency. 1. Make tntcxx's source code available to the main build. This can be done a few different ways: - * Download the tntcxx source code manually and place it at a known location. + * Download the tntcxx source code manually and place it at a known location. This is the least flexible approach and can make it more difficult to use with continuous integration systems, etc. * Embed the tntcxx source code as a direct copy in the main project's source - tree. This is often the simplest approach, but is also the hardest to keep + tree. This is often the simplest approach, but is also the hardest to keep up to date. Some organizations may not permit this method. * Add tntcxx as a [git submodule](https://git-scm.com/docs/git-submodule) or equivalent. This may not always be possible or appropriate. Git submodules, for example, have their own set of advantages and drawbacks. * Use the CMake [`FetchContent`](https://cmake.org/cmake/help/latest/module/FetchContent.html) - commands to download tntcxx as part of the build's configure step. This + commands to download tntcxx as part of the build's configure step. This approach doesn't have the limitations of the other methods. -The last of the above methods is implemented with a small piece of CMake code -that downloads and pulls the tntcxx code into the main build. Just add the +The last of the above methods is implemented with a small piece of CMake code +that downloads and pulls the tntcxx code into the main build. Just add the following snippet to your CMakeLists.txt: ```cmake include(FetchContent) @@ -50,7 +50,7 @@ FetchContent_Declare( FetchContent_MakeAvailable(tntcxx) ``` -After obtaining tntcxx sources using the rest of the methods, you can use the +After obtaining tntcxx sources using the rest of the methods, you can use the following CMake command to incorporate tntcxx into your CMake project: ```cmake add_subdirectory(${TNTCXX_SOURCE_DIR}) @@ -64,8 +64,8 @@ target_link_libraries(example tntcxx::tntcxx) ##### Running tntcxx Tests with CMake -Use the `-DTNTCXX_BUILD_TESTING=ON` option to run the tntcxx tests. This option -is enabled by default if the tntcxx project is determined to be the top level +Use the `-DTNTCXX_BUILD_TESTING=ON` option to run the tntcxx tests. This option +is enabled by default if the tntcxx project is determined to be the top level project. Note that `BUILD_TESTING` must also be on (the default). For example, to run the tntcxx tests, you could use this script: @@ -80,8 +80,8 @@ ctest ### CMake Option Synopsis -- `-DTNTCXX_BUILD_TESTING=ON` must be set to enable testing. This option is -enabled by default if the tntcxx project is determined to be the top level +- `-DTNTCXX_BUILD_TESTING=ON` must be set to enable testing. This option is +enabled by default if the tntcxx project is determined to be the top level project. ## Internals @@ -107,7 +107,7 @@ as template parameter of buffer. Connector can be embedded in any C++ application with including main header: `#include "/src/Client/Connector.hpp"` -### Objects instantiation +### Objects Instantiation To create client one should specify buffer's and network provider's implementations as template parameters. Connector's main class has the following signature: @@ -139,12 +139,12 @@ Connection conn(client); Now assume Tarantool instance is listening `3301` port on localhost. To connect to the server we should invoke `Connector::connect()` method of client object and -pass three arguments: connection instance, address and port. +pass three arguments: connection instance, address and port. ```c++ int rc = client.connect(conn, address, port); ``` -### Error handling +### Error Handling Implementation of connector is exception free, so we rely on return codes: in case of fail, `connect()` will return `rc < 0`. @@ -159,18 +159,18 @@ if (rc != 0) { To reset connection after errors (clean up error message and connection status), one can use `Connection::reset()`. -### Preparing requests +### Preparing Requests To execute simplest request (i.e. ping), one can invoke corresponding method of connection object: ```c++ rid_t ping = conn.ping(); -``` +``` Each request method returns request id, which is sort of future. It can be used to get the result of request execution once it is ready (i.e. response). Requests are queued in the input buffer of connection until `Connector::wait()` is called. -### Sending requests +### Sending Requests That said, to send requests to the server side, we should invoke `client.wait()`: ```c++ @@ -182,7 +182,7 @@ request is ready, `wait()` terminates. It also provides negative return code in case of system related fails (e.g. broken or time outed connection). If `wait()` returns 0, then response is received and expected to be parsed. -### Receiving responses +### Receiving Responses To get the response when it is ready, we can use `Connection::getResponse()`. It takes request id and returns optional object containing response (`nullptr` @@ -200,11 +200,11 @@ either runtime error(s) (accessible by `response.body.error_stack`) or data tuples are not decoded and come in form of pointers to the start and end of msgpacks. See section below to understand how to decode tuples. -### Data manipulation +### Data Manipulation Now let's consider a bit more sophisticated requests. Assume we have space with `id = 512` and following format on the server: -`CREATE TABLE t(id INT PRIMARY KEY, a TEXT, b DOUBLE);` +`CREATE TABLE t(id INT PRIMARY KEY, a TEXT, b DOUBLE);` Preparing analogue of `t:replace(1, "111", 1.01);` request can be done this way: ```c++ @@ -218,7 +218,7 @@ auto i = conn.space[512].index[1]; rid_t select = i.select(std::make_tuple(1), 1, 0 /*offset*/, IteratorType::EQ); ``` -### Data readers +### Data Readers Responses from server contain raw data (i.e. encoded into MsgPack tuples). Let's define structure describing data stored in space `t`: @@ -263,3 +263,9 @@ an array of tuples as value in response to `select`. So, in order to successfully decode them, we should pass an array of tuples to decoder - that's why `std::vector` is needed. If decoding was successful, `results` will contain all decoded `UserTuples`. + +## Multi-Threading Model + +A `Connector` object and all its instances of `Connection` objects must be used in a single thread. For multi-threaded usage, create one or several `Connector` instances for each thread. Each `Connection` object must be used only with the `Connector` object that it was created from. + +If custom `Buffer` or `NetProvider` implementations are used for `Connector` objects, the custom implementations must not share any state (e.g., `static` fields). diff --git a/src/Buffer/Buffer.hpp b/src/Buffer/Buffer.hpp index 68f405578..cf73dd9af 100644 --- a/src/Buffer/Buffer.hpp +++ b/src/Buffer/Buffer.hpp @@ -37,6 +37,7 @@ #include #include #include +#include #include "../Utils/Mempool.hpp" #include "../Utils/List.hpp" @@ -58,9 +59,8 @@ namespace tnt { * REAL_SIZE - constant determines real size of allocated chunk (excluding * overhead taken by allocator). */ -template > -class Buffer -{ +template > +class Buffer { private: /** =============== Block definition =============== */ /** Blocks are organized into linked list. */ @@ -115,6 +115,12 @@ class Buffer * to the next byte after the last valid byte in block. * */ bool isEndOfBlock(const char *ptr); + /** Delete blocks and release occupied memory. */ + void releaseBlocks(void) + { + while (!m_blocks.isEmpty()) + delBlock(&m_blocks.first()); + } public: /** =============== Convenient wrappers =============== */ @@ -265,11 +271,35 @@ class Buffer /** =============== Buffer definition =============== */ /** Copy of any kind is disabled. Move is allowed. */ - Buffer(const allocator& all = allocator()); + Buffer(allocator &&all = allocator()); Buffer(const Buffer& buf) = delete; Buffer& operator = (const Buffer& buf) = delete; - Buffer(Buffer &&buf) noexcept = default; - Buffer &operator=(Buffer &&buf) noexcept = default; + Buffer(Buffer &&other) noexcept + { + /* Call move assignment operator. */ + *this = std::forward(other); + } + Buffer &operator=(Buffer &&other) noexcept + { + if (this == &other) + return *this; + + m_blocks = std::move(other.m_blocks); + /* + * Release blocks of `other` right on the move because + * we are going to move its allocator as well and we + * must not use it after it is moved. + */ + other.releaseBlocks(); + m_all = std::move(other.m_all); + + m_iterators = std::move(other.m_iterators); + m_begin = other.m_begin; + other.m_begin = nullptr; + m_end = other.m_end; + other.m_end = nullptr; + return *this; + } ~Buffer() noexcept; /** @@ -648,7 +678,7 @@ Buffer::iterator_common::moveBackward(size_t step) } template -Buffer::Buffer(const allocator &all) : m_all(all) +Buffer::Buffer(allocator &&all) : m_all(std::forward(all)) { static_assert((N & (N - 1)) == 0, "N must be power of 2"); static_assert(allocator::REAL_SIZE % alignof(Block) == 0, @@ -666,9 +696,7 @@ Buffer::Buffer(const allocator &all) : m_all(all) template Buffer::~Buffer() noexcept { - /* Delete blocks and release occupied memory. */ - while (!m_blocks.isEmpty()) - delBlock(&m_blocks.first()); + releaseBlocks(); } template diff --git a/src/Client/Connection.hpp b/src/Client/Connection.hpp index d9d745f4f..ef5cd35e3 100644 --- a/src/Client/Connection.hpp +++ b/src/Client/Connection.hpp @@ -250,6 +250,7 @@ class Connection private: ConnectionImpl *impl; static constexpr size_t GC_STEP_CNT = 100; + size_t gc_step = 0; template rid_t insert(const T &tuple, uint32_t space_id); @@ -521,8 +522,7 @@ template static void inputBufGC(Connection &conn) { - static int gc_step = 0; - if ((gc_step++ % Connection::GC_STEP_CNT) == 0) { + if ((conn.gc_step++ % Connection::GC_STEP_CNT) == 0) { LOG_DEBUG("Flushed input buffer of the connection %p", &conn); conn.impl->inBuf.flush(); } @@ -612,7 +612,7 @@ Connection::execute(const std::string& statement, const T& { impl->enc.encodeExecute(statement, parameters); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -622,7 +622,7 @@ Connection::execute(unsigned int stmt_id, const T& paramete { impl->enc.encodeExecute(stmt_id, parameters); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -631,7 +631,7 @@ Connection::prepare(const std::string& statement) { impl->enc.encodePrepare(statement); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -641,7 +641,7 @@ Connection::call(const std::string &func, const T &args) { impl->enc.encodeCall(func, args); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -650,7 +650,7 @@ Connection::ping() { impl->enc.encodePing(); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -660,7 +660,7 @@ Connection::insert(const T &tuple, uint32_t space_id) { impl->enc.encodeInsert(tuple, space_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -670,7 +670,7 @@ Connection::replace(const T &tuple, uint32_t space_id) { impl->enc.encodeReplace(tuple, space_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -681,7 +681,7 @@ Connection::delete_(const T &key, uint32_t space_id, { impl->enc.encodeDelete(key, space_id, index_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -692,7 +692,7 @@ Connection::update(const K &key, const T &tuple, { impl->enc.encodeUpdate(key, tuple, space_id, index_id); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -703,7 +703,7 @@ Connection::upsert(const T &tuple, const O &ops, { impl->enc.encodeUpsert(tuple, ops, space_id, index_base); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template @@ -716,7 +716,7 @@ Connection::select(const T &key, uint32_t space_id, impl->enc.encodeSelect(key, space_id, index_id, limit, offset, iterator); impl->connector.readyToSend(*this); - return RequestEncoder::getSync(); + return impl->enc.getSync(); } template diff --git a/src/Client/LibevNetProvider.hpp b/src/Client/LibevNetProvider.hpp index 97577c9d7..fa4a81280 100644 --- a/src/Client/LibevNetProvider.hpp +++ b/src/Client/LibevNetProvider.hpp @@ -270,7 +270,7 @@ LibevNetProvider::LibevNetProvider(Connector_t &connector, m_Connector(connector), m_Loop(loop), m_IsOwnLoop(false) { if (m_Loop == nullptr) { - m_Loop = ev_default_loop(0); + m_Loop = ev_loop_new(0); m_IsOwnLoop = true; } assert(m_Loop != nullptr); diff --git a/src/Client/RequestEncoder.hpp b/src/Client/RequestEncoder.hpp index 2752d3b18..b4862615f 100644 --- a/src/Client/RequestEncoder.hpp +++ b/src/Client/RequestEncoder.hpp @@ -97,12 +97,12 @@ class RequestEncoder { const Greeting &greet); /** Sync value is used as request id. */ - static size_t getSync() { return sync; } + size_t getSync() { return sync; } static constexpr size_t PREHEADER_SIZE = 5; private: void encodeHeader(int request); BUFFER &m_Buf; - inline static ssize_t sync = 0; + ssize_t sync = 0; }; template diff --git a/src/Utils/Logger.hpp b/src/Utils/Logger.hpp index 4f82cc412..aac420cfc 100644 --- a/src/Utils/Logger.hpp +++ b/src/Utils/Logger.hpp @@ -31,8 +31,9 @@ */ #include +#include -#include +#include #include enum LogLevel { @@ -66,23 +67,32 @@ class Logger { Logger(LogLevel lvl) : m_LogLvl(lvl) {}; template - void log(std::ostream& strm, LogLevel log_lvl, - const char *file, int line, ARGS&& ...args) + void log(int fd, LogLevel log_lvl, const char *file, int line, ARGS &&...args) { if (!isLogPossible(log_lvl)) return; - time_t rawTime; - time(&rawTime); - struct tm *timeInfo = localtime(&rawTime); - char timeString[10]; - strftime(timeString, sizeof(timeString), "%H:%M:%S", timeInfo); - // The line below is commented for compatibility with previous - // version. I'm not sure it was bug or feature, but the time, - // filename and line was not printed. + /* File and line were never printed (by mistake, I guess). */ (void)file; (void)line; - //strm << timeString << " " << file << ':' << line << ' '; + /* + * Standard streams (`cout` and `cerr`) are thread-safe + * according to C++11 or more modern standards, but it turns + * out that some compilers do not stick to this contract. + * + * Let's use `stringstream` to build a string and then write + * it manually with `write` since it is guaranteed to be + * thread-safe. Yes, that's slower because of unnnecessary + * allocations and copies, but the log is used generally for + * debug or logging exceptional cases (errors) anyway, so + * that's not a big deal. + * + * Related: https://github.com/llvm/llvm-project/issues/51851 + */ + std::stringstream strm; strm << log_lvl << ": "; (strm << ... << std::forward(args)) << '\n'; + std::string str = strm.str(); + ssize_t rc = write(fd, std::data(str), std::size(str)); + (void)rc; } void setLogLevel(LogLevel lvl) { @@ -106,8 +116,8 @@ template void log(LogLevel level, const char *file, int line, ARGS&& ...args) { - gLogger.log(level == ERROR ? std::cerr : std::cout, - level, file, line, std::forward(args)...); + int fd = level == ERROR ? STDERR_FILENO : STDOUT_FILENO; + gLogger.log(fd, level, file, line, std::forward(args)...); } #define LOG_DEBUG(...) log(DEBUG, __FILE__, __LINE__, __VA_ARGS__) diff --git a/src/Utils/Mempool.hpp b/src/Utils/Mempool.hpp index 76bd61723..0ef49773a 100644 --- a/src/Utils/Mempool.hpp +++ b/src/Utils/Mempool.hpp @@ -34,6 +34,7 @@ #include #include #include +#include namespace tnt { @@ -44,6 +45,18 @@ class MempoolStats { void statAddBlock() { ++m_BlockCount; } void statDelBlock() { --m_BlockCount; } public: + MempoolStats() = default; + MempoolStats(MempoolStats &&other) noexcept + { + /* Call move assignment operator. */ + *this = std::forward(other); + } + MempoolStats &operator=(MempoolStats &&other) noexcept + { + std::swap(m_SlabCount, other.m_SlabCount); + std::swap(m_BlockCount, other.m_BlockCount); + } + /** Count of allocated (used) blocks. */ size_t statBlockCount() const { return m_BlockCount; } /** Count of allocated (total) slabs. */ @@ -60,6 +73,10 @@ class MempoolStats { void statAddBlock() { } void statDelBlock() { } public: + MempoolStats() = default; + MempoolStats(MempoolStats &&other) noexcept = default; + MempoolStats &operator=(MempoolStats &&other) noexcept = default; + /** Disabled. return SIZE_MAX. */ size_t statBlockCount() const { return SIZE_MAX; } /** Disabled. return SIZE_MAX. */ @@ -115,6 +132,25 @@ class MempoolInstance : public MempoolStats { static constexpr size_t SLAB_ALIGN = SA; MempoolInstance() = default; + MempoolInstance(const MempoolInstance &other) = delete; + MempoolInstance &operator=(const MempoolInstance &other) = delete; + MempoolInstance(MempoolInstance &&other) noexcept + { + /* Call move assignment operator. */ + *this = std::forward(other); + } + MempoolInstance &operator=(MempoolInstance &&other) noexcept + { + if (this == &other) + return *this; + /* Move base class. */ + MempoolStats::operator=(std::forward(other)); + std::swap(m_SlabList, other.m_SlabList); + std::swap(m_FreeList, other.m_FreeList); + std::swap(m_SlabDataBeg, other.m_SlabDataBeg); + std::swap(m_SlabDataEnd, other.m_SlabDataEnd); + return *this; + } ~MempoolInstance() noexcept { while (m_SlabList != nullptr) { diff --git a/test/ClientMultithreadTest.cpp b/test/ClientMultithreadTest.cpp new file mode 100644 index 000000000..5c4c068d3 --- /dev/null +++ b/test/ClientMultithreadTest.cpp @@ -0,0 +1,187 @@ +/* + * SPDX-License-Identifier: BSD-2-Clause + * + * Copyright 2010-2025, Tarantool AUTHORS, please see AUTHORS file. + */ +#include "Utils/Helpers.hpp" +#include "Utils/System.hpp" +#include "Utils/UserTuple.hpp" + +#include "Client/Connector.hpp" +#include "Client/LibevNetProvider.hpp" + +#include +#include +#include + +const char *localhost = "127.0.0.1"; +int port = 3301; +int dummy_server_port = 3302; +const char *unixsocket = "./tnt.sock"; +int WAIT_TIMEOUT = 1000; // milliseconds + +using Buf_t = tnt::Buffer<16 * 1024>; + +#ifdef TNTCXX_ENABLE_SSL +constexpr bool enable_ssl = true; +constexpr StreamTransport transport = STREAM_SSL; +#else +constexpr bool enable_ssl = false; +constexpr StreamTransport transport = STREAM_PLAIN; +#endif + +#ifdef __linux__ +using NetProvider = EpollNetProvider; +#else +using NetProvider = LibevNetProvider; +#endif + +template +static int +test_connect(Connector &client, Connection &conn, const std::string &addr, unsigned port, const std::string user = {}, + const std::string passwd = {}) +{ + std::string service = port == 0 ? std::string {} : std::to_string(port); + return client.connect(conn, + { + .address = addr, + .service = service, + .transport = transport, + .user = user, + .passwd = passwd, + }); +} + +class PingRequestProcessor { +public: + static rid_t sendRequest(Connection &conn, size_t thread_id, size_t iter) + { + (void)thread_id; + (void)iter; + rid_t f = conn.ping(); + fail_unless(!conn.futureIsReady(f)); + return f; + } + + static void processResponse(std::optional> &response, size_t thread_id, size_t iter) + { + (void)thread_id; + (void)iter; + fail_unless(response != std::nullopt); + fail_unless(response->header.code == 0); + } +}; + +class ReplaceRequestProcessor { +public: + static rid_t sendRequest(Connection &conn, size_t thread_id, size_t iter) + { + const size_t space_id = 512; + std::tuple data = std::make_tuple(iter, "a", double(iter * thread_id)); + rid_t f = conn.space[space_id].replace(data); + fail_unless(!conn.futureIsReady(f)); + return f; + } + + static void processResponse(std::optional> &response, size_t thread_id, size_t iter) + { + fail_unless(response != std::nullopt); + fail_unless(response->header.code == 0); + + fail_unless(response != std::nullopt); + fail_unless(response->body.data != std::nullopt); + fail_unless(response->body.error_stack == std::nullopt); + + std::vector> response_data; + fail_unless(response->body.data->decode(response_data)); + fail_unless(response_data.size() == 1); + fail_unless(std::get<0>(response_data[0]) == iter); + fail_unless(std::get<1>(response_data[0]) == std::string("a")); + fail_unless(std::fabs(std::get<2>(response_data[0]) - iter * thread_id) + <= std::numeric_limits::epsilon()); + } +}; + +template +static void +multithread_test(void) +{ + TEST_INIT(0); + static constexpr int ITER_NUM = 1000; + static constexpr int THREAD_NUM = 24; + std::vector threads; + threads.reserve(THREAD_NUM); + for (int t = 0; t < THREAD_NUM; t++) { + threads.emplace_back([]() { + Connector client; + std::vector> conns; + for (size_t i = 0; i < ConnPerThread; i++) + conns.emplace_back(client); + for (auto &conn : conns) { + int rc = test_connect(client, conn, localhost, port); + fail_unless(rc == 0); + } + + for (int iter = 0; iter < ITER_NUM; iter++) { + std::array fs; + + for (size_t t = 0; t < ConnPerThread; t++) + fs[t] = RequestProcessor::sendRequest(conns[t], t, iter); + + for (size_t t = 0; t < ConnPerThread; t++) { + client.wait(conns[t], fs[t], WAIT_TIMEOUT); + fail_unless(conns[t].futureIsReady(fs[t])); + std::optional> response = conns[t].getResponse(fs[t]); + RequestProcessor::processResponse(response, t, iter); + } + } + }); + } + for (auto &thread : threads) + thread.join(); +} + +int +main() +{ + /* + * Send STDOUT to /dev/null - otherwise, there will be a ton of debug + * logs and it will be impossible to inspect them on failure. + * + * Note that one can disable debug logs by setting logging level of + * `Logger`. But that's not the case here because we want to cover + * the logger with multi-threading test as well to make sure that + * it is thread-safe, so we shouldn't disable them, but we don't want + * to read them either. + */ + if (freopen("/dev/null", "w", stdout) == NULL) { + std::cerr << "Cannot send STDOUT to /dev/null" << std::endl; + abort(); + } +#ifdef TNTCXX_ENABLE_SSL +#ifndef __FreeBSD__ + // There's no way to disable SIGPIPE for SSL on non-FreeBSD platforms, + // so it is needed to disable signal handling. + signal(SIGPIPE, SIG_IGN); +#endif +#endif + + if (cleanDir() != 0) + return -1; + +#ifdef TNTCXX_ENABLE_SSL + if (genSSLCert() != 0) + return -1; +#endif + + if (launchTarantool(enable_ssl) != 0) + return -1; + + sleep(1); + + multithread_test(); + multithread_test(); + multithread_test(); + multithread_test(); + return 0; +} diff --git a/test/MempoolUnitTest.cpp b/test/MempoolUnitTest.cpp index 95a73ec90..cbb6e5d29 100644 --- a/test/MempoolUnitTest.cpp +++ b/test/MempoolUnitTest.cpp @@ -32,6 +32,7 @@ #include "../src/Utils/Mempool.hpp" #include "Utils/Helpers.hpp" #include +#include template struct Allocation { @@ -247,6 +248,40 @@ test_alignment() } } +template +void +test_move() +{ + TEST_INIT(2, S, 0); + constexpr size_t N = 1024; + tnt::MempoolInstance mp; + + Allocations all; + for (size_t i = 0; i < N; i++) + all.add(mp.allocate()); + fail_unless(all.are_valid()); + + tnt::MempoolInstance mp_move_constructed(std::move(mp)); + for (size_t i = 0; i < N; i++) { + all.add(mp_move_constructed.allocate()); + all.add(mp.allocate()); + } + fail_unless(all.are_valid()); + + tnt::MempoolInstance mp_move_copied; + /* Allocate some memory and let ASAN check if it won't be leaked. */ + for (size_t i = 0; i < N; i++) { + char *ptr = mp_move_copied.allocate(); + (void)ptr; + } + mp_move_copied = std::move(mp); + for (size_t i = 0; i < N; i++) { + all.add(mp_move_copied.allocate()); + all.add(mp.allocate()); + } + fail_unless(all.are_valid()); +} + int main() { test_default<8>(); @@ -275,4 +310,10 @@ int main() test_alignment<120, 2>(); test_alignment<120, 13>(); test_alignment<120, 64>(); + + test_move<8>(); + test_move<64>(); + test_move<14>(); + test_move<72>(); + test_move<80>(); }