Skip to content

Commit 5fbef96

Browse files
committed
client: redesign waiting methods
Currently, call of any waiting method (`wait`, `waitAll` and so on) with zero timeout blocks execution until all required responses are received. That's not great because in some scenarios users want to check if there are any responses ready without blocking execution. Of course, one can use `wait(1)` which sleeps for only a millisecond, but anyway it's often unexpected that `wait(0)` sleeps forever. Let's stick to `epoll_wait` interface - `wait(0)` is non-blocking and `wait(-1)` blocks forever. Note that before the commit, passing `-1` would lead to an assertion failure. Note that we used to block forever in waiting methods by default. To maintain this behavior, the commit updates default value of timeout since the old default (zero) means non-blocking polling of connections now. Do not forget that `wait(0)` must poll connections - for this purpose, we should check if the timer has expired after the first call of `NetProvider.wait()`. The `timer.isExpired()` call actually protected us from calling `NetProvider.wait()` with negative timeout, and now it's gone, so let's replace `timer.elapsed()` method with new `timer.timeLeft()` which is more safe to use here (see #128 for details). Closes #111 Closes #128
1 parent 3339726 commit 5fbef96

File tree

7 files changed

+153
-22
lines changed

7 files changed

+153
-22
lines changed

src/Client/Connector.hpp

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ class Connector
7575
const std::string& addr, unsigned port);
7676

