Skip to content

Commit 39d4c91

Browse files
re-implementation
1 parent 4968347 commit 39d4c91

File tree

1 file changed

+92
-137
lines changed
  • crates/apollo_central_sync/src

1 file changed

+92
-137
lines changed

crates/apollo_central_sync/src/lib.rs

Lines changed: 92 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub mod sources;
88
mod sync_test;
99

1010
use std::cmp::min;
11-
use std::collections::{BTreeMap, HashMap};
11+
use std::collections::BTreeMap;
1212
use std::pin::Pin;
1313
use std::sync::Arc;
1414
use std::time::{Duration, Instant};
@@ -32,7 +32,7 @@ use apollo_state_sync_metrics::metrics::{
3232
STATE_SYNC_STATE_MARKER,
3333
};
3434
use apollo_storage::base_layer::{BaseLayerStorageReader, BaseLayerStorageWriter};
35-
use apollo_storage::body::{BodyStorageReader, BodyStorageWriter};
35+
use apollo_storage::body::BodyStorageWriter;
3636
use apollo_storage::class::{ClassStorageReader, ClassStorageWriter};
3737
use apollo_storage::class_manager::{ClassManagerStorageReader, ClassManagerStorageWriter};
3838
use apollo_storage::compiled_class::{CasmStorageReader, CasmStorageWriter};
@@ -53,7 +53,6 @@ use serde::{Deserialize, Serialize};
5353
use sources::base_layer::BaseLayerSourceError;
5454
use starknet_api::block::{
5555
Block,
56-
BlockBody,
5756
BlockHash,
5857
BlockHashAndNumber,
5958
BlockNumber,
@@ -89,15 +88,20 @@ const SLEEP_TIME_SYNC_PROGRESS: Duration = Duration::from_secs(300);
8988
// will compile them, in a backward-compatible manner.
9089
const STARKNET_VERSION_TO_COMPILE_FROM: StarknetVersion = StarknetVersion::V0_12_0;
9190

92-
// Unified data type for ALL blocks from ALL streams
91+
// Unified data type for ALL items from ALL streams
92+
// Each item goes into middle_queue as a separate future
9393
#[derive(Debug)]
9494
enum ProcessedBlockData {
95-
// COMPLETE block: header + state diff processed together (NEW UNIFIED DESIGN)
96-
// This ensures ONE future per block number in middle_queue
97-
CompleteBlock {
95+
// From BlockAvailable stream - header + body + signature
96+
Block {
9897
block_number: BlockNumber,
9998
block: Block,
10099
signature: BlockSignature,
100+
},
101+
// From StateDiffAvailable stream - state diff (after compilation if needed)
102+
StateDiff {
103+
block_number: BlockNumber,
104+
block_hash: BlockHash,
101105
thin_state_diff: ThinStateDiff,
102106
classes: IndexMap<ClassHash, SierraContractClass>,
103107
deprecated_classes: IndexMap<ClassHash, DeprecatedContractClass>,
@@ -273,15 +277,13 @@ pub struct GenericStateSync<
273277
writer: Arc<Mutex<StorageWriter>>,
274278
sequencer_pub_key: Option<SequencerPublicKey>,
275279
class_manager_client: Option<SharedClassManagerClient>,
276-
// Temporary buffer for headers waiting for their corresponding state diff
277-
// Key: block_number, Value: (Block, BlockSignature)
278-
pending_blocks: HashMap<BlockNumber, (Block, BlockSignature)>,
279-
// Middle queue: FuturesOrdered containing ONE future per block number
280-
// Each future processes the COMPLETE block (header + state diff + compilation if needed)
281-
// This queue maintains ordering while allowing concurrent compilation
280+
// Middle queue: FuturesOrdered containing ALL incoming items from feeder gateway
281+
// Each event (BlockAvailable, StateDiffAvailable, etc.) creates a future
282+
// Futures are immediate for headers, async for state diff compilation
283+
// FuturesOrdered maintains ORDER: items come out in the order they went in
282284
middle_queue: FuturesOrdered<ProcessingTask>,
283-
// Batch queue: Final queue for ready-to-flush blocks
284-
// When this fills up to batch_size, we flush it to the database in a single transaction
285+
// Batch queue: accumulates ready items from middle_queue before flushing
286+
// When this reaches config.block_batch_size, we flush all items in ONE DB transaction
285287
batch_queue: Vec<ProcessedBlockData>,
286288
}
287289

@@ -523,18 +525,16 @@ impl<
523525
Some(result) = self.middle_queue.next(), if !self.middle_queue.is_empty() => {
524526
match result {
525527
Ok(processed_block) => {
526-
let block_number = match &processed_block {
527-
ProcessedBlockData::CompleteBlock { block_number, .. } => *block_number,
528-
ProcessedBlockData::CompiledClass { .. } => {
529-
debug!("Compiled class ready from middle_queue.");
530-
BlockNumber(0) // Placeholder for logging
531-
},
532-
ProcessedBlockData::BaseLayerBlock { block_number, .. } => *block_number,
528+
let item_desc = match &processed_block {
529+
ProcessedBlockData::Block { block_number, .. } => format!("Block {} (header)", block_number),
530+
ProcessedBlockData::StateDiff { block_number, .. } => format!("Block {} (state diff)", block_number),
531+
ProcessedBlockData::CompiledClass { .. } => "Compiled class".to_string(),
532+
ProcessedBlockData::BaseLayerBlock { block_number, .. } => format!("Base layer block {}", block_number),
533533
};
534534

535-
info!(
536-
"Block {} (COMPLETE) ready from middle_queue. Adding to batch_queue ({}/{})...",
537-
block_number,
535+
debug!(
536+
"{} ready from middle_queue. Adding to batch_queue ({}/{})...",
537+
item_desc,
538538
self.batch_queue.len() + 1,
539539
self.config.block_batch_size
540540
);
@@ -592,17 +592,25 @@ impl<
592592
match sync_event {
593593
SyncEvent::BlockAvailable { block_number, block, signature } => {
594594
if self.config.enable_block_batching {
595-
// CRITICAL: Write header to DB immediately so state_diff_stream can progress!
596-
// Otherwise state_diff_stream waits for header marker and never delivers StateDiffAvailable
597-
self.store_block(block_number, block.clone(), signature.clone()).await?;
598-
599-
// ALSO store in pending_blocks for combining with state diff later
600-
self.pending_blocks.insert(block_number, (block, signature));
601-
595+
// Create an immediate future that returns the Block data
596+
// No async work needed - just wrap the data
597+
let block_future: ProcessingTask = Box::pin(async move {
598+
Ok(ProcessedBlockData::Block {
599+
block_number,
600+
block,
601+
signature,
602+
})
603+
});
604+
605+
// Push to middle_queue (FuturesOrdered maintains order)
606+
self.middle_queue.push_back(block_future);
607+
602608
debug!(
603-
"Block {} header written to DB and stored in pending_blocks. Pending: {}",
609+
"Block {} (header) added to middle_queue. Queue size: {}, Batch queue: {}/{}",
604610
block_number,
605-
self.pending_blocks.len()
611+
self.middle_queue.len(),
612+
self.batch_queue.len(),
613+
self.config.block_batch_size
606614
);
607615

608616
Ok(())
@@ -617,58 +625,11 @@ impl<
617625
deployed_contract_class_definitions,
618626
} => {
619627
if self.config.enable_block_batching {
620-
// Get the header for this block from pending_blocks, or fetch from DB
621-
let (block, signature) = if let Some(header) = self.pending_blocks.remove(&block_number) {
622-
header
623-
} else {
624-
// Header not in pending_blocks - fetch from DB
625-
// This can happen if we're resuming from a previous run
626-
let txn = self.reader.begin_ro_txn()?;
627-
let header = txn.get_block_header(block_number)?
628-
.ok_or_else(|| StorageError::DBInconsistency {
629-
msg: format!("State diff arrived for block {} but header not found in DB or pending_blocks", block_number)
630-
})?;
631-
let signature = txn.get_block_signature(block_number)?
632-
.ok_or_else(|| StorageError::DBInconsistency {
633-
msg: format!("State diff arrived for block {} but signature not found in DB", block_number)
634-
})?;
635-
let transactions = txn.get_block_transactions(block_number)?
636-
.ok_or_else(|| StorageError::DBInconsistency {
637-
msg: format!("State diff arrived for block {} but transactions not found in DB", block_number)
638-
})?;
639-
let transaction_outputs = txn.get_block_transaction_outputs(block_number)?
640-
.ok_or_else(|| StorageError::DBInconsistency {
641-
msg: format!("State diff arrived for block {} but transaction outputs not found in DB", block_number)
642-
})?;
643-
let transaction_hashes = txn.get_block_transaction_hashes(block_number)?
644-
.ok_or_else(|| StorageError::DBInconsistency {
645-
msg: format!("State diff arrived for block {} but transaction hashes not found in DB", block_number)
646-
})?;
647-
drop(txn);
648-
649-
let block = Block {
650-
header,
651-
body: BlockBody {
652-
transactions,
653-
transaction_outputs,
654-
transaction_hashes,
655-
},
656-
};
657-
658-
debug!(
659-
"Block {} fetched from DB (not in pending_blocks). Pending: {}",
660-
block_number,
661-
self.pending_blocks.len()
662-
);
663-
664-
(block, signature)
665-
};
666-
667-
// Create ONE future that processes the COMPLETE block (header + state diff)
628+
// Create an async future that processes (compiles) the state diff
668629
let class_manager_client = self.class_manager_client.clone();
669630
let reader = self.reader.clone();
670631

671-
let complete_block_future: ProcessingTask = Box::pin(async move {
632+
let state_diff_future: ProcessingTask = Box::pin(async move {
672633
// Process state diff (with compilation if needed)
673634
let (thin_state_diff, classes, deprecated_classes, deployed_contract_class_definitions, block_contains_old_classes) =
674635
Self::process_state_diff(
@@ -679,11 +640,10 @@ impl<
679640
deployed_contract_class_definitions,
680641
).await?;
681642

682-
// Return complete block data with BOTH header and state diff
683-
Ok(ProcessedBlockData::CompleteBlock {
643+
// Return state diff data only (header was already added separately)
644+
Ok(ProcessedBlockData::StateDiff {
684645
block_number,
685-
block,
686-
signature,
646+
block_hash,
687647
thin_state_diff,
688648
classes,
689649
deprecated_classes,
@@ -692,16 +652,15 @@ impl<
692652
})
693653
});
694654

695-
// Add to middle_queue - ONE future per block number
696-
self.middle_queue.push_back(complete_block_future);
655+
// Add to middle_queue
656+
self.middle_queue.push_back(state_diff_future);
697657

698-
info!(
699-
"Block {} (COMPLETE: header + state diff) added to middle_queue. Queue size: {}, Batch queue: {}/{}, Pending headers: {}",
658+
debug!(
659+
"Block {} (state diff) added to middle_queue. Queue size: {}, Batch queue: {}/{}",
700660
block_number,
701661
self.middle_queue.len(),
702662
self.batch_queue.len(),
703-
self.config.block_batch_size,
704-
self.pending_blocks.len()
663+
self.config.block_batch_size
705664
);
706665

707666
Ok(())
@@ -1065,7 +1024,8 @@ impl<
10651024
let mut highest_block = BlockNumber(0);
10661025

10671026
// Separate lists for different item types
1068-
let mut complete_blocks_to_write = Vec::new();
1027+
let mut blocks_to_write = Vec::new();
1028+
let mut state_diffs_to_write = Vec::new();
10691029
let mut casms_to_write = Vec::new();
10701030
let mut base_layer_blocks = Vec::new();
10711031
let mut compiled_classes_for_class_manager = Vec::new();
@@ -1075,12 +1035,27 @@ impl<
10751035
let mut max_block = BlockNumber(0);
10761036

10771037
// Separate items by type
1078-
for processed_block in batch {
1079-
match processed_block {
1080-
ProcessedBlockData::CompleteBlock {
1038+
for processed_item in batch {
1039+
match processed_item {
1040+
ProcessedBlockData::Block {
10811041
block_number,
10821042
block,
10831043
signature,
1044+
} => {
1045+
if block_number > highest_block {
1046+
highest_block = block_number;
1047+
}
1048+
if block_number < min_block {
1049+
min_block = block_number;
1050+
}
1051+
if block_number > max_block {
1052+
max_block = block_number;
1053+
}
1054+
blocks_to_write.push((block_number, block, signature));
1055+
}
1056+
ProcessedBlockData::StateDiff {
1057+
block_number,
1058+
block_hash,
10841059
thin_state_diff,
10851060
classes,
10861061
deprecated_classes,
@@ -1096,10 +1071,9 @@ impl<
10961071
if block_number > max_block {
10971072
max_block = block_number;
10981073
}
1099-
complete_blocks_to_write.push((
1074+
state_diffs_to_write.push((
11001075
block_number,
1101-
block,
1102-
signature,
1076+
block_hash,
11031077
thin_state_diff,
11041078
classes,
11051079
deprecated_classes,
@@ -1127,8 +1101,9 @@ impl<
11271101
}
11281102

11291103
info!(
1130-
"Batch separated: {} complete blocks ({} to {}), {} casms, {} base layer. Writing in ONE transaction...",
1131-
complete_blocks_to_write.len(),
1104+
"Batch separated: {} blocks, {} state diffs ({} to {}), {} casms, {} base layer. Writing in ONE transaction...",
1105+
blocks_to_write.len(),
1106+
state_diffs_to_write.len(),
11321107
min_block,
11331108
max_block,
11341109
casms_to_write.len(),
@@ -1142,30 +1117,11 @@ impl<
11421117
let mut txn = writer.begin_rw_txn()?;
11431118
info!("BATCH_STORAGE_TIMING: Single txn begin took {:?}", txn_start.elapsed());
11441119

1145-
// Get current state marker to skip already-written blocks (for crash recovery)
1146-
let current_state_marker = txn.get_state_marker()?;
1147-
info!("BATCH_STORAGE_TIMING: Current state marker is {:?}. Will skip blocks < this.", current_state_marker);
1148-
1149-
let mut blocks_written = 0;
1150-
let mut blocks_skipped = 0;
1151-
1152-
// Write all complete blocks (STATE DIFFS only - headers already written individually)
1153-
// Headers were written immediately when BlockAvailable arrived (to prevent deadlock)
1154-
for (block_number, block, signature, thin_state_diff, classes, deprecated_classes, deployed_contract_class_definitions, block_contains_old_classes) in complete_blocks_to_write {
1155-
// Skip blocks that are already written (for crash recovery)
1156-
if block_number < current_state_marker {
1157-
debug!("Skipping block {} (already written, marker at {})", block_number, current_state_marker);
1158-
blocks_skipped += 1;
1159-
continue;
1160-
}
1161-
1162-
blocks_written += 1;
1163-
if blocks_written == 1 {
1164-
info!("BATCH_STORAGE_TIMING: First block to write: {}", block_number);
1165-
}
1166-
1167-
// NOTE: Headers/bodies/signatures already written in BlockAvailable event handler!
1168-
// We only write state diffs and classes here
1120+
// Write all HEADERS first (they MUST come before state diffs for state_diff_stream to work)
1121+
for (block_number, block, signature) in &blocks_to_write {
1122+
txn = txn.append_header(*block_number, &block.header)?;
1123+
txn = txn.append_block_signature(*block_number, signature)?;
1124+
txn = txn.append_body(*block_number, block.body.clone())?;
11691125

11701126
if block.header.block_header_without_hash.starknet_version
11711127
< STARKNET_VERSION_TO_COMPILE_FROM
@@ -1174,20 +1130,24 @@ impl<
11741130
&block_number.unchecked_next(),
11751131
)?;
11761132
}
1133+
}
1134+
1135+
info!("BATCH_STORAGE_TIMING: Wrote {} headers", blocks_to_write.len());
11771136

1178-
// Write state diff (headers already in DB)
1137+
// Write all STATE DIFFS (after headers are written)
1138+
for (block_number, _block_hash, thin_state_diff, classes, deprecated_classes, deployed_contract_class_definitions, block_contains_old_classes) in &state_diffs_to_write {
11791139
// Update class manager marker if needed
11801140
if has_class_manager {
11811141
txn = txn.update_class_manager_block_marker(&block_number.unchecked_next())?;
11821142
}
11831143

11841144
// Write state diff
1185-
txn = txn.append_state_diff(block_number, thin_state_diff)?;
1145+
txn = txn.append_state_diff(*block_number, thin_state_diff.clone())?;
11861146

11871147
// Write classes if needed
1188-
if store_sierras_and_casms || block_contains_old_classes {
1148+
if store_sierras_and_casms || *block_contains_old_classes {
11891149
txn = txn.append_classes(
1190-
block_number,
1150+
*block_number,
11911151
&classes
11921152
.iter()
11931153
.map(|(class_hash, class)| (*class_hash, class))
@@ -1200,12 +1160,8 @@ impl<
12001160
)?;
12011161
}
12021162
}
1203-
1204-
info!(
1205-
"BATCH_STORAGE_TIMING: Wrote {} state diffs, skipped {} (already in DB)",
1206-
blocks_written,
1207-
blocks_skipped
1208-
);
1163+
1164+
info!("BATCH_STORAGE_TIMING: Wrote {} state diffs", state_diffs_to_write.len());
12091165

12101166
// Write all compiled classes
12111167
for (class_hash, compiled_class) in casms_to_write {
@@ -1819,7 +1775,6 @@ impl StateSync {
18191775
writer: Arc::new(Mutex::new(writer)),
18201776
sequencer_pub_key: None,
18211777
class_manager_client,
1822-
pending_blocks: HashMap::new(),
18231778
middle_queue: FuturesOrdered::new(),
18241779
batch_queue: Vec::new(),
18251780
}

0 commit comments

Comments
 (0)