Skip to content

Commit 568095a

Browse files
committed
client: always erase decoded connections from corresponding set
Connector uses a special helper for decoding responses of ready-to-decode connections and then it erases them from `m_ReadyToDecode` set by himself. That's not a good design and it can lead to various bugs (there is already a bug, actually) because it's easy to forgot to erase a decoded connection. So let's move the erasure right to the `connectionDecodeResponses` helper. Note that the helper must be turned into a method of `Connector` since it needs to use its fields. The above-mentioned bug happens because we decode responses in `Connector::wait` right before the main loop but we forgot to remove the connection from `m_ReadyToDecode` set there. Closes #124
1 parent b89ebc6 commit 568095a

File tree

2 files changed

+85
-36
lines changed

2 files changed

+85
-36
lines changed

src/Client/Connector.hpp

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,23 @@ class Connector
8989
std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
9090
void close(Connection<BUFFER, NetProvider> &conn);
9191
void close(ConnectionImpl<BUFFER, NetProvider> &conn);
92+
93+
private:
94+
/**
95+
* A helper to decode responses of a connection.
96+
* Can be called when the connection is not ready to decode - it's just no-op.
97+
* Returns -1 in the case of any error, 0 on success.
98+
*/
99+
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
100+
Response<BUFFER> *result);
101+
92102
private:
93103
NetProvider m_NetProvider;
104+
/**
105+
* Set of connections that are ready to decode.
106+
* Shouldn't be modified directly - is managed by methods `readyToDecode`
107+
* and `connectionDecodeResponses`.
108+
*/
94109
std::set<Connection<BUFFER, NetProvider>> m_ReadyToDecode;
95110
};
96111

@@ -157,21 +172,35 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
157172

158173
template<class BUFFER, class NetProvider>
159174
int
160-
connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
161-
Response<BUFFER> *result)
175+
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
176+
Response<BUFFER> *result)
162177
{
178+
if (!hasDataToDecode(conn))
179+
return 0;
180+
181+
/* Ready to decode connection must be in the corresponding set. */
182+
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
183+
184+
int rc = 0;
163185
while (hasDataToDecode(conn)) {
164-
DecodeStatus rc = processResponse(conn, result);
165-
if (rc == DECODE_ERR)
166-
return -1;
186+
DecodeStatus status = processResponse(conn, result);
187+
if (status == DECODE_ERR) {
188+
rc = -1;
189+
break;
190+
}
167191
//In case we've received only a part of response
168192
//we should wait until the rest arrives - otherwise
169193
//we can't properly decode response. */
170-
if (rc == DECODE_NEEDMORE)
171-
return 0;
172-
assert(rc == DECODE_SUCC);
194+
if (status == DECODE_NEEDMORE) {
195+
rc = 0;
196+
break;
197+
}
198+
assert(status == DECODE_SUCC);
173199
}
174-
return 0;
200+
/* A connection that has no data to decode must not be left in the set. */
201+
if (!hasDataToDecode(conn))
202+
m_ReadyToDecode.erase(conn);
203+
return rc;
175204
}
176205

177206
template<class BUFFER, class NetProvider>
@@ -191,17 +220,8 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
191220
strerror(errno), errno);
192221
return -1;
193222
}
194-
if (hasDataToDecode(conn)) {
195-
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
196-
if (connectionDecodeResponses(conn, result) != 0)
197-
return -1;
198-
/*
199-
* In case we've handled whole data in input buffer -
200-
* mark connection as completed.
201-
*/
202-
if (!hasDataToDecode(conn))
203-
m_ReadyToDecode.erase(conn);
204-
}
223+
if (connectionDecodeResponses(conn, result) != 0)
224+
return -1;
205225
if (timer.isExpired())
206226
break;
207227
}
@@ -233,13 +253,8 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
233253
strerror(errno), errno);
234254
return -1;
235255
}
236-
if (hasDataToDecode(conn)) {
237-
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
238-
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
239-
return -1;
240-
if (!hasDataToDecode(conn))
241-
m_ReadyToDecode.erase(conn);
242-
}
256+
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
257+
return -1;
243258
bool finish = true;
244259
for (size_t i = last_not_ready; i < futures.size(); ++i) {
245260
if (!conn.futureIsReady(futures[i])) {
@@ -280,8 +295,6 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
280295
assert(hasDataToDecode(conn));
281296
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
282297
return std::nullopt;
283-
if (!hasDataToDecode(conn))
284-
m_ReadyToDecode.erase(conn);
285298
return conn;
286299
}
287300

@@ -299,13 +312,8 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
299312
strerror(errno), errno);
300313
return -1;
301314
}
302-
if (hasDataToDecode(conn)) {
303-
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
304-
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
305-
return -1;
306-
if (!hasDataToDecode(conn))
307-
m_ReadyToDecode.erase(conn);
308-
}
315+
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
316+
return -1;
309317
if ((conn.getFutureCount() - ready_futures) >= future_count)
310318
return 0;
311319
if (timer.isExpired())

test/ClientTest.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,6 +1239,47 @@ test_wait(Connector<BUFFER, NetProvider> &client)
12391239
response = conn.getResponse(f);
12401240
fail_unless(response.has_value());
12411241

1242+
TEST_CASE("waitAny after several waits (gh-124)");
1243+
Connection<Buf_t, NetProvider> conn1(client);
1244+
Connection<Buf_t, NetProvider> conn2(client);
1245+
Connection<Buf_t, NetProvider> conn3(client);
1246+
rc = test_connect(client, conn1, localhost, port);
1247+
fail_unless(rc == 0);
1248+
rc = test_connect(client, conn2, localhost, port);
1249+
fail_unless(rc == 0);
1250+
rc = test_connect(client, conn3, localhost, port);
1251+
fail_unless(rc == 0);
1252+
rid_t f1 = conn1.ping();
1253+
rid_t f2 = conn2.ping();
1254+
rid_t f3 = conn3.ping();
1255+
1256+
/* Wait for all connections. */
1257+
fail_unless(client.wait(conn1, f1, WAIT_TIMEOUT) == 0);
1258+
fail_unless(conn1.futureIsReady(f1));
1259+
fail_unless(conn1.getResponse(f1).header.code == 0);
1260+
1261+
fail_unless(client.wait(conn2, f2, WAIT_TIMEOUT) == 0);
1262+
fail_unless(conn2.futureIsReady(f2));
1263+
fail_unless(conn2.getResponse(f2).header.code == 0);
1264+
1265+
fail_unless(client.wait(conn3, f3, WAIT_TIMEOUT) == 0);
1266+
fail_unless(conn3.futureIsReady(f3));
1267+
fail_unless(conn3.getResponse(f3).header.code == 0);
1268+
1269+
/*
1270+
* Wait any - we shouldn't get any of the connections here since we've
1271+
* received all the responses.
1272+
* Note that the connector used to crash here (gh-124) because some of the
1273+
* connnections still could appear in `m_ReadyToDecode` set.
1274+
*/
1275+
std::optional<Connection<Buf_t, NetProvider>> conn_opt = client.waitAny(WAIT_TIMEOUT);
1276+
fail_if(conn_opt.has_value());
1277+
1278+
/* Close all connections used only by the case. */
1279+
client.close(conn1);
1280+
client.close(conn2);
1281+
client.close(conn3);
1282+
12421283
client.close(conn);
12431284
}
12441285

0 commit comments

Comments
 (0)