Skip to content

Commit 9fd774a

Browse files
committed
feat: properly run ingestor and block builder concurrently. also fix ctrl+c
1 parent 6215a3f commit 9fd774a

File tree

5 files changed

+73
-81
lines changed

5 files changed

+73
-81
lines changed

Cargo.lock

Lines changed: 11 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

metabased-translator/bin/metabased-translator/src/main.rs

Lines changed: 34 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async fn run(
3333
let sequencing_config = ingestion_config.sequencing;
3434
let settlement_config = ingestion_config.settlement;
3535

36-
let (mut sequencer_ingestor, sequencer_rx) =
36+
let (mut sequencing_ingestor, sequencer_rx) =
3737
Ingestor::new(sequencing_config.into()).await.map_err(|e| {
3838
RuntimeError::Initialization(format!(
3939
"Failed to create ingestor for sequencing chain: {}",
@@ -50,60 +50,53 @@ async fn run(
5050
})?;
5151

5252
let slotter = Slotter::new(sequencer_rx, settlement_rx, slotting_config);
53-
let slotter_rx = slotter.await.start(); // TODO(SEQ-515): refactor me to get the channel without starting the slotter already?
5453

55-
let block_builder = BlockBuilder::new(slotter_rx, block_builder_config).await.map_err(|e| {
54+
// TODO(SEQ-515): refactor me to get the channel without starting the slotter already?
55+
// TODO(SEQ-515): slotter assumes that it starts first, or else it errors here
56+
let slot_rx = slotter.await.start();
57+
58+
let block_builder = BlockBuilder::new(slot_rx, block_builder_config).await.map_err(|e| {
5659
RuntimeError::Initialization(format!("Failed to create block builder: {}", e))
5760
})?;
5861

59-
let _ = block_builder.start().await;
6062
info!("Starting Metabased Translator");
63+
let sequencing_ingestor_handle = tokio::spawn(async move {
64+
if let Err(e) = sequencing_ingestor.start_polling().await {
65+
error!("Ingestor error: {}", e);
66+
}
67+
});
6168

62-
sequencer_ingestor.start_polling().await.map_err(|e| {
63-
error!("Sequencer ingestor error: {}", e);
64-
RuntimeError::TaskFailure(e.to_string())
65-
})?;
66-
67-
settlement_ingestor.start_polling().await.map_err(|e| {
68-
error!("Settlement ingestor error: {}", e);
69-
RuntimeError::TaskFailure(e.to_string())
70-
})?;
69+
let settlement_ingestor_handle = tokio::spawn(async move {
70+
if let Err(e) = settlement_ingestor.start_polling().await {
71+
error!("Ingestor error: {}", e);
72+
}
73+
});
7174

72-
// TODO(SEQ-515): Improve this
73-
// // Spawn ingestor task
74-
// let ingestor_handle = tokio::spawn(async move {
75-
// if let Err(e) = ingestor?.start_polling().await {
76-
// error!("Ingestor error: {}", e);
77-
// }
78-
// });
79-
80-
// TODO(SEQ-515): Improve this
81-
// // Spawn block builder task
82-
// let block_builder_handle = tokio::spawn(async move {
83-
// if let Err(e) = block_builder.start().await {
84-
// error!("Block builder error: {}", e);
85-
// }
86-
// });
75+
// TODO(SEQ-515): Block builder doesn't error
76+
let block_builder_handle = tokio::spawn(async move { block_builder.start().await });
8777

8878
// Main control loop
8979
tokio::select! {
9080
// Wait for shutdown signal
9181
_ = &mut shutdown_rx => {
9282
info!("Metabased Translator shutting down...");
9383
}
94-
95-
// TODO(SEQ-515): Improve this
96-
// // Watch for task completion/errors
97-
// res = ingestor_handle => {
98-
// if let Err(e) = res {
99-
// error!("Ingestor task failed: {}", e);
100-
// }
101-
// }
102-
// res = block_builder_handle => {
103-
// if let Err(e) = res {
104-
// error!("Block builder task failed: {}", e);
105-
// }
106-
// }
84+
// Watch for task completion/errors
85+
res = settlement_ingestor_handle => {
86+
if let Err(e) = res {
87+
error!("Settlement chain ingestor task failed: {}", e);
88+
}
89+
}
90+
res = sequencing_ingestor_handle => {
91+
if let Err(e) = res {
92+
error!("Sequencing chain ingestor task failed: {}", e);
93+
}
94+
}
95+
res = block_builder_handle => {
96+
if let Err(e) = res {
97+
error!("Block builder task failed: {}", e);
98+
}
99+
}
107100
}
108101

109102
info!("Metabased Translator shutdown complete");

metabased-translator/crates/block-builder/src/block_builder.rs

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,36 +37,34 @@ impl BlockBuilder {
3737

3838
/// Start the block builder
3939
pub async fn start(mut self) {
40-
tokio::spawn(async move {
41-
while let Some(slot) = self.slotter_rx.recv().await {
42-
info!("Received slot: {:?}", slot);
43-
44-
// Process sequencing chain blocks into mB transactions
45-
let mbtxs = self.builder.parse_blocks_to_mbtxs(slot.sequencing_chain_blocks);
46-
47-
// TODO (SEQ-416): [OP / ARB] Process deposit transactions
48-
49-
// [OP / ARB] Build and submit batch
50-
let batch_txn = match self.builder.build_batch_txn(mbtxs).await {
51-
Ok(txn) => txn,
52-
Err(e) => {
53-
log_error!("Error building batch transaction: {}", e);
54-
continue;
55-
}
56-
};
57-
58-
// Submit batch transaction to mchain
59-
if let Err(e) = self.mchain.submit_txn(batch_txn).await {
60-
log_error!("Error submitting transaction: {}", e);
40+
while let Some(slot) = self.slotter_rx.recv().await {
41+
info!("Received slot: {:?}", slot);
42+
43+
// Process sequencing chain blocks into mB transactions
44+
let mbtxs = self.builder.parse_blocks_to_mbtxs(slot.sequencing_chain_blocks);
45+
46+
// TODO (SEQ-416): [OP / ARB] Process deposit transactions
47+
48+
// [OP / ARB] Build and submit batch
49+
let batch_txn = match self.builder.build_batch_txn(mbtxs).await {
50+
Ok(txn) => txn,
51+
Err(e) => {
52+
log_error!("Error building batch transaction: {}", e);
6153
continue;
6254
}
55+
};
6356

64-
// Mine mchain block
65-
if let Err(e) = self.mchain.mine_block(slot.timestamp).await {
66-
log_error!("Error mining block: {}", e);
67-
}
57+
// Submit batch transaction to mchain
58+
if let Err(e) = self.mchain.submit_txn(batch_txn).await {
59+
log_error!("Error submitting transaction: {}", e);
60+
continue;
61+
}
62+
63+
// Mine mchain block
64+
if let Err(e) = self.mchain.mine_block(slot.timestamp).await {
65+
log_error!("Error mining block: {}", e);
6866
}
69-
});
67+
}
7068
}
7169
}
7270

metabased-translator/crates/block-builder/src/connectors/anvil.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ impl MetaChainProvider {
104104
/// Mines a block on the `MetaChain`
105105
pub async fn mine_block(&self, block_timestamp_secs: u64) -> eyre::Result<()> {
106106
let opts = MineOptions::Options { timestamp: Some(block_timestamp_secs), blocks: Some(1) };
107-
self.provider.anvil_mine_detailed(Some(opts)).await?;
107+
let result = self.provider.anvil_mine_detailed(Some(opts)).await;
108+
info!("{}", format!("Mined block on MetaChain {:?}", result));
109+
result?;
108110

109111
Ok(())
110112
}

metabased-translator/crates/slotting/src/slotting.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ impl Slotter {
112112
/// passed)
113113
/// 4. Send completed slots through the returned channel
114114
///
115-
/// # Returns a receiver that will get slots as they are processed.
116115
/// TODO SEQ-480 - implement restore from DB
117116
pub fn start(mut self) -> Receiver<Slot> {
118117
self.status.store(Status::Started);
@@ -186,7 +185,6 @@ impl Slotter {
186185
}
187186
}
188187
});
189-
190188
receiver
191189
}
192190

@@ -456,6 +454,7 @@ mod tests {
456454
use super::*;
457455
use alloy::primitives::B256;
458456
use std::str::FromStr;
457+
use tokio::sync::mpsc::channel;
459458
async fn create_slotter(
460459
slot_start_timestamp_ms: u64,
461460
slot_duration_ms: u64,

0 commit comments

Comments
 (0)