@@ -89,14 +89,17 @@ class Connector
8989 std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
9090 void close (Connection<BUFFER, NetProvider> &conn);
9191 void close (ConnectionImpl<BUFFER, NetProvider> &conn);
92+
9293private:
9394 /* *
9495 * A helper to decode responses of a connection.
9596 * Can be called when the connection is not ready to decode - it's just no-op.
97+ * If `result` is not `nullptr`, it is used to return response for a request with
98+ * `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored.
9699 * Returns -1 in the case of any error, 0 on success.
97100 */
98- int connectionDecodeResponses (Connection<BUFFER, NetProvider> &conn,
99- Response<BUFFER> *result);
101+ int connectionDecodeResponses (Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result);
102+
100103private:
101104 NetProvider m_NetProvider;
102105 /* *
@@ -170,7 +173,7 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
170173
171174template <class BUFFER , class NetProvider >
172175int
173- Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
176+ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync,
174177 Response<BUFFER> *result)
175178{
176179 if (!hasDataToDecode (conn))
@@ -181,7 +184,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
181184
182185 int rc = 0 ;
183186 while (hasDataToDecode (conn)) {
184- DecodeStatus status = processResponse (conn, result);
187+ DecodeStatus status = processResponse (conn, req_sync, result);
185188 if (status == DECODE_ERR) {
186189 rc = -1 ;
187190 break ;
@@ -211,9 +214,10 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
211214 Timer timer{timeout};
212215 timer.start ();
213216 static constexpr int INVALID_SYNC = -1 ;
217+ int req_sync = static_cast <int >(future);
214218 if (result != NULL )
215219 result->header .sync = INVALID_SYNC;
216- if (connectionDecodeResponses (conn, result) != 0 )
220+ if (connectionDecodeResponses (conn, req_sync, result) != 0 )
217221 return -1 ;
218222 if (result != NULL && result->header .sync != INVALID_SYNC) {
219223 LOG_DEBUG (" Future " , future, " is ready and decoded" );
@@ -225,7 +229,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
225229 strerror (errno), errno);
226230 return -1 ;
227231 }
228- if (connectionDecodeResponses (conn, result) != 0 )
232+ if (connectionDecodeResponses (conn, req_sync, result) != 0 )
229233 return -1 ;
230234 if (result != NULL && result->header .sync != INVALID_SYNC) {
231235 LOG_DEBUG (" Future " , future, " is ready and decoded" );
@@ -264,7 +268,7 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
264268 strerror (errno), errno);
265269 return -1 ;
266270 }
267- if (connectionDecodeResponses (conn, nullptr ) != 0 )
271+ if (connectionDecodeResponses (conn, 0 , nullptr ) != 0 )
268272 return -1 ;
269273 bool finish = true ;
270274 for (size_t i = last_not_ready; i < futures.size (); ++i) {
@@ -304,7 +308,7 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
304308 }
305309 Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin ();
306310 assert (hasDataToDecode (conn));
307- if (connectionDecodeResponses (conn, nullptr ) != 0 )
311+ if (connectionDecodeResponses (conn, 0 , nullptr ) != 0 )
308312 return std::nullopt ;
309313 return conn;
310314}
@@ -323,7 +327,7 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
323327 strerror (errno), errno);
324328 return -1 ;
325329 }
326- if (connectionDecodeResponses (conn, nullptr ) != 0 )
330+ if (connectionDecodeResponses (conn, 0 , nullptr ) != 0 )
327331 return -1 ;
328332 if ((conn.getFutureCount () - ready_futures) >= future_count)
329333 return 0 ;
0 commit comments