Skip to content

Commit 8d24ec7

Browse files
committed
[State Sync] Add backpressure to fast sync receiver.
1 parent ea17e7e commit 8d24ec7

File tree

6 files changed: +27 additions, -15 deletions (lines changed)

config/src/config/state_sync_config.rs

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -136,7 +136,7 @@ impl Default for StateSyncDriverConfig {
136136
max_connection_deadline_secs: 10,
137137
max_consecutive_stream_notifications: 10,
138138
max_num_stream_timeouts: 12,
139-
max_pending_data_chunks: 100,
139+
max_pending_data_chunks: 50,
140140
max_pending_mempool_notifications: 100,
141141
max_stream_wait_time_ms: 5000,
142142
num_versions_to_skip_snapshot_sync: 100_000_000, // At 5k TPS, this allows a node to fail for about 6 hours.
@@ -219,9 +219,7 @@ pub struct DataStreamingServiceConfig {
219219
/// Maximum number of concurrent data client requests (per stream) for state keys/values.
220220
pub max_concurrent_state_requests: u64,
221221

222-
/// Maximum channel sizes for each data stream listener. If messages are not
223-
/// consumed, they will be dropped (oldest messages first). The remaining
224-
/// messages will be retrieved using FIFO ordering.
222+
/// Maximum channel sizes for each data stream listener (per stream).
225223
pub max_data_stream_channel_sizes: u64,
226224

227225
/// Maximum number of notification ID to response context mappings held in
@@ -256,7 +254,7 @@ impl Default for DataStreamingServiceConfig {
256254
global_summary_refresh_interval_ms: 50,
257255
max_concurrent_requests: MAX_CONCURRENT_REQUESTS,
258256
max_concurrent_state_requests: MAX_CONCURRENT_STATE_REQUESTS,
259-
max_data_stream_channel_sizes: 300,
257+
max_data_stream_channel_sizes: 50,
260258
max_notification_id_mappings: 300,
261259
max_num_consecutive_subscriptions: 40, // At ~4 blocks per second, this should last 10 seconds
262260
max_pending_requests: 50,
@@ -297,7 +295,7 @@ impl Default for DynamicPrefetchingConfig {
297295
Self {
298296
enable_dynamic_prefetching: true,
299297
initial_prefetching_value: 3,
300-
max_prefetching_value: 50,
298+
max_prefetching_value: 30,
301299
min_prefetching_value: 3,
302300
prefetching_value_increase: 1,
303301
prefetching_value_decrease: 2,

state-sync/data-streaming-service/src/data_stream.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -393,8 +393,11 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
393393
pending_client_response
394394
}
395395

396-
// TODO(joshlind): this function shouldn't be blocking when trying to send! If there are
397-
// multiple streams, a single blocked stream could cause them all to block.
396+
// TODO(joshlind): this function shouldn't be blocking when trying to send.
397+
// If there are multiple streams, a single blocked stream could cause them
398+
// all to block. This is acceptable for now (because there is only ever
399+
// a single stream in use by the driver) but it should be fixed if we want
400+
// to generalize this for multiple streams.
398401
async fn send_data_notification(
399402
&mut self,
400403
data_notification: DataNotification,

state-sync/state-sync-driver/src/bootstrapper.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1030,6 +1030,7 @@ impl<
10301030
if let Err(error) = self
10311031
.storage_synchronizer
10321032
.save_state_values(notification_id, state_value_chunk_with_proof)
1033+
.await
10331034
{
10341035
self.reset_active_stream(Some(NotificationAndFeedback::new(
10351036
notification_id,

state-sync/state-sync-driver/src/storage_synchronizer.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -96,7 +96,7 @@ pub trait StorageSynchronizerInterface {
9696
///
9797
/// Note: this requires that `initialize_state_synchronizer` has been
9898
/// called.
99-
fn save_state_values(
99+
async fn save_state_values(
100100
&mut self,
101101
notification_id: NotificationId,
102102
state_value_chunk_with_proof: StateValueChunkWithProof,
@@ -403,17 +403,20 @@ impl<
403403
load_pending_data_chunks(self.pending_data_chunks.clone()) > 0
404404
}
405405

406-
fn save_state_values(
406+
async fn save_state_values(
407407
&mut self,
408408
notification_id: NotificationId,
409409
state_value_chunk_with_proof: StateValueChunkWithProof,
410410
) -> Result<(), Error> {
411+
// Get the snapshot notifier and create the storage data chunk
411412
let state_snapshot_notifier = self.state_snapshot_notifier.as_mut().ok_or_else(|| {
412413
Error::UnexpectedError("The state snapshot receiver has not been initialized!".into())
413414
})?;
414415
let storage_data_chunk =
415416
StorageDataChunk::States(notification_id, state_value_chunk_with_proof);
416-
if let Err(error) = state_snapshot_notifier.try_send(storage_data_chunk) {
417+
418+
// Notify the snapshot receiver of the storage data chunk
419+
if let Err(error) = state_snapshot_notifier.send(storage_data_chunk).await {
417420
Err(Error::UnexpectedError(format!(
418421
"Failed to send storage data chunk to state snapshot listener: {:?}",
419422
error

state-sync/state-sync-driver/src/tests/mocks.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -475,7 +475,7 @@ mock! {
475475

476476
fn pending_storage_data(&self) -> bool;
477477

478-
fn save_state_values(
478+
async fn save_state_values(
479479
&mut self,
480480
notification_id: NotificationId,
481481
state_value_chunk_with_proof: StateValueChunkWithProof,

state-sync/state-sync-driver/src/tests/storage_synchronizer.rs

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -750,9 +750,11 @@ async fn test_save_states_completion() {
750750
// Save multiple state chunks (including the last chunk)
751751
storage_synchronizer
752752
.save_state_values(0, create_state_value_chunk_with_proof(false))
753+
.await
753754
.unwrap();
754755
storage_synchronizer
755756
.save_state_values(1, create_state_value_chunk_with_proof(true))
757+
.await
756758
.unwrap();
757759

758760
// Verify we get a commit notification
@@ -808,6 +810,7 @@ async fn test_save_states_dropped_error_listener() {
808810
let notification_id = 0;
809811
storage_synchronizer
810812
.save_state_values(notification_id, create_state_value_chunk_with_proof(true))
813+
.await
811814
.unwrap();
812815

813816
// The handler should panic as the commit listener was dropped
@@ -849,13 +852,14 @@ async fn test_save_states_invalid_chunk() {
849852
let notification_id = 0;
850853
storage_synchronizer
851854
.save_state_values(notification_id, create_state_value_chunk_with_proof(false))
855+
.await
852856
.unwrap();
853857
verify_error_notification(&mut error_listener, notification_id).await;
854858
}
855859

856-
#[test]
860+
#[tokio::test]
857861
#[should_panic]
858-
fn test_save_states_without_initialize() {
862+
async fn test_save_states_without_initialize() {
859863
// Create the storage synchronizer
860864
let (_, _, _, _, _, mut storage_synchronizer, _) = create_storage_synchronizer(
861865
create_mock_executor(),
@@ -864,7 +868,10 @@ fn test_save_states_without_initialize() {
864868

865869
// Attempting to save the states should panic as the state
866870
// synchronizer was not initialized!
867-
let _ = storage_synchronizer.save_state_values(0, create_state_value_chunk_with_proof(false));
871+
storage_synchronizer
872+
.save_state_values(0, create_state_value_chunk_with_proof(false))
873+
.await
874+
.unwrap();
868875
}
869876

870877
/// Creates a storage synchronizer for testing

0 commit comments

Comments
 (0)