Skip to content

Commit 924b662

Browse files
Better shutdown. (#867)
## 📝 Summary Shutdown is now performed in 2 steps: 1- Cancellation: We propagate a cancel signal so we stop building, data stops flowing and channels close. We expect all critical threads to finish. 2- Abort: If some critical thread is still alive we trigger a second signal so critical tasks stop waiting for data (assuming the source is hanged) and perform any necessary clean up. Sleeps were replaced by JoinHandles. ## ✅ I have completed the following steps: * [X] Run `make lint` * [X] Run `make test` * [ ] Added tests (if applicable) --------- Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com>
1 parent 6aa0e31 commit 924b662

File tree

11 files changed

+316
-117
lines changed

11 files changed

+316
-117
lines changed

crates/rbuilder-operator/src/clickhouse.rs

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use rbuilder::{
1313
relay_submit::{AlwaysSubmitPolicy, RelaySubmissionPolicy},
1414
},
1515
payload_events::MevBoostSlotData,
16-
process_killer::RUN_SUBMIT_TO_RELAYS_JOB_CANCEL_TIME,
1716
},
1817
};
1918
use rbuilder_primitives::{Order, OrderId};
@@ -28,6 +27,7 @@ use rbuilder_utils::clickhouse::{
2827
use serde::{Deserialize, Serialize};
2928
use time::OffsetDateTime;
3029
use tokio::sync::mpsc;
30+
use tokio::task::JoinHandle;
3131
use tokio_util::sync::CancellationToken;
3232
use tracing::error;
3333

@@ -164,13 +164,20 @@ impl RelaySubmissionPolicy for BackupNotTooBigRelaySubmissionPolicy {
164164
}
165165

166166
impl BuiltBlocksWriter {
167-
/// Returns the writer and the submission policy what asks to stop submitting blocks if the disk backup is too big.
168-
/// It's a little ugly/coupled that we generate this here but it was easier than injecting a metric observer.
167+
/// Returns the writer, the submission policy, and a JoinHandle for the clickhouse tasks.
168+
/// The JoinHandle can be awaited to ensure clickhouse tasks have completed during shutdown.
169+
/// It's a little ugly/coupled that we generate the submission policy here but it was easier than injecting a metric observer.
170+
/// abort_token is a last resort cancellation token used to cancel clickhouse tasks if the BuiltBlocksWriter is not
171+
/// released. It will tell clickhouse to stop listening for new blocks and flush the backup process.
169172
pub fn new(
170173
config: BuiltBlocksClickhouseConfig,
171174
rbuilder_commit: String,
172-
cancellation_token: CancellationToken,
173-
) -> eyre::Result<(Self, Box<dyn RelaySubmissionPolicy + Send + Sync>)> {
175+
abort_token: CancellationToken,
176+
) -> eyre::Result<(
177+
Self,
178+
Box<dyn RelaySubmissionPolicy + Send + Sync>,
179+
JoinHandle<()>,
180+
)> {
174181
let backup_max_size_bytes =
175182
config.disk_max_size_mb.unwrap_or(DEFAULT_MAX_DISK_SIZE_MB) * MEGA;
176183
let submission_policy: Box<dyn RelaySubmissionPolicy + Send + Sync> =
@@ -222,35 +229,37 @@ impl BuiltBlocksWriter {
222229
let end_timeout =
223230
Duration::from_millis(config.end_timeout_ms.unwrap_or(DEFAULT_END_TIMEOUT_MS));
224231

225-
spawn_clickhouse_inserter_and_backup::<BlockRow, BlockRow, ClickhouseMetrics>(
226-
&client,
227-
block_rx,
228-
&task_executor,
229-
BLOCKS_TABLE_NAME.to_string(),
230-
"".to_string(), // No buildername used in blocks table.
231-
disk_backup,
232-
config
233-
.memory_max_size_mb
234-
.unwrap_or(DEFAULT_MAX_MEMORY_SIZE_MB)
235-
* MEGA,
236-
send_timeout,
237-
end_timeout,
238-
BLOCKS_TABLE_NAME,
239-
);
240-
// Task to forward the cancellation to the task_manager.
232+
let clickhouse_shutdown_handle =
233+
spawn_clickhouse_inserter_and_backup::<BlockRow, BlockRow, ClickhouseMetrics>(
234+
&client,
235+
block_rx,
236+
&task_executor,
237+
BLOCKS_TABLE_NAME.to_string(),
238+
"".to_string(), // No buildername used in blocks table.
239+
disk_backup,
240+
config
241+
.memory_max_size_mb
242+
.unwrap_or(DEFAULT_MAX_MEMORY_SIZE_MB)
243+
* MEGA,
244+
send_timeout,
245+
end_timeout,
246+
BLOCKS_TABLE_NAME,
247+
);
248+
249+
// Task to forward the abort to the task_manager.
241250
tokio::spawn(async move {
242-
cancellation_token.cancelled().await;
243-
// @Pending: Needed to avoid losing blocks but we should try to avoid this.
244-
tokio::time::sleep(RUN_SUBMIT_TO_RELAYS_JOB_CANCEL_TIME).await;
245-
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
251+
abort_token.cancelled().await;
252+
task_manager.graceful_shutdown();
246253
});
254+
247255
Ok((
248256
Self {
249257
blocks_tx: block_tx,
250258
rbuilder_commit,
251259
builder_name: config.builder_name,
252260
},
253261
submission_policy,
262+
clickhouse_shutdown_handle,
254263
))
255264
}
256265
}

crates/rbuilder-operator/src/flashbots_config.rs

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use crate::{
4848

4949
use clickhouse::Client;
5050
use std::{path::PathBuf, sync::Arc};
51+
use tokio::task::JoinHandle;
5152

5253
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)]
5354
pub struct ClickhouseConfig {
@@ -155,6 +156,7 @@ impl LiveBuilderConfig for FlashbotsConfig {
155156
where
156157
P: StateProviderFactory + Clone + 'static,
157158
{
159+
let abort_token = CancellationToken::new();
158160
if self.l1_config.relay_bid_scrapers.is_empty() {
159161
eyre::bail!("relay_bid_scrapers is not set");
160162
}
@@ -171,34 +173,45 @@ impl LiveBuilderConfig for FlashbotsConfig {
171173
)
172174
.await?;
173175

174-
let (bid_observer, submission_policy) = self
175-
.create_bid_observer_and_submission_policy(&cancellation_token)
176+
let (bid_observer, submission_policy, clickhouse_shutdown_handle) = self
177+
.create_bid_observer_and_submission_policy(&cancellation_token, &abort_token)
176178
.await?;
177179

178-
let (sink_factory, slot_info_provider, adjustment_fee_payers) =
179-
create_sink_factory_and_relays(
180-
&self.base_config,
181-
&self.l1_config,
182-
bidding_service.relay_sets().to_vec(),
183-
wallet_balance_watcher,
184-
bid_observer,
185-
submission_policy,
186-
bidding_service.clone(),
187-
cancellation_token.clone(),
188-
)
189-
.await?;
180+
let (
181+
sink_factory,
182+
slot_info_provider,
183+
adjustment_fee_payers,
184+
optimistic_v3_server_join_handle,
185+
) = create_sink_factory_and_relays(
186+
&self.base_config,
187+
&self.l1_config,
188+
bidding_service.relay_sets().to_vec(),
189+
wallet_balance_watcher,
190+
bid_observer,
191+
submission_policy,
192+
bidding_service.clone(),
193+
cancellation_token.clone(),
194+
)
195+
.await?;
190196

191-
let live_builder = create_builder_from_sink(
197+
let mut live_builder = create_builder_from_sink(
192198
&self.base_config,
193199
&self.l1_config,
194200
provider,
195201
sink_factory,
196202
slot_info_provider,
197203
adjustment_fee_payers,
198204
cancellation_token,
205+
abort_token,
199206
)
200207
.await?;
201208

209+
if let Some(handle) = clickhouse_shutdown_handle {
210+
live_builder.add_critical_task(handle);
211+
}
212+
if let Some(optimistic_v3_server_join_handle) = optimistic_v3_server_join_handle {
213+
live_builder.add_critical_task(optimistic_v3_server_join_handle);
214+
}
202215
let mut module = RpcModule::new(());
203216
module.register_async_method("bid_subsidiseBlock", move |params, _| {
204217
handle_subsidise_block(bidding_service.clone(), params)
@@ -318,28 +331,35 @@ impl FlashbotsConfig {
318331
/// Depending on the cfg may create:
319332
/// - Dummy sink (no built_blocks_clickhouse_config)
320333
/// - BuiltBlocksWriter that writes to clickhouse
334+
///
335+
/// Returns (BidObserver, RelaySubmissionPolicy, Option<JoinHandle> for clickhouse shutdown)
321336
#[allow(clippy::type_complexity)]
322337
fn create_clickhouse_writer_and_submission_policy(
323338
&self,
324-
cancellation_token: &CancellationToken,
339+
clickhouse_abort_token: &CancellationToken,
325340
block_processor_key: Option<PrivateKeySigner>,
326341
) -> eyre::Result<(
327342
Option<Box<dyn BidObserver + Send + Sync>>,
328343
Box<dyn RelaySubmissionPolicy + Send + Sync>,
344+
Option<JoinHandle<()>>,
329345
)> {
330346
if let Some(built_blocks_clickhouse_config) = &self.built_blocks_clickhouse_config {
331347
let rbuilder_version = rbuilder_version();
332-
let (writer, submission_policy) = BuiltBlocksWriter::new(
348+
let (writer, submission_policy, shutdown_handle) = BuiltBlocksWriter::new(
333349
built_blocks_clickhouse_config.clone(),
334350
rbuilder_version.git_commit,
335-
cancellation_token.clone(),
351+
clickhouse_abort_token.clone(),
336352
)?;
337-
Ok((Some(Box::new(writer)), submission_policy))
353+
Ok((
354+
Some(Box::new(writer)),
355+
submission_policy,
356+
Some(shutdown_handle),
357+
))
338358
} else {
339359
if block_processor_key.is_some() {
340360
return Self::bail_blocks_processor_url_not_set();
341361
}
342-
Ok((None, Box::new(AlwaysSubmitPolicy {})))
362+
Ok((None, Box::new(AlwaysSubmitPolicy {}), None))
343363
}
344364
}
345365

@@ -348,12 +368,17 @@ impl FlashbotsConfig {
348368
}
349369

350370
/// Depending on the cfg add a BlocksProcessorClientBidObserver and/or a true value pusher.
371+
/// Returns (BidObserver, RelaySubmissionPolicy, Option<JoinHandle> for clickhouse shutdown)
372+
/// cancellation_token: used to cancel tbv_pusher
373+
/// clickhouse_abort_token: used to cancel clickhouse tasks if source is hanged.
351374
async fn create_bid_observer_and_submission_policy(
352375
&self,
353376
cancellation_token: &CancellationToken,
377+
clickhouse_abort_token: &CancellationToken,
354378
) -> eyre::Result<(
355379
Box<dyn BidObserver + Send + Sync>,
356380
Box<dyn RelaySubmissionPolicy + Send + Sync>,
381+
Option<JoinHandle<()>>,
357382
)> {
358383
let block_processor_key = if let Some(key_registration_url) = &self.key_registration_url {
359384
if self.blocks_processor_url.is_none() {
@@ -364,16 +389,20 @@ impl FlashbotsConfig {
364389
None
365390
};
366391

367-
let (clickhouse_writer, submission_policy) = self
392+
let (clickhouse_writer, submission_policy, clickhouse_shutdown_handle) = self
368393
.create_clickhouse_writer_and_submission_policy(
369-
cancellation_token,
394+
clickhouse_abort_token,
370395
block_processor_key.clone(),
371396
)?;
372397
let bid_observer = RbuilderOperatorBidObserver {
373398
clickhouse_writer,
374399
tbv_pusher: self.create_tbv_pusher(block_processor_key, cancellation_token)?,
375400
};
376-
Ok((Box::new(bid_observer), submission_policy))
401+
Ok((
402+
Box::new(bid_observer),
403+
submission_policy,
404+
clickhouse_shutdown_handle,
405+
))
377406
}
378407

379408
fn create_tbv_pusher(

crates/rbuilder-utils/src/clickhouse/backup/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use derive_more::{Deref, DerefMut};
1313
use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata};
1414
use strum::AsRefStr;
1515
use tokio::sync::mpsc;
16+
use tokio::task::JoinHandle;
1617

1718
use crate::{
1819
backoff::BackoffInterval,
@@ -761,7 +762,14 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> Backup<T, MetricsType> {
761762
}
762763

763764
/// Spawns the inserter runner on the given task executor.
764-
pub fn spawn(mut self, task_executor: &TaskExecutor, name: String, target: &'static str)
765+
/// Returns a JoinHandle that resolves when the task completes.
766+
/// On shutdown will stop processing new data flush the backup. New data might be lost.
767+
pub fn spawn(
768+
mut self,
769+
task_executor: &TaskExecutor,
770+
name: String,
771+
target: &'static str,
772+
) -> JoinHandle<()>
765773
where
766774
MetricsType: Send + Sync + 'static,
767775
for<'a> <T as clickhouse::Row>::Value<'a>: Sync,
@@ -784,7 +792,7 @@ impl<T: ClickhouseRowExt, MetricsType: Metrics> Backup<T, MetricsType> {
784792
"Clickhouse backup cleanup complete"
785793
);
786794
drop(shutdown_guard);
787-
});
795+
})
788796
}
789797
}
790798

crates/rbuilder-utils/src/clickhouse/indexer.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use clickhouse::{
1313
};
1414
use reth_tasks::TaskExecutor;
1515
use tokio::sync::mpsc;
16+
use tokio::task::JoinHandle;
1617

1718
use crate::{
1819
clickhouse::{
@@ -215,7 +216,14 @@ impl<T: ClickhouseIndexableData, MetricsType: Metrics> InserterRunner<T, Metrics
215216
}
216217

217218
/// Spawns the inserter runner on the given task executor.
218-
pub fn spawn(mut self, task_executor: &TaskExecutor, name: String, target: &'static str)
219+
/// Returns a JoinHandle that resolves when the task completes.
220+
/// On shutdown will stop processing new data flush the inserter. New data might be lost.
221+
pub fn spawn(
222+
mut self,
223+
task_executor: &TaskExecutor,
224+
name: String,
225+
target: &'static str,
226+
) -> JoinHandle<()>
219227
where
220228
T: Send + Sync + 'static,
221229
MetricsType: Send + Sync + 'static,
@@ -242,8 +250,7 @@ impl<T: ClickhouseIndexableData, MetricsType: Metrics> InserterRunner<T, Metrics
242250
}
243251
}
244252
drop(shutdown_guard);
245-
246-
});
253+
})
247254
}
248255
}
249256

crates/rbuilder-utils/src/clickhouse/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use ::serde::{Deserialize, Serialize};
77
use clickhouse::Client;
88
use reth_tasks::TaskExecutor;
99
use tokio::sync::mpsc;
10+
use tokio::task::JoinHandle;
1011

1112
use crate::clickhouse::{
1213
backup::{
@@ -59,6 +60,7 @@ impl From<Quantities> for clickhouse::inserter::Quantities {
5960
const BACKUP_INPUT_CHANNEL_BUFFER_SIZE: usize = 128;
6061

6162
/// Main func to spawn the clickhouse inserter and backup tasks.
63+
/// Returns a JoinHandle that resolves when both tasks have completed.
6264
#[allow(clippy::too_many_arguments)]
6365
pub fn spawn_clickhouse_inserter_and_backup<
6466
DataType: ClickhouseIndexableData + Send + Sync + 'static,
@@ -75,7 +77,8 @@ pub fn spawn_clickhouse_inserter_and_backup<
7577
send_timeout: Duration,
7678
end_timeout: Duration,
7779
tracing_target: &'static str,
78-
) where
80+
) -> JoinHandle<()>
81+
where
7982
for<'a> <DataType::ClickhouseRowType as clickhouse::Row>::Value<'a>: Sync,
8083
{
8184
let backup_table_name = RowType::TABLE_NAME.to_string();
@@ -93,6 +96,13 @@ pub fn spawn_clickhouse_inserter_and_backup<
9396
disk_backup_db,
9497
)
9598
.with_memory_backup_config(MemoryBackupConfig::new(memory_max_size_bytes));
96-
inserter_runner.spawn(task_executor, backup_table_name.clone(), tracing_target);
97-
backup.spawn(task_executor, backup_table_name, tracing_target);
99+
100+
let inserter_handle =
101+
inserter_runner.spawn(task_executor, backup_table_name.clone(), tracing_target);
102+
let backup_handle = backup.spawn(task_executor, backup_table_name, tracing_target);
103+
104+
// Spawn a wrapper task that waits for both to complete
105+
tokio::spawn(async move {
106+
let _ = tokio::join!(inserter_handle, backup_handle);
107+
})
98108
}

crates/rbuilder/src/live_builder/base_config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ impl BaseConfig {
219219
pub async fn create_builder_with_provider_factory<P>(
220220
&self,
221221
cancellation_token: tokio_util::sync::CancellationToken,
222+
global_abort: tokio_util::sync::CancellationToken,
222223
unfinished_built_blocks_input_factory: UnfinishedBuiltBlocksInputFactory<P>,
223224
slot_source: MevBoostSlotDataGenerator,
224225
provider: P,
@@ -259,6 +260,8 @@ impl BaseConfig {
259260
blocklist_provider,
260261

261262
global_cancellation: cancellation_token.clone(),
263+
global_abort,
264+
critical_tasks_join_handles: Vec::new(),
262265
process_killer: ProcessKiller::new(cancellation_token),
263266
extra_rpc: RpcModule::new(()),
264267
unfinished_built_blocks_input_factory,

0 commit comments

Comments
 (0)