@@ -13,6 +13,7 @@ use crate::{
         TransactionOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest,
         TransactionsWithProofRequest,
     },
+    dynamic_prefetching::DynamicPrefetchingState,
     error::Error,
     logging::{LogEntry, LogEvent, LogSchema},
     metrics,
@@ -119,6 +120,9 @@ pub struct DataStream<T> {

     // The time service to track elapsed time (e.g., during stream lag checks)
     time_service: TimeService,
+
+    // The dynamic prefetching state (if enabled)
+    dynamic_prefetching_state: DynamicPrefetchingState,
 }

 impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
@@ -141,6 +145,10 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         // Create a new stream engine
         let stream_engine = StreamEngine::new(data_stream_config, stream_request, advertised_data)?;

+        // Create the dynamic prefetching state
+        let dynamic_prefetching_state =
+            DynamicPrefetchingState::new(data_stream_config, time_service.clone());
+
         // Create a new data stream
         let data_stream = Self {
             data_client_config,
@@ -159,6 +167,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
             send_failure: false,
             subscription_stream_lag: None,
             time_service,
+            dynamic_prefetching_state,
         };

         Ok((data_stream, data_stream_listener))
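Note: DynamicPrefetchingState itself is defined in dynamic_prefetching.rs, which is not part of this diff, so the sketch below is only a rough mental model of what the constructor above might be building. All type, field, and method names in it are illustrative assumptions, not the crate's actual API; the real state is constructed from the data stream config and time service shown above.

// Hypothetical sketch (illustrative names only; the real type lives in
// dynamic_prefetching.rs): a per-stream state tracking an adjustable
// concurrency cap, plus a fixed cap for state value streams.
pub struct PrefetchingStateSketch {
    current_limit: u64,       // The cap that grows and shrinks over time
    state_request_limit: u64, // Assumed separate cap for state value streams
}

impl PrefetchingStateSketch {
    pub fn new(initial_limit: u64, state_request_limit: u64) -> Self {
        Self {
            current_limit: initial_limit,
            state_request_limit,
        }
    }

    // Analogous to the call sites later in this diff: state value streams keep
    // their own limit, while all other streams use the dynamically adjusted cap.
    pub fn get_max_concurrent_requests(&self, is_state_stream: bool) -> u64 {
        if is_state_stream {
            self.state_request_limit
        } else {
            self.current_limit
        }
    }
}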
@@ -255,17 +264,6 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         Ok(())
     }

-    /// Returns the maximum number of concurrent requests that can be executing
-    /// at any given time.
-    fn get_max_concurrent_requests(&self) -> u64 {
-        match self.stream_engine {
-            StreamEngine::StateStreamEngine(_) => {
-                self.streaming_service_config.max_concurrent_state_requests
-            },
-            _ => self.streaming_service_config.max_concurrent_requests,
-        }
-    }
-
     /// Creates and sends a batch of aptos data client requests to the network
     fn create_and_send_client_requests(
         &mut self,
@@ -285,7 +283,8 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
             // Otherwise, calculate the max number of requests to send based on
             // the max concurrent requests and the number of pending request slots.
             let remaining_concurrent_requests = self
-                .get_max_concurrent_requests()
+                .dynamic_prefetching_state
+                .get_max_concurrent_requests(&self.stream_engine)
                 .saturating_sub(num_in_flight_requests);
             let remaining_request_slots = max_pending_requests.saturating_sub(num_pending_requests);
             min(remaining_concurrent_requests, remaining_request_slots)
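For reference, the budget computed above is simply the smaller of two headrooms: the dynamic concurrency cap minus the requests already in flight, and the pending-queue capacity minus the requests already queued. A standalone restatement of that arithmetic (the helper name is hypothetical, not part of the streaming service):

use std::cmp::min;

// Hypothetical free-function restatement of the calculation above
fn num_requests_to_send(
    max_concurrent_requests: u64, // From the dynamic prefetching state
    num_in_flight_requests: u64,
    max_pending_requests: u64,
    num_pending_requests: u64,
) -> u64 {
    let remaining_concurrent_requests =
        max_concurrent_requests.saturating_sub(num_in_flight_requests);
    let remaining_request_slots = max_pending_requests.saturating_sub(num_pending_requests);
    min(remaining_concurrent_requests, remaining_request_slots)
}

// Example: a cap of 10 with 4 requests in flight, and 3 of 8 pending slots used,
// allows min(10 - 4, 8 - 3) = 5 new requests.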
@@ -453,85 +452,89 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
             return Ok(()); // There's nothing left to do
         }

-        // Process any ready data responses
-        for _ in 0..self.get_num_pending_data_requests()? {
-            if let Some(pending_response) = self.pop_pending_response_queue()? {
-                // Get the client request and response information
-                let maybe_client_response = pending_response.lock().client_response.take();
-                let client_response = maybe_client_response.ok_or_else(|| {
-                    Error::UnexpectedErrorEncountered("The client response should be ready!".into())
-                })?;
-                let client_request = &pending_response.lock().client_request.clone();
-
-                // Process the client response
-                match client_response {
-                    Ok(client_response) => {
-                        // Sanity check and process the response
-                        if sanity_check_client_response_type(client_request, &client_response) {
-                            // The response is valid, send the data notification to the client
-                            let client_response_payload = client_response.payload.clone();
-                            self.send_data_notification_to_client(client_request, client_response)
-                                .await?;
-
-                            // If the response wasn't enough to satisfy the original request (e.g.,
-                            // it was truncated), missing data should be requested.
-                            match self
-                                .request_missing_data(client_request, &client_response_payload)
-                            {
-                                Ok(missing_data_requested) => {
-                                    if missing_data_requested {
-                                        break; // We're now head of line blocked on the missing data
-                                    }
-                                },
-                                Err(error) => {
-                                    warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
-                                        .stream_id(self.data_stream_id)
-                                        .event(LogEvent::Error)
-                                        .error(&error)
-                                        .message(
-                                            "Failed to determine if missing data was requested!"
-                                        ));
-                                },
-                            }
-
-                            // If the request was a subscription request and the subscription
-                            // stream is lagging behind the data advertisements, the stream
-                            // engine should be notified (e.g., so that it can catch up).
-                            if client_request.is_subscription_request() {
-                                if let Err(error) = self.check_subscription_stream_lag(
-                                    &global_data_summary,
-                                    &client_response_payload,
-                                ) {
-                                    self.notify_new_data_request_error(client_request, error)?;
-                                    break; // We're now head of line blocked on the failed stream
+        // Continuously process any ready data responses
+        while let Some(pending_response) = self.pop_pending_response_queue()? {
+            // Get the client request and response information
+            let maybe_client_response = pending_response.lock().client_response.take();
+            let client_response = maybe_client_response.ok_or_else(|| {
+                Error::UnexpectedErrorEncountered("The client response should be ready!".into())
+            })?;
+            let client_request = &pending_response.lock().client_request.clone();
+
+            // Process the client response
+            match client_response {
+                Ok(client_response) => {
+                    // Sanity check and process the response
+                    if sanity_check_client_response_type(client_request, &client_response) {
+                        // If the response wasn't enough to satisfy the original request (e.g.,
+                        // it was truncated), missing data should be requested.
+                        let mut head_of_line_blocked = false;
+                        match self.request_missing_data(client_request, &client_response.payload) {
+                            Ok(missing_data_requested) => {
+                                if missing_data_requested {
+                                    head_of_line_blocked = true; // We're now head of line blocked on the missing data
                                 }
+                            },
+                            Err(error) => {
+                                warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
+                                    .stream_id(self.data_stream_id)
+                                    .event(LogEvent::Error)
+                                    .error(&error)
+                                    .message("Failed to determine if missing data was requested!"));
+                            },
+                        }
+
+                        // If the request was a subscription request and the subscription
+                        // stream is lagging behind the data advertisements, the stream
+                        // engine should be notified (e.g., so that it can catch up).
+                        if client_request.is_subscription_request() {
+                            if let Err(error) = self.check_subscription_stream_lag(
+                                &global_data_summary,
+                                &client_response.payload,
+                            ) {
+                                self.notify_new_data_request_error(client_request, error)?;
+                                head_of_line_blocked = true; // We're now head of line blocked on the failed stream
                             }
-                        } else {
-                            // The sanity check failed
-                            self.handle_sanity_check_failure(
-                                client_request,
-                                &client_response.context,
-                            )?;
-                            break; // We're now head of line blocked on the failed request
                         }
-                    },
-                    Err(error) => {
-                        // Handle the error depending on the request type
-                        if client_request.is_subscription_request()
-                            || client_request.is_optimistic_fetch_request()
-                        {
-                            // The request was for new data. We should notify the
-                            // stream engine and clear the requests queue.
-                            self.notify_new_data_request_error(client_request, error)?;
-                        } else {
-                            // Otherwise, we should handle the error and simply retry
-                            self.handle_data_client_error(client_request, &error)?;
+
+                        // The response is valid, send the data notification to the client
+                        self.send_data_notification_to_client(client_request, client_response)
+                            .await?;
+
+                        // If the request is for specific data, increase the prefetching limit.
+                        // Note: we don't increase the limit for new data requests because
+                        // those don't invoke the prefetcher (as we're already up-to-date).
+                        if !client_request.is_new_data_request() {
+                            self.dynamic_prefetching_state
+                                .increase_max_concurrent_requests();
                         }
+
+                        // If we're head of line blocked, we should return early
+                        if head_of_line_blocked {
+                            break;
+                        }
+                    } else {
+                        // The sanity check failed
+                        self.handle_sanity_check_failure(client_request, &client_response.context)?;
                         break; // We're now head of line blocked on the failed request
-                    },
-                }
-            } else {
-                break; // The first response hasn't arrived yet
+                    }
+                },
+                Err(error) => {
+                    // Handle the error depending on the request type
+                    if client_request.is_new_data_request() {
+                        // The request was for new data. We should notify the
+                        // stream engine and clear the requests queue.
+                        self.notify_new_data_request_error(client_request, error)?;
+                    } else {
+                        // Decrease the prefetching limit on an error
+                        self.dynamic_prefetching_state
+                            .decrease_max_concurrent_requests();
+
+                        // Handle the error and simply retry
+                        self.handle_data_client_error(client_request, &error)?;
+                    }
+                    break; // We're now head of line blocked on the failed request
+                },
             }
         }

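The increase/decrease calls in the loop above apply a simple feedback policy to the prefetching cap: successfully processed responses to specific-data requests raise it, and failed requests lower it before the retry. The sketch below restates that policy in isolation; the step sizes, bounds, and names are assumptions for illustration, since the configured values live outside this diff.

// Hypothetical sketch of the feedback policy applied by the response loop;
// step sizes and bounds are illustrative assumptions, not configured values.
struct PrefetchingCap {
    current: u64,
    min: u64,
    max: u64,
}

impl PrefetchingCap {
    // Called after a specific-data response is successfully processed
    fn increase_max_concurrent_requests(&mut self) {
        self.current = (self.current + 1).min(self.max);
    }

    // Called when a specific-data request fails and will be retried
    fn decrease_max_concurrent_requests(&mut self) {
        self.current = self.current.saturating_sub(1).max(self.min);
    }
}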
@@ -705,6 +708,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         data_client_request: &DataClientRequest,
         data_client_error: &aptos_data_client::error::Error,
     ) -> Result<(), Error> {
+        // Log the error
         warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
             .stream_id(self.data_stream_id)
             .event(LogEvent::Error)