Commit 151ea12

design 2.1
1 parent 7aa671f commit 151ea12

File tree: 1 file changed (+51 -99 lines)

  • crates/apollo_central_sync/src/lib.rs

crates/apollo_central_sync/src/lib.rs

Lines changed: 51 additions & 99 deletions

@@ -8,7 +8,7 @@ pub mod sources;
 mod sync_test;

 use std::cmp::min;
-use std::collections::{BTreeMap, HashSet};
+use std::collections::BTreeMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -285,15 +285,13 @@ pub struct GenericStateSync<
     // Batch queue: accumulates ready items from middle_queue before flushing
     // When this reaches config.block_batch_size, we flush all items in ONE DB transaction
     batch_queue: Vec<ProcessedBlockData>,
-    // Deduplication sets: track items already in queue to prevent duplicate processing
-    // This is critical for performance: streams re-fetch same data until DB markers update
-    // Without dedup, ~90% of batch items would be duplicates!
-    queued_blocks: HashSet<BlockNumber>,
-    queued_state_diffs: HashSet<BlockNumber>,
-    queued_compiled_classes: HashSet<ClassHash>,
-    queued_base_layer_blocks: HashSet<BlockNumber>,
-    // Counters for skipped duplicates (for logging/monitoring)
-    duplicates_skipped: usize,
+    // Queue markers: track the highest block/class already queued (in-flight)
+    // Streams use max(db_marker, queue_marker) to know where to start fetching
+    // This prevents duplicate requests to FGW while batch is being filled
+    queue_header_marker: Arc<RwLock<BlockNumber>>,
+    queue_state_marker: Arc<RwLock<BlockNumber>>,
+    queue_compiled_class_marker: Arc<RwLock<BlockNumber>>,
+    queue_base_layer_marker: Arc<RwLock<BlockNumber>>,
 }

 pub type StateSyncResult = Result<(), StateSyncError>;
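
The removed HashSet bookkeeping is replaced by four shared, monotone markers. The pattern is small enough to sketch in isolation; the sketch below is illustrative only (QueueMarkers, note_queued, and next_to_fetch are made-up names, and a plain u64 stands in for BlockNumber), but it shows the handler-side and stream-side halves that the rest of the diff wires together.

use std::sync::Arc;
use tokio::sync::RwLock;

// Stand-in for the crate's BlockNumber; only ordering and "+ 1" are used here.
type BlockNumber = u64;

#[derive(Clone, Default)]
struct QueueMarkers {
    // Highest block number already requested (in-flight), not yet written to the DB.
    header: Arc<RwLock<BlockNumber>>,
}

impl QueueMarkers {
    // Handler side: record that `block_number` has been queued. Advance-only (max).
    async fn note_queued(&self, block_number: BlockNumber) {
        let mut marker = self.header.write().await;
        *marker = (*marker).max(block_number + 1);
    }

    // Stream side: where fetching should resume, given the durable DB marker.
    async fn next_to_fetch(&self, db_marker: BlockNumber) -> BlockNumber {
        db_marker.max(*self.header.read().await)
    }
}

#[tokio::main]
async fn main() {
    let markers = QueueMarkers::default();
    markers.note_queued(499).await; // blocks up to 499 are in-flight
    assert_eq!(markers.next_to_fetch(200).await, 500); // DB is behind: queue marker wins
    assert_eq!(markers.next_to_fetch(800).await, 800); // DB is ahead: DB marker wins
}
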
@@ -482,6 +480,7 @@ impl<
             self.config.collect_pending_data,
             PENDING_SLEEP_DURATION,
             self.config.blocks_max_stream_size,
+            self.queue_header_marker.clone(),
         )
         .fuse();
         let state_diff_stream = stream_new_state_diffs(
@@ -490,6 +489,7 @@ impl<
             self.central_source.clone(),
             self.config.block_propagation_sleep_duration,
             self.config.state_updates_max_stream_size,
+            self.queue_state_marker.clone(),
         )
         .fuse();
         let compiled_class_stream = stream_new_compiled_classes(
@@ -499,6 +499,7 @@ impl<
             // TODO(yair): separate config param.
             self.config.state_updates_max_stream_size,
             self.config.store_sierras_and_casms,
+            self.queue_compiled_class_marker.clone(),
         )
         .fuse();
         let base_layer_block_stream = match &self.base_layer_source {
@@ -560,9 +561,11 @@ impl<
             let batch = std::mem::take(&mut self.batch_queue);
             self.flush_processed_batch(batch).await?;

-            // Clean up deduplication sets after successful flush
-            // Remove entries that are now in DB (below current markers)
-            self.cleanup_dedup_sets_after_flush().await?;
+            // NOTE: We do NOT reset queue markers here!
+            // Queue markers track what we've REQUESTED from FGW, not what we've written.
+            // middle_queue may still have items compiling (e.g., state diffs 500-999)
+            // If we reset markers, streams would re-fetch those items (duplicates!)
+            // Queue markers only advance, never reset.

             info!("Batch successfully flushed. Batch queue cleared.");
         }
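
The comment's example (state diffs 500-999 still sitting in middle_queue) can be made concrete: because the streams start from max(db_marker, queue_marker), a marker that only advances keeps them past the in-flight range, while a hypothetical reset would send them back to the DB marker. A small illustration with plain integers:

use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    // Illustrative numbers: the DB has state diffs written up to 500, while
    // diffs 500..1000 have been requested and are still compiling in middle_queue.
    let db_marker: u64 = 500;
    let queue_marker = Arc::new(RwLock::new(1000u64));

    // With the marker left alone after a flush, the stream resumes at 1000.
    let start = db_marker.max(*queue_marker.read().await);
    assert_eq!(start, 1000);

    // Hypothetical bug: resetting the marker after a flush would drop the stream
    // back to the DB marker and re-fetch blocks 500..1000 (duplicates).
    *queue_marker.write().await = 0;
    assert_eq!(db_marker.max(*queue_marker.read().await), 500);
}
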
@@ -606,16 +609,12 @@ impl<
         match sync_event {
             SyncEvent::BlockAvailable { block_number, block, signature } => {
                 if self.config.enable_block_batching {
-                    // DEDUPLICATION: Skip if already in queue (streams re-fetch until DB markers update)
-                    if self.queued_blocks.contains(&block_number) {
-                        self.duplicates_skipped += 1;
-                        trace!("Skipping duplicate block {} (already in queue)", block_number);
-                        return Ok(());
+                    // Update queue marker to track what's in-flight
+                    {
+                        let mut marker = self.queue_header_marker.write().await;
+                        *marker = (*marker).max(block_number.unchecked_next());
                     }

-                    // Mark as queued BEFORE adding to prevent races
-                    self.queued_blocks.insert(block_number);
-
                     // Create an immediate future that returns the Block data
                     // No async work needed - just wrap the data
                     let block_future: ProcessingTask = Box::pin(async move {
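
One detail of the new handler code: the marker update sits in its own braces, so the RwLock write guard is dropped before the handler goes on to build and enqueue the processing future, rather than staying alive for the rest of the block. A minimal sketch of that scoping, with a hypothetical handle_block and a bare u64 marker:

use std::sync::Arc;
use tokio::sync::RwLock;

async fn handle_block(queue_header_marker: Arc<RwLock<u64>>, block_number: u64) {
    // Scope the write guard: it is dropped at the closing brace, so the lock is
    // not held while the rest of the handler does its (async) work.
    {
        let mut marker = queue_header_marker.write().await;
        // Advance-only update, mirroring block_number.unchecked_next() in the diff.
        *marker = (*marker).max(block_number + 1);
    }

    // ... build the processing future and push it onto middle_queue here ...
}

#[tokio::main]
async fn main() {
    let marker = Arc::new(RwLock::new(0u64));
    handle_block(marker.clone(), 41).await;
    assert_eq!(*marker.read().await, 42);
}
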
@@ -649,16 +648,12 @@ impl<
                 deployed_contract_class_definitions,
             } => {
                 if self.config.enable_block_batching {
-                    // DEDUPLICATION: Skip if already in queue (streams re-fetch until DB markers update)
-                    if self.queued_state_diffs.contains(&block_number) {
-                        self.duplicates_skipped += 1;
-                        trace!("Skipping duplicate state diff {} (already in queue)", block_number);
-                        return Ok(());
+                    // Update queue marker to track what's in-flight
+                    {
+                        let mut marker = self.queue_state_marker.write().await;
+                        *marker = (*marker).max(block_number.unchecked_next());
                     }

-                    // Mark as queued BEFORE adding to prevent races
-                    self.queued_state_diffs.insert(block_number);
-
                     // Create an async future that processes (compiles) the state diff
                     let class_manager_client = self.class_manager_client.clone();
                     let reader = self.reader.clone();
@@ -716,16 +711,6 @@ impl<
                 is_compiler_backward_compatible,
             } => {
                 if self.config.enable_block_batching {
-                    // DEDUPLICATION: Skip if already in queue (streams re-fetch until DB markers update)
-                    if self.queued_compiled_classes.contains(&class_hash) {
-                        self.duplicates_skipped += 1;
-                        trace!("Skipping duplicate compiled class {:?} (already in queue)", class_hash);
-                        return Ok(());
-                    }
-
-                    // Mark as queued BEFORE adding to prevent races
-                    self.queued_compiled_classes.insert(class_hash);
-
                     // Create immediate future for compiled class (no compilation needed)
                     let class_future: ProcessingTask = Box::pin(async move {
                         Ok(ProcessedBlockData::CompiledClass {
@@ -760,16 +745,12 @@ impl<
                 }
                 SyncEvent::NewBaseLayerBlock { block_number, block_hash } => {
                     if self.config.enable_block_batching {
-                        // DEDUPLICATION: Skip if already in queue (streams re-fetch until DB markers update)
-                        if self.queued_base_layer_blocks.contains(&block_number) {
-                            self.duplicates_skipped += 1;
-                            trace!("Skipping duplicate base layer block {} (already in queue)", block_number);
-                            return Ok(());
+                        // Update queue marker to track what's in-flight
+                        {
+                            let mut marker = self.queue_base_layer_marker.write().await;
+                            *marker = (*marker).max(block_number.unchecked_next());
                         }

-                        // Mark as queued BEFORE adding to prevent races
-                        self.queued_base_layer_blocks.insert(block_number);
-
                         // Create immediate future for base layer block (no compilation needed)
                         let base_layer_future: ProcessingTask = Box::pin(async move {
                             Ok(ProcessedBlockData::BaseLayerBlock {
@@ -1326,50 +1307,6 @@ impl<
         Ok(())
     }

-    /// Cleans up deduplication sets after a successful flush.
-    /// Removes entries that are now below DB markers (already written).
-    /// Keeps entries that are still pending in the queue.
-    async fn cleanup_dedup_sets_after_flush(&mut self) -> StateSyncResult {
-        let txn = self.reader.begin_ro_txn()?;
-        let header_marker = txn.get_header_marker()?;
-        let state_marker = txn.get_state_marker()?;
-        let compiled_class_marker = txn.get_compiled_class_marker()?;
-        let base_layer_marker = txn.get_base_layer_block_marker()?;
-        drop(txn);
-
-        // Remove blocks that have been written (block_number < header_marker)
-        let before_blocks = self.queued_blocks.len();
-        self.queued_blocks.retain(|&bn| bn >= header_marker);
-
-        // Remove state diffs that have been written (block_number < state_marker)
-        let before_state_diffs = self.queued_state_diffs.len();
-        self.queued_state_diffs.retain(|&bn| bn >= state_marker);
-
-        // For compiled classes, we clear all since we don't have a simple way to track by block
-        // They're already deduplicated by class_hash in the flush
-        let before_compiled = self.queued_compiled_classes.len();
-        self.queued_compiled_classes.clear();
-
-        // Remove base layer blocks that have been written
-        let before_base_layer = self.queued_base_layer_blocks.len();
-        self.queued_base_layer_blocks.retain(|&bn| bn >= base_layer_marker);
-
-        info!(
-            "DEDUP_STATS: Skipped {} duplicates this batch. Cleanup: blocks {}->{}, state_diffs {}->{}, compiled {}->{}, base_layer {}->{}. Markers: header={}, state={}, casm={}, base={}",
-            self.duplicates_skipped,
-            before_blocks, self.queued_blocks.len(),
-            before_state_diffs, self.queued_state_diffs.len(),
-            before_compiled, self.queued_compiled_classes.len(),
-            before_base_layer, self.queued_base_layer_blocks.len(),
-            header_marker, state_marker, compiled_class_marker, base_layer_marker
-        );
-
-        // Reset counter for next batch
-        self.duplicates_skipped = 0;
-
-        Ok(())
-    }
-
     // Helper to store already-processed state diff
     async fn store_processed_state_diff(
         &mut self,
@@ -1790,10 +1727,14 @@ fn stream_new_blocks<
    collect_pending_data: bool,
    pending_sleep_duration: Duration,
    max_stream_size: u32,
+   queue_header_marker: Arc<RwLock<BlockNumber>>,
 ) -> impl Stream<Item = Result<SyncEvent, StateSyncError>> {
     try_stream! {
         loop {
-            let header_marker = reader.begin_ro_txn()?.get_header_marker()?;
+            let db_header_marker = reader.begin_ro_txn()?.get_header_marker()?;
+            let queue_marker = *queue_header_marker.read().await;
+            // Use max of DB marker and queue marker to skip blocks already in-flight
+            let header_marker = db_header_marker.max(queue_marker);
             let latest_central_block = central_source.get_latest_block().await?;
             *shared_highest_block.write().await = latest_central_block;
             let central_block_marker = latest_central_block.map_or(
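
All three streams now compute their starting point the same way: take the durable DB marker, take the in-flight queue marker, and fetch from whichever is further along. A sketch of that computation with plain integers (the function name and the clamping against max_stream_size are illustrative, not the crate's API):

/// Half-open range of blocks a stream should request next. `db_marker` is how far
/// the DB has been written, `queue_marker` how far requests are already in flight,
/// `latest_upstream` the newest block known to the central source.
fn next_fetch_range(
    db_marker: u64,
    queue_marker: u64,
    latest_upstream: u64,
    max_stream_size: u64,
) -> std::ops::Range<u64> {
    let from = db_marker.max(queue_marker);
    let up_to = latest_upstream.min(from + max_stream_size).max(from);
    from..up_to
}

fn main() {
    // DB at 500, blocks up to 1000 already requested, upstream at 1200:
    assert_eq!(next_fetch_range(500, 1000, 1200, 100), 1000..1100);
    // Fresh start: the queue marker is still at its default, so max() simply
    // falls back to the DB marker and behaviour is unchanged.
    assert_eq!(next_fetch_range(500, 0, 1200, 100), 500..600);
}
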
@@ -1838,13 +1779,17 @@ fn stream_new_state_diffs<TCentralSource: CentralSourceTrait + Sync + Send>(
    central_source: Arc<TCentralSource>,
    block_propagation_sleep_duration: Duration,
    max_stream_size: u32,
+   queue_state_marker: Arc<RwLock<BlockNumber>>,
 ) -> impl Stream<Item = Result<SyncEvent, StateSyncError>> {
     try_stream! {
         loop {
             let txn = reader.begin_ro_txn()?;
-            let state_marker = txn.get_state_marker()?;
+            let db_state_marker = txn.get_state_marker()?;
             let last_block_number = txn.get_header_marker()?;
             drop(txn);
+            let queue_marker = *queue_state_marker.read().await;
+            // Use max of DB marker and queue marker to skip state diffs already in-flight
+            let state_marker = db_state_marker.max(queue_marker);
             if state_marker == last_block_number {
                 trace!("State updates syncing reached the last downloaded block {:?}, waiting for more blocks.", state_marker.prev());
                 tokio::time::sleep(block_propagation_sleep_duration).await;
@@ -1918,11 +1863,10 @@ impl StateSync {
            class_manager_client,
            middle_queue: FuturesOrdered::new(),
            batch_queue: Vec::new(),
-           queued_blocks: HashSet::new(),
-           queued_state_diffs: HashSet::new(),
-           queued_compiled_classes: HashSet::new(),
-           queued_base_layer_blocks: HashSet::new(),
-           duplicates_skipped: 0,
+           queue_header_marker: Arc::new(RwLock::new(BlockNumber::default())),
+           queue_state_marker: Arc::new(RwLock::new(BlockNumber::default())),
+           queue_compiled_class_marker: Arc::new(RwLock::new(BlockNumber::default())),
+           queue_base_layer_marker: Arc::new(RwLock::new(BlockNumber::default())),
        }
    }
 }
@@ -1933,11 +1877,15 @@ fn stream_new_compiled_classes<TCentralSource: CentralSourceTrait + Sync + Send>(
    block_propagation_sleep_duration: Duration,
    max_stream_size: u32,
    store_sierras_and_casms: bool,
+   queue_compiled_class_marker: Arc<RwLock<BlockNumber>>,
 ) -> impl Stream<Item = Result<SyncEvent, StateSyncError>> {
     try_stream! {
         loop {
             let txn = reader.begin_ro_txn()?;
-            let mut from = txn.get_compiled_class_marker()?;
+            let db_compiled_class_marker = txn.get_compiled_class_marker()?;
+            let queue_marker = *queue_compiled_class_marker.read().await;
+            // Use max of DB marker and queue marker to skip compiled classes already in-flight
+            let mut from = db_compiled_class_marker.max(queue_marker);
             let state_marker = txn.get_state_marker()?;
             let compiler_backward_compatibility_marker = txn.get_compiler_backward_compatibility_marker()?;
             // Avoid starting streams from blocks without declared classes.
@@ -1977,6 +1925,10 @@ fn stream_new_compiled_classes<TCentralSource: CentralSourceTrait + Sync + Send>(
                 continue;
             }

+            // Update queue marker BEFORE fetching to prevent duplicate requests
+            // This tells future iterations: "we're fetching up to block X, don't re-fetch it"
+            *queue_compiled_class_marker.write().await = up_to;
+
             debug!("Downloading compiled classes of blocks [{} - {}).", from, up_to);
             let compiled_classes_stream =
                 central_source.stream_compiled_classes(from, up_to).fuse();
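
The compiled-class stream claims its range before it starts downloading: the queue marker is moved up to up_to first, so a later loop iteration that runs while the download is still in flight sees the range as already taken and does not re-request it. A sketch of that claim-then-fetch ordering (fetch_iteration and the integer markers are hypothetical stand-ins):

use std::sync::Arc;
use tokio::sync::RwLock;

// Hypothetical stand-in for one iteration of the compiled-class fetch loop.
async fn fetch_iteration(
    queue_marker: Arc<RwLock<u64>>,
    db_marker: u64,
    state_marker: u64,
) -> Option<(u64, u64)> {
    let from = db_marker.max(*queue_marker.read().await);
    let up_to = state_marker; // fetch everything the state sync already knows about
    if from >= up_to {
        return None; // nothing new to request
    }

    // Claim the range BEFORE fetching, so later iterations skip it.
    *queue_marker.write().await = up_to;

    // ... issue central_source.stream_compiled_classes(from, up_to) here ...
    Some((from, up_to))
}

#[tokio::main]
async fn main() {
    let marker = Arc::new(RwLock::new(0u64));
    assert_eq!(fetch_iteration(marker.clone(), 10, 50).await, Some((10, 50)));
    // Second pass before anything is written to the DB: range already claimed.
    assert_eq!(fetch_iteration(marker.clone(), 10, 50).await, None);
}

One small difference from the handler-side updates: this write assigns the marker directly rather than advancing it with max, while the stream still reads it through max(db_marker, queue_marker).
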

0 commit comments