@@ -263,14 +263,25 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
263263 & mut self ,
264264 global_data_summary : & GlobalDataSummary ,
265265 ) -> Result < ( ) , Error > {
266- // Determine how many requests (at most) can be sent to the network
267- let num_sent_requests = self . get_sent_data_requests ( ) ?. len ( ) as u64 ;
268- let max_concurrent_requests = self . get_max_concurrent_requests ( ) ;
269- let max_num_requests_to_send = max_concurrent_requests
270- . checked_sub ( num_sent_requests)
271- . ok_or_else ( || {
272- Error :: IntegerOverflow ( "Max number of requests to send has overflown!" . into ( ) )
273- } ) ?;
266+ // Calculate the number of in-flight requests (i.e., requests that haven't completed)
267+ let num_pending_requests = self . get_num_pending_data_requests ( ) ?;
268+ let num_complete_pending_requests = self . get_num_complete_pending_requests ( ) ?;
269+ let num_in_flight_requests =
270+ num_pending_requests. saturating_sub ( num_complete_pending_requests) ;
271+
272+ // Calculate the max number of requests that can be sent now
273+ let max_pending_requests = self . streaming_service_config . max_pending_requests ;
274+ let max_num_requests_to_send = if num_pending_requests >= max_pending_requests {
275+ 0 // We're already at the max number of pending requests (don't do anything)
276+ } else {
277+ // Otherwise, calculate the max number of requests to send based on
278+ // the max concurrent requests and the number of pending request slots.
279+ let remaining_concurrent_requests = self
280+ . get_max_concurrent_requests ( )
281+ . saturating_sub ( num_in_flight_requests) ;
282+ let remaining_request_slots = max_pending_requests. saturating_sub ( num_pending_requests) ;
283+ min ( remaining_concurrent_requests, remaining_request_slots)
284+ } ;
274285
275286 // Send the client requests
276287 if max_num_requests_to_send > 0 {
@@ -303,8 +314,9 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
303314 ) ;
304315 }
305316
306- // Update the counters for the pending response queue
307- metrics:: set_pending_data_responses ( self . get_sent_data_requests ( ) ?. len ( ) ) ;
317+ // Update the counters for the complete and pending responses
318+ metrics:: set_complete_pending_data_responses ( num_complete_pending_requests) ;
319+ metrics:: set_pending_data_responses ( self . get_num_pending_data_requests ( ) ?) ;
308320
309321 Ok ( ( ) )
310322 }
@@ -435,7 +447,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
435447 }
436448
437449 // Process any ready data responses
438- for _ in 0 ..self . get_max_concurrent_requests ( ) {
450+ for _ in 0 ..self . get_num_pending_data_requests ( ) ? {
439451 if let Some ( pending_response) = self . pop_pending_response_queue ( ) ? {
440452 // Get the client request and response information
441453 let maybe_client_response = pending_response. lock ( ) . client_response . take ( ) ;
@@ -851,6 +863,28 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
851863 Ok ( ( ) )
852864 }
853865
866+ /// Returns the number of pending requests in the sent data requests queue
867+ /// that have already completed (i.e., are no longer in-flight).
868+ fn get_num_complete_pending_requests ( & mut self ) -> Result < u64 , Error > {
869+ let mut num_complete_pending_requests = 0 ;
870+ for sent_data_request in self . get_sent_data_requests ( ) ? {
871+ if let Some ( client_response) = sent_data_request. lock ( ) . client_response . as_ref ( ) {
872+ if client_response. is_ok ( ) {
873+ // Only count successful responses as complete. Failures will be retried
874+ num_complete_pending_requests += 1 ;
875+ }
876+ }
877+ }
878+ Ok ( num_complete_pending_requests)
879+ }
880+
881+ /// Returns the number of pending requests in the sent data requests queue
882+ fn get_num_pending_data_requests ( & mut self ) -> Result < u64 , Error > {
883+ let pending_data_requests = self . get_sent_data_requests ( ) ?;
884+ let num_pending_data_requests = pending_data_requests. len ( ) as u64 ;
885+ Ok ( num_pending_data_requests)
886+ }
887+
854888 /// Assumes the caller has already verified that `sent_data_requests` has
855889 /// been initialized.
856890 fn get_sent_data_requests ( & mut self ) -> Result < & mut VecDeque < PendingClientResponse > , Error > {
0 commit comments