Skip to content

Commit cdba6f6

Browse files
committed
client: always erase decoded futures from corresponding set
Connector uses a special helper for decoding ready responses and then it erases them from `m_ReadyToDecode` set by himself. That's not a good design and it can lead to various bugs (there is already a bug, actually) because it's easy to forgot to earse the decoded future. So let's move the erasure right to the `connectionDecodeResponses` helper. Closes #124
1 parent b89ebc6 commit cdba6f6

File tree

2 files changed

+83
-36
lines changed

2 files changed

+83
-36
lines changed

src/Client/Connector.hpp

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,21 @@ class Connector
8989
std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
9090
void close(Connection<BUFFER, NetProvider> &conn);
9191
void close(ConnectionImpl<BUFFER, NetProvider> &conn);
92+
private:
93+
/**
94+
* A helper to decode responses of a connection.
95+
* Can be called when the connection is not ready to decode - it's just no-op.
96+
* Returns -1 in the case of any error, 0 on success.
97+
*/
98+
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
99+
Response<BUFFER> *result);
92100
private:
93101
NetProvider m_NetProvider;
102+
/**
103+
* Set of connections that are ready to decode.
104+
* Shouldn't be modified directly - is managed by methods `readyToDecode`
105+
* and `connectionDecodeResponses`.
106+
*/
94107
std::set<Connection<BUFFER, NetProvider>> m_ReadyToDecode;
95108
};
96109

@@ -157,21 +170,35 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
157170

158171
template<class BUFFER, class NetProvider>
159172
int
160-
connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
161-
Response<BUFFER> *result)
173+
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
174+
Response<BUFFER> *result)
162175
{
176+
if (!hasDataToDecode(conn))
177+
return 0;
178+
179+
/* Ready to decode connection must be in the corresponding set. */
180+
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
181+
182+
int rc = 0;
163183
while (hasDataToDecode(conn)) {
164-
DecodeStatus rc = processResponse(conn, result);
165-
if (rc == DECODE_ERR)
166-
return -1;
184+
DecodeStatus status = processResponse(conn, result);
185+
if (status == DECODE_ERR) {
186+
rc = -1;
187+
break;
188+
}
167189
//In case we've received only a part of response
168190
//we should wait until the rest arrives - otherwise
169191
//we can't properly decode response. */
170-
if (rc == DECODE_NEEDMORE)
171-
return 0;
172-
assert(rc == DECODE_SUCC);
192+
if (status == DECODE_NEEDMORE) {
193+
rc = 0;
194+
break;
195+
}
196+
assert(status == DECODE_SUCC);
173197
}
174-
return 0;
198+
/* A connection that has no data to decode must not be left in the set. */
199+
if (!hasDataToDecode(conn))
200+
m_ReadyToDecode.erase(conn);
201+
return rc;
175202
}
176203

177204
template<class BUFFER, class NetProvider>
@@ -191,17 +218,8 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
191218
strerror(errno), errno);
192219
return -1;
193220
}
194-
if (hasDataToDecode(conn)) {
195-
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
196-
if (connectionDecodeResponses(conn, result) != 0)
197-
return -1;
198-
/*
199-
* In case we've handled whole data in input buffer -
200-
* mark connection as completed.
201-
*/
202-
if (!hasDataToDecode(conn))
203-
m_ReadyToDecode.erase(conn);
204-
}
221+
if (connectionDecodeResponses(conn, result) != 0)
222+
return -1;
205223
if (timer.isExpired())
206224
break;
207225
}
@@ -233,13 +251,8 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
233251
strerror(errno), errno);
234252
return -1;
235253
}
236-
if (hasDataToDecode(conn)) {
237-
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
238-
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
239-
return -1;
240-
if (!hasDataToDecode(conn))
241-
m_ReadyToDecode.erase(conn);
242-
}
254+
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
255+
return -1;
243256
bool finish = true;
244257
for (size_t i = last_not_ready; i < futures.size(); ++i) {
245258
if (!conn.futureIsReady(futures[i])) {
@@ -280,8 +293,6 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
280293
assert(hasDataToDecode(conn));
281294
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
282295
return std::nullopt;
283-
if (!hasDataToDecode(conn))
284-
m_ReadyToDecode.erase(conn);
285296
return conn;
286297
}
287298

@@ -299,13 +310,8 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
299310
strerror(errno), errno);
300311
return -1;
301312
}
302-
if (hasDataToDecode(conn)) {
303-
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
304-
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
305-
return -1;
306-
if (!hasDataToDecode(conn))
307-
m_ReadyToDecode.erase(conn);
308-
}
313+
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
314+
return -1;
309315
if ((conn.getFutureCount() - ready_futures) >= future_count)
310316
return 0;
311317
if (timer.isExpired())

test/ClientTest.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,6 +1239,47 @@ test_wait(Connector<BUFFER, NetProvider> &client)
12391239
response = conn.getResponse(f);
12401240
fail_unless(response.has_value());
12411241

1242+
TEST_CASE("waitAny after several waits (gh-124)");
1243+
Connection<Buf_t, NetProvider> conn1(client);
1244+
Connection<Buf_t, NetProvider> conn2(client);
1245+
Connection<Buf_t, NetProvider> conn3(client);
1246+
rc = test_connect(client, conn1, localhost, port);
1247+
fail_unless(rc == 0);
1248+
rc = test_connect(client, conn2, localhost, port);
1249+
fail_unless(rc == 0);
1250+
rc = test_connect(client, conn3, localhost, port);
1251+
fail_unless(rc == 0);
1252+
rid_t f1 = conn1.ping();
1253+
rid_t f2 = conn2.ping();
1254+
rid_t f3 = conn3.ping();
1255+
1256+
/* Wait for all connections. */
1257+
fail_unless(client.wait(conn1, f1, WAIT_TIMEOUT) == 0);
1258+
fail_unless(conn1.futureIsReady(f1));
1259+
fail_unless(conn1.getResponse(f1).header.code == 0);
1260+
1261+
fail_unless(client.wait(conn2, f2, WAIT_TIMEOUT) == 0);
1262+
fail_unless(conn2.futureIsReady(f2));
1263+
fail_unless(conn2.getResponse(f2).header.code == 0);
1264+
1265+
fail_unless(client.wait(conn3, f3, WAIT_TIMEOUT) == 0);
1266+
fail_unless(conn3.futureIsReady(f3));
1267+
fail_unless(conn3.getResponse(f3).header.code == 0);
1268+
1269+
/*
1270+
* Wait any - we shouldn't get any of the connections here since we've
1271+
* received all the responses.
1272+
* Note that the connector used to crash here (gh-124) because some of the
1273+
* connnections still could appear in `m_ReadyToDecode` set.
1274+
*/
1275+
std::optional<Connection<Buf_t, NetProvider>> conn_opt = client.waitAny(WAIT_TIMEOUT);
1276+
fail_if(conn_opt.has_value());
1277+
1278+
/* Close all connections used only by the case. */
1279+
client.close(conn1);
1280+
client.close(conn2);
1281+
client.close(conn3);
1282+
12421283
client.close(conn);
12431284
}
12441285

0 commit comments

Comments
 (0)