Skip to content

Commit a103995

Browse files
committed
client: return requested future with result argument of wait
Currently, the argument returns any decoded future - that is inconvenient and completely unusable. Let's return only the requested future, or nothing, if it's not ready. Closes #112
1 parent 0ff6493 commit a103995

File tree

3 files changed

+35
-11
lines changed

3 files changed

+35
-11
lines changed

src/Client/Connection.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ class Connection
231231
template<class B, class N>
232232
friend
233233
enum DecodeStatus processResponse(Connection<B, N> &conn,
234-
Response<B> *result);
234+
int req_sync, Response<B> *result);
235235

236236
template<class B, class N>
237237
friend
@@ -531,7 +531,7 @@ inputBufGC(Connection<BUFFER, NetProvider> &conn)
531531
template<class BUFFER, class NetProvider>
532532
DecodeStatus
533533
processResponse(Connection<BUFFER, NetProvider> &conn,
534-
Response<BUFFER> *result)
534+
int req_sync, Response<BUFFER> *result)
535535
{
536536
//Decode response. In case of success - fill in feature map
537537
//and adjust end-of-decoded data pointer. Call GC if needed.
@@ -563,7 +563,7 @@ processResponse(Connection<BUFFER, NetProvider> &conn,
563563
}
564564
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
565565
response.header.code, ", schema=", response.header.schema_id);
566-
if (result != nullptr) {
566+
if (result != nullptr && response.header.sync == req_sync) {
567567
*result = std::move(response);
568568
} else {
569569
conn.impl->futures.insert({response.header.sync,

src/Client/Connector.hpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,12 @@ class Connector
9393
/**
9494
* A helper to decode responses of a connection.
9595
* Can be called when the connection is not ready to decode - it's just no-op.
96+
* If `result` is not `nullptr`, it is used to return response for a request with
97+
* `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored.
9698
* Returns -1 in the case of any error, 0 on success.
9799
*/
98100
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
99-
Response<BUFFER> *result);
101+
int req_sync, Response<BUFFER> *result);
100102
private:
101103
NetProvider m_NetProvider;
102104
/**
@@ -171,7 +173,7 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
171173
template<class BUFFER, class NetProvider>
172174
int
173175
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
174-
Response<BUFFER> *result)
176+
int req_sync, Response<BUFFER> *result)
175177
{
176178
if (!hasDataToDecode(conn))
177179
return 0;
@@ -181,7 +183,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
181183

182184
int rc = 0;
183185
while (hasDataToDecode(conn)) {
184-
DecodeStatus status = processResponse(conn, result);
186+
DecodeStatus status = processResponse(conn, req_sync, result);
185187
if (status == DECODE_ERR) {
186188
rc = -1;
187189
break;
@@ -211,9 +213,10 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
211213
Timer timer{timeout};
212214
timer.start();
213215
static constexpr int INVALID_SYNC = -1;
216+
int req_sync = static_cast<int>(future);
214217
if (result != NULL)
215218
result->header.sync = INVALID_SYNC;
216-
if (connectionDecodeResponses(conn, result) != 0)
219+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
217220
return -1;
218221
if (result != NULL && result->header.sync != INVALID_SYNC) {
219222
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -225,7 +228,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
225228
strerror(errno), errno);
226229
return -1;
227230
}
228-
if (connectionDecodeResponses(conn, result) != 0)
231+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
229232
return -1;
230233
if (result != NULL && result->header.sync != INVALID_SYNC) {
231234
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -264,7 +267,7 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
264267
strerror(errno), errno);
265268
return -1;
266269
}
267-
if (connectionDecodeResponses(conn, nullptr) != 0)
270+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
268271
return -1;
269272
bool finish = true;
270273
for (size_t i = last_not_ready; i < futures.size(); ++i) {
@@ -304,7 +307,7 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
304307
}
305308
Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin();
306309
assert(hasDataToDecode(conn));
307-
if (connectionDecodeResponses(conn, nullptr) != 0)
310+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
308311
return std::nullopt;
309312
return conn;
310313
}
@@ -323,7 +326,7 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
323326
strerror(errno), errno);
324327
return -1;
325328
}
326-
if (connectionDecodeResponses(conn, nullptr) != 0)
329+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
327330
return -1;
328331
if ((conn.getFutureCount() - ready_futures) >= future_count)
329332
return 0;

test/ClientTest.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,27 @@ test_wait(Connector<BUFFER, NetProvider> &client)
13031303
fail_unless(result.header.sync == static_cast<int>(f));
13041304
fail_unless(result.header.code == 0);
13051305

1306+
TEST_CASE("wait with argument result - several requests");
1307+
/* Obtain in direct order. */
1308+
f1 = conn.ping();
1309+
f2 = conn.ping();
1310+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1311+
fail_unless(result.header.sync == static_cast<int>(f1));
1312+
fail_unless(result.header.code == 0);
1313+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1314+
fail_unless(result.header.sync == static_cast<int>(f2));
1315+
fail_unless(result.header.code == 0);
1316+
1317+
/* Obtain in reversed order. */
1318+
f1 = conn.ping();
1319+
f2 = conn.ping();
1320+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1321+
fail_unless(result.header.sync == static_cast<int>(f2));
1322+
fail_unless(result.header.code == 0);
1323+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1324+
fail_unless(result.header.sync == static_cast<int>(f1));
1325+
fail_unless(result.header.code == 0);
1326+
13061327
client.close(conn);
13071328
}
13081329

0 commit comments

Comments
 (0)