@@ -100,6 +100,23 @@ class Connector
100100 */
101101 int connectionDecodeResponses (Connection<BUFFER, NetProvider> &conn, int req_sync = -1 ,
102102 Response<BUFFER> *result = nullptr );
103+ /* *
104+ * A helper to check the readiness of requested responses.
105+ * Decodes new responses from the connection and checks the readiness of the requested responses. `finish`
106+ * indicates that all the requested responses are ready, and `last_not_ready` is reused between consecutive
107+ * calls to this function.
108+ * Returns -1 in case of any error, 0 on success.
109+ */
110+ int connectionCheckResponsesReadiness (Connection<BUFFER, NetProvider> &conn, const std::vector<rid_t > &futures,
111+ size_t *last_not_ready, bool *finish);
112+ /* *
113+ * A helper to check the readiness of at least `future_count` responses. Decodes new responses from the
114+ * connection and checks that at least `future_count` responses are ready. `finish` indicates that at least
115+ * `future_count` responses are ready.
116+ * Returns -1 in case of any error, 0 on success.
117+ */
118+ int connectionCheckCountResponsesReadiness (Connection<BUFFER, NetProvider> &conn, size_t future_count,
119+ bool *finish);
103120
104121private:
105122 NetProvider m_NetProvider;
@@ -205,6 +222,36 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
205222 return rc;
206223}
207224
225+ template <class BUFFER , class NetProvider >
226+ int
227+ Connector<BUFFER, NetProvider>::connectionCheckResponsesReadiness(Connection<BUFFER, NetProvider> &conn,
228+ const std::vector<rid_t > &futures,
229+ size_t *last_not_ready, bool *finish)
230+ {
231+ if (conn.hasError () || connectionDecodeResponses (conn) != 0 )
232+ return -1 ;
233+ *finish = true ;
234+ for (size_t i = *last_not_ready; i < futures.size (); ++i) {
235+ if (!conn.futureIsReady (futures[i])) {
236+ *finish = false ;
237+ *last_not_ready = i;
238+ break ;
239+ }
240+ }
241+ return 0 ;
242+ }
243+
244+ template <class BUFFER , class NetProvider >
245+ int
246+ Connector<BUFFER, NetProvider>::connectionCheckCountResponsesReadiness(Connection<BUFFER, NetProvider> &conn,
247+ size_t future_count, bool *finish)
248+ {
249+ if (conn.hasError () || connectionDecodeResponses (conn) != 0 )
250+ return -1 ;
251+ *finish = conn.getFutureCount () >= future_count;
252+ return 0 ;
253+ }
254+
208255template <class BUFFER , class NetProvider >
209256int
210257Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
@@ -271,16 +318,9 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
271318 strerror (errno), errno);
272319 return -1 ;
273320 }
274- if (connectionDecodeResponses (conn) != 0 )
321+ bool finish = false ;
322+ if (connectionCheckResponsesReadiness (conn, futures, &last_not_ready, &finish) != 0 )
275323 return -1 ;
276- bool finish = true ;
277- for (size_t i = last_not_ready; i < futures.size (); ++i) {
278- if (!conn.futureIsReady (futures[i])) {
279- finish = false ;
280- last_not_ready = i;
281- break ;
282- }
283- }
284324 if (finish)
285325 return 0 ;
286326 if (timer.isExpired ())
@@ -324,15 +364,17 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
324364 Timer timer{timeout};
325365 timer.start ();
326366 size_t ready_futures = conn.getFutureCount ();
367+ size_t expected_future_count = ready_futures + future_count;
327368 while (!conn.hasError ()) {
328369 if (m_NetProvider.wait (timer.timeLeft ()) != 0 ) {
329370 conn.setError (std::string (" Failed to poll: " ) +
330371 strerror (errno), errno);
331372 return -1 ;
332373 }
333- if (connectionDecodeResponses (conn) != 0 )
374+ bool finish = false ;
375+ if (connectionCheckCountResponsesReadiness (conn, expected_future_count, &finish) != 0 )
334376 return -1 ;
335- if ((conn. getFutureCount () - ready_futures) >= future_count )
377+ if (finish )
336378 return 0 ;
337379 if (timer.isExpired ())
338380 break ;
0 commit comments