Skip to content

Commit 15d2d24

Browse files
committed
multithread wip
1 parent d871bad commit 15d2d24

File tree

4 files changed

+151
-13
lines changed

4 files changed

+151
-13
lines changed

CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ TNTCXX_TEST(NAME Client.test TYPE ctest
209209
LIBRARIES ${COMMON_LIB}
210210
)
211211

212+
TNTCXX_TEST(NAME ClientMultithread.test TYPE ctest
213+
SOURCES src/Client/Connector.hpp test/ClientMultithreadTest.cpp
214+
LIBRARIES ${COMMON_LIB}
215+
)
216+
212217
IF (TNTCXX_ENABLE_SSL)
213218
TNTCXX_TEST(NAME ClientSSL.test TYPE ctest
214219
SOURCES src/Client/Connector.hpp test/ClientTest.cpp

src/Client/Connection.hpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ Connection<BUFFER, NetProvider>::execute(const std::string& statement, const T&
612612
{
613613
impl->enc.encodeExecute(statement, parameters);
614614
impl->connector.readyToSend(*this);
615-
return RequestEncoder<BUFFER>::getSync();
615+
return impl->enc.getSync();
616616
}
617617

618618
template<class BUFFER, class NetProvider>
@@ -622,7 +622,7 @@ Connection<BUFFER, NetProvider>::execute(unsigned int stmt_id, const T& paramete
622622
{
623623
impl->enc.encodeExecute(stmt_id, parameters);
624624
impl->connector.readyToSend(*this);
625-
return RequestEncoder<BUFFER>::getSync();
625+
return impl->enc.getSync();
626626
}
627627

628628
template<class BUFFER, class NetProvider>
@@ -631,7 +631,7 @@ Connection<BUFFER, NetProvider>::prepare(const std::string& statement)
631631
{
632632
impl->enc.encodePrepare(statement);
633633
impl->connector.readyToSend(*this);
634-
return RequestEncoder<BUFFER>::getSync();
634+
return impl->enc.getSync();
635635
}
636636

637637
template<class BUFFER, class NetProvider>
@@ -641,7 +641,7 @@ Connection<BUFFER, NetProvider>::call(const std::string &func, const T &args)
641641
{
642642
impl->enc.encodeCall(func, args);
643643
impl->connector.readyToSend(*this);
644-
return RequestEncoder<BUFFER>::getSync();
644+
return impl->enc.getSync();
645645
}
646646

647647
template<class BUFFER, class NetProvider>
@@ -650,7 +650,7 @@ Connection<BUFFER, NetProvider>::ping()
650650
{
651651
impl->enc.encodePing();
652652
impl->connector.readyToSend(*this);
653-
return RequestEncoder<BUFFER>::getSync();
653+
return impl->enc.getSync();
654654
}
655655

656656
template<class BUFFER, class NetProvider>
@@ -660,7 +660,7 @@ Connection<BUFFER, NetProvider>::insert(const T &tuple, uint32_t space_id)
660660
{
661661
impl->enc.encodeInsert(tuple, space_id);
662662
impl->connector.readyToSend(*this);
663-
return RequestEncoder<BUFFER>::getSync();
663+
return impl->enc.getSync();
664664
}
665665

666666
template<class BUFFER, class NetProvider>
@@ -670,7 +670,7 @@ Connection<BUFFER, NetProvider>::replace(const T &tuple, uint32_t space_id)
670670
{
671671
impl->enc.encodeReplace(tuple, space_id);
672672
impl->connector.readyToSend(*this);
673-
return RequestEncoder<BUFFER>::getSync();
673+
return impl->enc.getSync();
674674
}
675675

676676
template<class BUFFER, class NetProvider>
@@ -681,7 +681,7 @@ Connection<BUFFER, NetProvider>::delete_(const T &key, uint32_t space_id,
681681
{
682682
impl->enc.encodeDelete(key, space_id, index_id);
683683
impl->connector.readyToSend(*this);
684-
return RequestEncoder<BUFFER>::getSync();
684+
return impl->enc.getSync();
685685
}
686686

687687
template<class BUFFER, class NetProvider>
@@ -692,7 +692,7 @@ Connection<BUFFER, NetProvider>::update(const K &key, const T &tuple,
692692
{
693693
impl->enc.encodeUpdate(key, tuple, space_id, index_id);
694694
impl->connector.readyToSend(*this);
695-
return RequestEncoder<BUFFER>::getSync();
695+
return impl->enc.getSync();
696696
}
697697

698698
template<class BUFFER, class NetProvider>
@@ -703,7 +703,7 @@ Connection<BUFFER, NetProvider>::upsert(const T &tuple, const O &ops,
703703
{
704704
impl->enc.encodeUpsert(tuple, ops, space_id, index_base);
705705
impl->connector.readyToSend(*this);
706-
return RequestEncoder<BUFFER>::getSync();
706+
return impl->enc.getSync();
707707
}
708708

709709
template<class BUFFER, class NetProvider>
@@ -716,7 +716,7 @@ Connection<BUFFER, NetProvider>::select(const T &key, uint32_t space_id,
716716
impl->enc.encodeSelect(key, space_id, index_id, limit,
717717
offset, iterator);
718718
impl->connector.readyToSend(*this);
719-
return RequestEncoder<BUFFER>::getSync();
719+
return impl->enc.getSync();
720720
}
721721

722722
template<class BUFFER, class NetProvider>

src/Client/RequestEncoder.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ class RequestEncoder {
9797
const Greeting &greet);
9898

9999
/** Sync value is used as request id. */
100-
static size_t getSync() { return sync; }
100+
size_t getSync() { return sync; }
101101
static constexpr size_t PREHEADER_SIZE = 5;
102102
private:
103103
void encodeHeader(int request);
104104
BUFFER &m_Buf;
105-
inline static ssize_t sync = 0;
105+
ssize_t sync = 0;
106106
};
107107

108108
template<class BUFFER>

test/ClientMultithreadTest.cpp

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
3+
*
4+
* Redistribution and use in source and binary forms, with or
5+
* without modification, are permitted provided that the following
6+
* conditions are met:
7+
*
8+
* 1. Redistributions of source code must retain the above
9+
* copyright notice, this list of conditions and the
10+
* following disclaimer.
11+
*
12+
* 2. Redistributions in binary form must reproduce the above
13+
* copyright notice, this list of conditions and the following
14+
* disclaimer in the documentation and/or other materials
15+
* provided with the distribution.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18+
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19+
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21+
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22+
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25+
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26+
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28+
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29+
* SUCH DAMAGE.
30+
*/
31+
#include "Utils/Helpers.hpp"
32+
#include "Utils/System.hpp"
33+
#include "Utils/UserTuple.hpp"
34+
35+
#include "../src/Client/LibevNetProvider.hpp"
36+
#include "../src/Client/Connector.hpp"
37+
38+
#include <thread>
39+
40+
const char *localhost = "127.0.0.1";
41+
int port = 3301;
42+
int dummy_server_port = 3302;
43+
const char *unixsocket = "./tnt.sock";
44+
int WAIT_TIMEOUT = 1000; //milliseconds
45+
46+
using Buf_t = tnt::Buffer<16 * 1024, tnt::MempoolInstance<16 * 1024>>;
47+
48+
#ifdef TNTCXX_ENABLE_SSL
49+
constexpr bool enable_ssl = true;
50+
constexpr StreamTransport transport = STREAM_SSL;
51+
#else
52+
constexpr bool enable_ssl = false;
53+
constexpr StreamTransport transport = STREAM_PLAIN;
54+
#endif
55+
56+
#ifdef __linux__
57+
using NetProvider = EpollNetProvider<Buf_t, DefaultStream>;
58+
#else
59+
using NetProvider = LibevNetProvider<Buf_t, DefaultStream>;
60+
#endif
61+
62+
template <class Connector, class Connection>
63+
static int
64+
test_connect(Connector &client, Connection &conn, const std::string &addr,
65+
unsigned port,
66+
const std::string user = {}, const std::string passwd = {})
67+
{
68+
std::string service = port == 0 ? std::string{} : std::to_string(port);
69+
return client.connect(conn, {
70+
.address = addr,
71+
.service = service,
72+
.transport = transport,
73+
.user = user,
74+
.passwd = passwd,
75+
});
76+
}
77+
78+
void
79+
single_conn_ping(void)
80+
{
81+
TEST_INIT(0);
82+
static constexpr int ITER_NUM = 10000;
83+
static constexpr int THREAD_NUM = 100;
84+
std::vector<std::thread> threads;
85+
threads.reserve(THREAD_NUM);
86+
for (int t = 0; t < THREAD_NUM; t++) {
87+
threads.emplace_back([]() {
88+
Connector<Buf_t, NetProvider> client;
89+
Connection<Buf_t, NetProvider> conn(client);
90+
int rc = test_connect(client, conn, localhost, port);
91+
fail_unless(rc == 0);
92+
93+
for (int i = 0; i < ITER_NUM; i++) {
94+
rid_t f = conn.ping();
95+
fail_unless(!conn.futureIsReady(f));
96+
client.wait(conn, f, WAIT_TIMEOUT);
97+
fail_unless(conn.futureIsReady(f));
98+
std::optional<Response<Buf_t>> response = conn.getResponse(f);
99+
fail_unless(response != std::nullopt);
100+
fail_unless(response->header.code == 0);
101+
}
102+
});
103+
}
104+
for (auto &thread : threads)
105+
thread.join();
106+
}
107+
108+
int main()
109+
{
110+
#ifdef TNTCXX_ENABLE_SSL
111+
#ifndef __FreeBSD__
112+
// There's no way to disable SIGPIPE for SSL on non-FreeBSD platforms,
113+
// so it is needed to disable signal handling.
114+
signal(SIGPIPE, SIG_IGN);
115+
#endif
116+
#endif
117+
118+
if (cleanDir() != 0)
119+
return -1;
120+
121+
#ifdef TNTCXX_ENABLE_SSL
122+
if (genSSLCert() != 0)
123+
return -1;
124+
#endif
125+
126+
if (launchTarantool(enable_ssl) != 0)
127+
return -1;
128+
129+
sleep(1);
130+
131+
single_conn_ping();
132+
return 0;
133+
}

0 commit comments

Comments
 (0)