Skip to content

Commit 67069ea

Browse files
committed
multithread wip
1 parent 9d9642c commit 67069ea

File tree

5 files changed

+226
-14
lines changed

5 files changed

+226
-14
lines changed

CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ IF (TNTCXX_BUILD_TESTING)
8686
# Retrieve the source directory to later get the header path.
8787
FETCHCONTENT_GETPROPERTIES(msgpuck)
8888
FETCHCONTENT_MAKEAVAILABLE(msgpuck)
89+
90+
find_package(Threads REQUIRED)
8991
ENDIF()
9092

9193
OPTION(TNTCXX_ENABLE_SANITIZERS
@@ -209,6 +211,11 @@ TNTCXX_TEST(NAME Client.test TYPE ctest
209211
LIBRARIES ${COMMON_LIB}
210212
)
211213

214+
TNTCXX_TEST(NAME ClientMultithread.test TYPE ctest
215+
SOURCES src/Client/Connector.hpp test/ClientMultithreadTest.cpp
216+
LIBRARIES ${COMMON_LIB} Threads::Threads
217+
)
218+
212219
IF (TNTCXX_ENABLE_SSL)
213220
TNTCXX_TEST(NAME ClientSSL.test TYPE ctest
214221
SOURCES src/Client/Connector.hpp test/ClientTest.cpp

src/Client/Connection.hpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ Connection<BUFFER, NetProvider>::execute(const std::string& statement, const T&
612612
{
613613
impl->enc.encodeExecute(statement, parameters);
614614
impl->connector.readyToSend(*this);
615-
return RequestEncoder<BUFFER>::getSync();
615+
return impl->enc.getSync();
616616
}
617617

618618
template<class BUFFER, class NetProvider>
@@ -622,7 +622,7 @@ Connection<BUFFER, NetProvider>::execute(unsigned int stmt_id, const T& paramete
622622
{
623623
impl->enc.encodeExecute(stmt_id, parameters);
624624
impl->connector.readyToSend(*this);
625-
return RequestEncoder<BUFFER>::getSync();
625+
return impl->enc.getSync();
626626
}
627627

628628
template<class BUFFER, class NetProvider>
@@ -631,7 +631,7 @@ Connection<BUFFER, NetProvider>::prepare(const std::string& statement)
631631
{
632632
impl->enc.encodePrepare(statement);
633633
impl->connector.readyToSend(*this);
634-
return RequestEncoder<BUFFER>::getSync();
634+
return impl->enc.getSync();
635635
}
636636

637637
template<class BUFFER, class NetProvider>
@@ -641,7 +641,7 @@ Connection<BUFFER, NetProvider>::call(const std::string &func, const T &args)
641641
{
642642
impl->enc.encodeCall(func, args);
643643
impl->connector.readyToSend(*this);
644-
return RequestEncoder<BUFFER>::getSync();
644+
return impl->enc.getSync();
645645
}
646646

647647
template<class BUFFER, class NetProvider>
@@ -650,7 +650,7 @@ Connection<BUFFER, NetProvider>::ping()
650650
{
651651
impl->enc.encodePing();
652652
impl->connector.readyToSend(*this);
653-
return RequestEncoder<BUFFER>::getSync();
653+
return impl->enc.getSync();
654654
}
655655

656656
template<class BUFFER, class NetProvider>
@@ -660,7 +660,7 @@ Connection<BUFFER, NetProvider>::insert(const T &tuple, uint32_t space_id)
660660
{
661661
impl->enc.encodeInsert(tuple, space_id);
662662
impl->connector.readyToSend(*this);
663-
return RequestEncoder<BUFFER>::getSync();
663+
return impl->enc.getSync();
664664
}
665665

666666
template<class BUFFER, class NetProvider>
@@ -670,7 +670,7 @@ Connection<BUFFER, NetProvider>::replace(const T &tuple, uint32_t space_id)
670670
{
671671
impl->enc.encodeReplace(tuple, space_id);
672672
impl->connector.readyToSend(*this);
673-
return RequestEncoder<BUFFER>::getSync();
673+
return impl->enc.getSync();
674674
}
675675

676676
template<class BUFFER, class NetProvider>
@@ -681,7 +681,7 @@ Connection<BUFFER, NetProvider>::delete_(const T &key, uint32_t space_id,
681681
{
682682
impl->enc.encodeDelete(key, space_id, index_id);
683683
impl->connector.readyToSend(*this);
684-
return RequestEncoder<BUFFER>::getSync();
684+
return impl->enc.getSync();
685685
}
686686

687687
template<class BUFFER, class NetProvider>
@@ -692,7 +692,7 @@ Connection<BUFFER, NetProvider>::update(const K &key, const T &tuple,
692692
{
693693
impl->enc.encodeUpdate(key, tuple, space_id, index_id);
694694
impl->connector.readyToSend(*this);
695-
return RequestEncoder<BUFFER>::getSync();
695+
return impl->enc.getSync();
696696
}
697697

698698
template<class BUFFER, class NetProvider>
@@ -703,7 +703,7 @@ Connection<BUFFER, NetProvider>::upsert(const T &tuple, const O &ops,
703703
{
704704
impl->enc.encodeUpsert(tuple, ops, space_id, index_base);
705705
impl->connector.readyToSend(*this);
706-
return RequestEncoder<BUFFER>::getSync();
706+
return impl->enc.getSync();
707707
}
708708

709709
template<class BUFFER, class NetProvider>
@@ -716,7 +716,7 @@ Connection<BUFFER, NetProvider>::select(const T &key, uint32_t space_id,
716716
impl->enc.encodeSelect(key, space_id, index_id, limit,
717717
offset, iterator);
718718
impl->connector.readyToSend(*this);
719-
return RequestEncoder<BUFFER>::getSync();
719+
return impl->enc.getSync();
720720
}
721721

722722
template<class BUFFER, class NetProvider>

src/Client/LibevNetProvider.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ LibevNetProvider<BUFFER, Stream>::LibevNetProvider(Connector_t &connector,
270270
m_Connector(connector), m_Loop(loop), m_IsOwnLoop(false)
271271
{
272272
if (m_Loop == nullptr) {
273-
m_Loop = ev_default_loop(0);
273+
m_Loop = ev_loop_new(0);
274274
m_IsOwnLoop = true;
275275
}
276276
assert(m_Loop != nullptr);

src/Client/RequestEncoder.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ class RequestEncoder {
9797
const Greeting &greet);
9898

9999
/** Sync value is used as request id. */
100-
static size_t getSync() { return sync; }
100+
size_t getSync() { return sync; }
101101
static constexpr size_t PREHEADER_SIZE = 5;
102102
private:
103103
void encodeHeader(int request);
104104
BUFFER &m_Buf;
105-
inline static ssize_t sync = 0;
105+
ssize_t sync = 0;
106106
};
107107

108108
template<class BUFFER>

test/ClientMultithreadTest.cpp

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
3+
*
4+
* Redistribution and use in source and binary forms, with or
5+
* without modification, are permitted provided that the following
6+
* conditions are met:
7+
*
8+
* 1. Redistributions of source code must retain the above
9+
* copyright notice, this list of conditions and the
10+
* following disclaimer.
11+
*
12+
* 2. Redistributions in binary form must reproduce the above
13+
* copyright notice, this list of conditions and the following
14+
* disclaimer in the documentation and/or other materials
15+
* provided with the distribution.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18+
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19+
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21+
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22+
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25+
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26+
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28+
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29+
* SUCH DAMAGE.
30+
*/
31+
#include "Utils/Helpers.hpp"
32+
#include "Utils/System.hpp"
33+
#include "Utils/UserTuple.hpp"
34+
35+
#include "../src/Client/LibevNetProvider.hpp"
36+
#include "../src/Client/Connector.hpp"
37+
38+
#include <thread>
39+
#include <tuple>
40+
#include <cmath>
41+
42+
const char *localhost = "127.0.0.1";
43+
int port = 3301;
44+
int dummy_server_port = 3302;
45+
const char *unixsocket = "./tnt.sock";
46+
int WAIT_TIMEOUT = 1000; //milliseconds
47+
48+
using Buf_t = tnt::Buffer<16 * 1024, tnt::MempoolInstance<16 * 1024>>;
49+
50+
#ifdef TNTCXX_ENABLE_SSL
51+
constexpr bool enable_ssl = true;
52+
constexpr StreamTransport transport = STREAM_SSL;
53+
#else
54+
constexpr bool enable_ssl = false;
55+
constexpr StreamTransport transport = STREAM_PLAIN;
56+
#endif
57+
58+
#ifdef __linux__
59+
using NetProvider = EpollNetProvider<Buf_t, DefaultStream>;
60+
#else
61+
using NetProvider = LibevNetProvider<Buf_t, DefaultStream>;
62+
#endif
63+
64+
template <class Connector, class Connection>
65+
static int
66+
test_connect(Connector &client, Connection &conn, const std::string &addr,
67+
unsigned port,
68+
const std::string user = {}, const std::string passwd = {})
69+
{
70+
std::string service = port == 0 ? std::string{} : std::to_string(port);
71+
return client.connect(conn, {
72+
.address = addr,
73+
.service = service,
74+
.transport = transport,
75+
.user = user,
76+
.passwd = passwd,
77+
});
78+
}
79+
80+
class PingRequestProcessor {
81+
public:
82+
static rid_t
83+
sendRequest(Connection<Buf_t, NetProvider> &conn,
84+
size_t thread_id, size_t iter)
85+
{
86+
(void)thread_id;
87+
(void)iter;
88+
rid_t f = conn.ping();
89+
fail_unless(!conn.futureIsReady(f));
90+
return f;
91+
}
92+
93+
static void
94+
processResponse(std::optional<Response<Buf_t>> &response,
95+
size_t thread_id, size_t iter)
96+
{
97+
(void)thread_id;
98+
(void)iter;
99+
fail_unless(response != std::nullopt);
100+
fail_unless(response->header.code == 0);
101+
}
102+
};
103+
104+
class ReplaceRequestProcessor {
105+
public:
106+
static rid_t
107+
sendRequest(Connection<Buf_t, NetProvider> &conn,
108+
size_t thread_id, size_t iter)
109+
{
110+
const size_t space_id = 512;
111+
std::tuple data = std::make_tuple(iter, "a", double(iter * thread_id));
112+
rid_t f = conn.space[space_id].replace(data);
113+
fail_unless(!conn.futureIsReady(f));
114+
return f;
115+
}
116+
117+
static void
118+
processResponse(std::optional<Response<Buf_t>> &response,
119+
size_t thread_id, size_t iter)
120+
{
121+
fail_unless(response != std::nullopt);
122+
fail_unless(response->header.code == 0);
123+
124+
fail_unless(response != std::nullopt);
125+
fail_unless(response->body.data != std::nullopt);
126+
fail_unless(response->body.error_stack == std::nullopt);
127+
128+
std::vector<std::tuple<size_t, std::string, double>> response_data;
129+
fail_unless(response->body.data->decode(response_data));
130+
fail_unless(response_data.size() == 1);
131+
fail_unless(std::get<0>(response_data[0]) == iter);
132+
fail_unless(std::get<1>(response_data[0]) == std::string("a"));
133+
fail_unless(std::fabs(std::get<2>(response_data[0]) - iter * thread_id) <= std::numeric_limits<double>::epsilon());
134+
}
135+
};
136+
137+
138+
template<typename RequestProcessor, size_t ConnPerThread = 1>
139+
static void
140+
multithread_test(void)
141+
{
142+
TEST_INIT(0);
143+
static constexpr int ITER_NUM = 1000;
144+
static constexpr int THREAD_NUM = 24;
145+
std::vector<std::thread> threads;
146+
threads.reserve(THREAD_NUM);
147+
for (int t = 0; t < THREAD_NUM; t++) {
148+
threads.emplace_back([]() {
149+
Connector<Buf_t, NetProvider> client;
150+
std::vector<Connection<Buf_t, NetProvider>> conns;
151+
for (size_t i = 0; i < ConnPerThread; i++)
152+
conns.emplace_back(client);
153+
for (auto &conn : conns) {
154+
int rc = test_connect(client, conn, localhost, port);
155+
fail_unless(rc == 0);
156+
}
157+
158+
for (int iter = 0; iter < ITER_NUM; iter++) {
159+
std::array<rid_t, ConnPerThread> fs;
160+
161+
for (size_t t = 0; t < ConnPerThread; t++)
162+
fs[t] = RequestProcessor::sendRequest(conns[t], t, iter);
163+
164+
for (size_t t = 0; t < ConnPerThread; t++) {
165+
client.wait(conns[t], fs[t], WAIT_TIMEOUT);
166+
fail_unless(conns[t].futureIsReady(fs[t]));
167+
std::optional<Response<Buf_t>> response = conns[t].getResponse(fs[t]);
168+
RequestProcessor::processResponse(response, t, iter);
169+
}
170+
}
171+
});
172+
}
173+
for (auto &thread : threads)
174+
thread.join();
175+
}
176+
177+
int main()
178+
{
179+
#ifdef TNTCXX_ENABLE_SSL
180+
#ifndef __FreeBSD__
181+
// There's no way to disable SIGPIPE for SSL on non-FreeBSD platforms,
182+
// so it is needed to disable signal handling.
183+
signal(SIGPIPE, SIG_IGN);
184+
#endif
185+
#endif
186+
187+
if (cleanDir() != 0)
188+
return -1;
189+
190+
#ifdef TNTCXX_ENABLE_SSL
191+
if (genSSLCert() != 0)
192+
return -1;
193+
#endif
194+
195+
if (launchTarantool(enable_ssl) != 0)
196+
return -1;
197+
198+
sleep(1);
199+
200+
multithread_test<PingRequestProcessor>();
201+
multithread_test<PingRequestProcessor, 5>();
202+
multithread_test<ReplaceRequestProcessor>();
203+
multithread_test<ReplaceRequestProcessor, 5>();
204+
return 0;
205+
}

0 commit comments

Comments
 (0)