7777
int wait(Connection<BUFFER, NetProvider> &conn, rid_t future,
78-
int timeout = 0, Response<BUFFER> *result = nullptr);
78+
int timeout = -1, Response<BUFFER> *result = nullptr);
7979
int waitAll(Connection<BUFFER, NetProvider> &conn,
80-
const std::vector<rid_t > &futures, int timeout = 0);
80+
const std::vector<rid_t > &futures, int timeout = -1);
8181
int waitCount(Connection<BUFFER, NetProvider> &conn,
82-
size_t feature_count, int timeout = 0);
83-
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = 0);
82+
size_t feature_count, int timeout = -1);
83+
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = -1);
8484
////////////////////////////Service interfaces//////////////////////////
8585
void readyToDecode(const Connection<BUFFER, NetProvider> &conn);
8686
void readyToSend(const Connection<BUFFER, NetProvider> &conn);
@@ -185,9 +185,8 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
185185
timer.start();
186186
if (connectionDecodeResponses(conn, result) != 0)
187187
return -1;
188-
while (!conn.hasError() && !conn.futureIsReady(future) &&
189-
!timer.isExpired()) {
190-
if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) {
188+
while (!conn.hasError() && !conn.futureIsReady(future)) {
189+
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
191190
conn.setError(std::string("Failed to poll: ") +
192191
strerror(errno), errno);
193192
return -1;
@@ -203,6 +202,8 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
203202
if (!hasDataToDecode(conn))
204203
m_ReadyToDecode.erase(conn);
205204
}
205+
if (timer.isExpired())
206+
break;
206207
}
207208
if (conn.hasError()) {
208209
LOG_ERROR("Connection got an error: ", conn.getError().msg);
@@ -226,8 +227,8 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
226227
Timer timer{timeout};
227228
timer.start();
228229
size_t last_not_ready = 0;
229-
while (!conn.hasError() && !timer.isExpired()) {
230-
if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) {
230+
while (!conn.hasError()) {
231+
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
231232
conn.setError(std::string("Failed to poll: ") +
232233
strerror(errno), errno);
233234
return -1;
@@ -249,6 +250,8 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
249250
}
250251
if (finish)
251252
return 0;
253+
if (timer.isExpired())
254+
break;
252255
}
253256
if (conn.hasError()) {
254257
LOG_ERROR("Connection got an error: ", conn.getError().msg);
@@ -264,8 +267,11 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
264267
{
265268
Timer timer{timeout};
266269
timer.start();
267-
while (m_ReadyToDecode.empty() && !timer.isExpired())
268-
m_NetProvider.wait(timeout - timer.elapsed());
270+
while (m_ReadyToDecode.empty()) {
271+
m_NetProvider.wait(timer.timeLeft());
272+
if (timer.isExpired())
273+
break;
274+
}
269275
if (m_ReadyToDecode.empty()) {
270276
LOG_ERROR("wait() has been timed out! No responses are received");
271277
return std::nullopt;
@@ -287,8 +293,8 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
287293
Timer timer{timeout};
288294
timer.start();
289295
size_t ready_futures = conn.getFutureCount();
290-
while (!conn.hasError() && !timer.isExpired()) {
291-
if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) {
296+
while (!conn.hasError()) {
297+
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
292298
conn.setError(std::string("Failed to poll: ") +
293299
strerror(errno), errno);
294300
return -1;
@@ -302,6 +308,8 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
302308
}
303309
if ((conn.getFutureCount() - ready_futures) >= future_count)
304310
return 0;
311+
if (timer.isExpired())
312+
break;
305313
}
306314
if (conn.hasError()) {
307315
LOG_ERROR("Connection got an error: ", conn.getError().msg);

src/Client/EpollNetProvider.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ template<class BUFFER, class Stream>
258258
int
259259
EpollNetProvider<BUFFER, Stream>::wait(int timeout)
260260
{
261-
assert(timeout >= 0);
262-
if (timeout == 0)
261+
assert(timeout >= -1);
262+
if (timeout == -1)
263263
timeout = TIMEOUT_INFINITY;
264264
LOG_DEBUG("Network engine wait for ", timeout, " milliseconds");
265265
/* Send pending requests. */

src/Client/LibevNetProvider.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ template<class BUFFER, class Stream>
363363
int
364364
LibevNetProvider<BUFFER, Stream>::wait(int timeout)
365365
{
366-
assert(timeout >= 0);
366+
assert(timeout >= -1);
367367
if (timeout > 0) {
368368
ev_timer_init(&m_TimeoutWatcher, &timeout_cb, timeout / MILLISECONDS, 0 /* repeat */);
369369
ev_timer_start(m_Loop, &m_TimeoutWatcher);
@@ -381,6 +381,8 @@ LibevNetProvider<BUFFER, Stream>::wait(int timeout)
381381

382382
}
383383
}
384-
ev_run(m_Loop, EVRUN_ONCE);
384+
/* Work in non-blocking mode when the timeout is zero. */
385+
int flags = timeout == 0 ? EVRUN_NOWAIT : EVRUN_ONCE;
386+
ev_run(m_Loop, flags);
385387
return 0;
386388
}

src/Utils/Timer.hpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
3030
* SUCH DAMAGE.
3131
*/
32+
#include <algorithm>
3233
#include <chrono>
3334

3435
class Timer {
@@ -40,21 +41,30 @@ class Timer {
4041
}
4142
bool isExpired() const
4243
{
43-
if (m_Timeout == std::chrono::milliseconds{0})
44+
if (m_Timeout == std::chrono::milliseconds{-1})
4445
return false;
4546
std::chrono::time_point<std::chrono::steady_clock> end =
4647
std::chrono::steady_clock::now();
4748
std::chrono::milliseconds elapsed =
4849
std::chrono::duration_cast<std::chrono::milliseconds>(end - m_Start);
4950
return elapsed >= m_Timeout;
5051
}
51-
int elapsed() const
52+
/**
53+
* The function to obtain amount of time left. Returns:
54+
* 1. `-1` if the initial timeout was `-1`.
55+
* 2. `0` if the timer has expired.
56+
* 3. Otherwise, amount of milliseconds left is returned.
57+
* NB: the function should not be used for expiration check - use `isExpired` instead.
58+
*/
59+
int timeLeft() const
5260
{
53-
if (m_Timeout == std::chrono::milliseconds{0})
54-
return 0;
61+
if (m_Timeout == std::chrono::milliseconds{-1})
62+
return -1;
5563
std::chrono::time_point<std::chrono::steady_clock> end =
5664
std::chrono::steady_clock::now();
57-
return std::chrono::duration_cast<std::chrono::milliseconds>(end - m_Start).count();
65+
int timeLeft =
66+
m_Timeout.count() - std::chrono::duration_cast<std::chrono::milliseconds>(end - m_Start).count();
67+
return std::max(0, timeLeft);
5868
}
5969
private:
6070
std::chrono::milliseconds m_Timeout;

test/ClientTest.cpp

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,104 @@ response_decoding(Connector<BUFFER, NetProvider> &client)
11441144
client.close(conn);
11451145
}
11461146

1147+
/** Checks all available `wait` methods of connector. */
1148+
template <class BUFFER, class NetProvider>
1149+
void
1150+
test_wait(Connector<BUFFER, NetProvider> &client)
1151+
{
1152+
TEST_INIT(0);
1153+
static constexpr double SLEEP_TIME = 0.1;
1154+
1155+
Connection<Buf_t, NetProvider> conn(client);
1156+
int rc = test_connect(client, conn, localhost, port);
1157+
fail_unless(rc == 0);
1158+
1159+
TEST_CASE("wait(0) and wait(-1)");
1160+
rid_t f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1161+
fail_unless(!conn.futureIsReady(f));
1162+
client.wait(conn, f, 0);
1163+
fail_unless(!conn.futureIsReady(f));
1164+
client.wait(conn, f, -1);
1165+
fail_unless(conn.futureIsReady(f));
1166+
std::optional<Response<Buf_t>> response = conn.getResponse(f);
1167+
fail_unless(response.has_value());
1168+
1169+
TEST_CASE("wait(0) polls connections");
1170+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1171+
fail_unless(!conn.futureIsReady(f));
1172+
while (!conn.futureIsReady(f)) {
1173+
client.wait(conn, f, 0);
1174+
usleep(10 * 1000); /* 10ms */
1175+
}
1176+
fail_unless(conn.futureIsReady(f));
1177+
response = conn.getResponse(f);
1178+
fail_unless(response.has_value());
1179+
1180+
TEST_CASE("waitAny(0) and waitAny(-1)");
1181+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1182+
fail_unless(!client.waitAny(0).has_value());
1183+
fail_unless(client.waitAny(-1).has_value());
1184+
response = conn.getResponse(f);
1185+
fail_unless(response.has_value());
1186+
1187+
TEST_CASE("waitAny(0) polls connections");
1188+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1189+
fail_unless(!conn.futureIsReady(f));
1190+
while (!conn.futureIsReady(f)) {
1191+
client.waitAny(0);
1192+
usleep(10 * 1000); /* 10ms */
1193+
}
1194+
fail_unless(conn.futureIsReady(f));
1195+
response = conn.getResponse(f);
1196+
fail_unless(response.has_value());
1197+
1198+
TEST_CASE("waitAll(0) and waitAll(-1)");
1199+
std::vector<rid_t> fs;
1200+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1201+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1202+
fail_unless(client.waitAll(conn, fs, 0) == -1);
1203+
fail_unless(client.waitAll(conn, fs, -1) == 0);
1204+
response = conn.getResponse(fs[0]);
1205+
fail_unless(response.has_value());
1206+
response = conn.getResponse(fs[1]);
1207+
fail_unless(response.has_value());
1208+
1209+
TEST_CASE("waitAll(0) polls connections");
1210+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1211+
fail_unless(!conn.futureIsReady(f));
1212+
while (!conn.futureIsReady(f)) {
1213+
client.waitAll(conn, std::vector<rid_t>{f}, 0);
1214+
usleep(10 * 1000); /* 10ms */
1215+
}
1216+
fail_unless(conn.futureIsReady(f));
1217+
response = conn.getResponse(f);
1218+
fail_unless(response.has_value());
1219+
1220+
TEST_CASE("waitCount(0) and waitCount(-1)");
1221+
fs.clear();
1222+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1223+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1224+
fail_unless(client.waitCount(conn, 2, 0) == -1);
1225+
fail_unless(client.waitCount(conn, 2, -1) == 0);
1226+
response = conn.getResponse(fs[0]);
1227+
fail_unless(response.has_value());
1228+
response = conn.getResponse(fs[1]);
1229+
fail_unless(response.has_value());
1230+
1231+
TEST_CASE("waitCount(0) polls connections");
1232+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1233+
fail_unless(!conn.futureIsReady(f));
1234+
while (!conn.futureIsReady(f)) {
1235+
client.waitCount(conn, 1, 0);
1236+
usleep(10 * 1000); /* 10ms */
1237+
}
1238+
fail_unless(conn.futureIsReady(f));
1239+
response = conn.getResponse(f);
1240+
fail_unless(response.has_value());
1241+
1242+
client.close(conn);
1243+
}
1244+
11471245
int main()
11481246
{
11491247
#ifdef TNTCXX_ENABLE_SSL
@@ -1193,5 +1291,6 @@ int main()
11931291
#endif
11941292
::test_dead_connection_wait();
11951293
response_decoding(client);
1294+
test_wait(client);
11961295
return 0;
11971296
}

test/cfg.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ function remote_echo(...)
3434
return {...}
3535
end
3636

37+
function remote_sleep(timeout)
38+
local fiber = require('fiber')
39+
fiber.sleep(timeout)
40+
return nil
41+
end
42+
3743
function get_rps()
3844
return box.stat.net().REQUESTS.rps
3945
end

test/cfg_ssl.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ function remote_echo(...)
3232
return {...}
3333
end
3434

35+
function remote_sleep(timeout)
36+
local fiber = require('fiber')
37+
fiber.sleep(timeout)
38+
return nil
39+
end
40+
3541
function get_rps()
3642
return box.stat.net().REQUESTS.rps
3743
end

0 commit comments

Comments
 (0)