Skip to content

Commit 25906df

Browse files
committed
refactor(aggregator): use stop signal channel with 'SequentialSignatureProcessor'
In the 'serve' command of the aggregator.
1 parent 82589be commit 25906df

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

mithril-aggregator/src/commands/serve_command.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,18 +132,23 @@ impl ServeCommand {
132132
let mut dependencies_builder =
133133
DependenciesBuilder::new(root_logger.clone(), Arc::new(config.clone()));
134134

135-
// start servers
136135
println!("Starting server...");
137136
println!("Press Ctrl+C to stop");
138137

139-
// start the monitoring thread
138+
// Create the stop signal channel
139+
let (stop_tx, _stop_rx) = dependencies_builder
140+
.get_stop_signal_channel()
141+
.await
142+
.with_context(|| "Dependencies Builder can not create stop signal channel")?;
143+
144+
// Start the monitoring thread
140145
let mut event_store = dependencies_builder
141146
.create_event_store()
142147
.await
143148
.with_context(|| "Dependencies Builder can not create event store")?;
144149
let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });
145150

146-
// start the database vacuum operation, if needed
151+
// Start the database vacuum operation, if needed
147152
self.perform_database_vacuum_if_needed(
148153
&config.data_stores_directory,
149154
&mut dependencies_builder,
@@ -152,15 +157,15 @@ impl ServeCommand {
152157
)
153158
.await?;
154159

155-
// start the aggregator runtime
160+
// Start the aggregator runtime
156161
let mut runtime = dependencies_builder
157162
.create_aggregator_runner()
158163
.await
159164
.with_context(|| "Dependencies Builder can not create aggregator runner")?;
160165
let mut join_set = JoinSet::new();
161166
join_set.spawn(async move { runtime.run().await.map_err(|e| e.to_string()) });
162167

163-
// start the cardano transactions preloader
168+
// Start the cardano transactions preloader
164169
let cardano_transactions_preloader = dependencies_builder
165170
.create_cardano_transactions_preloader()
166171
.await
@@ -170,7 +175,7 @@ impl ServeCommand {
170175
let preload_task =
171176
tokio::spawn(async move { cardano_transactions_preloader.preload().await });
172177

173-
// start the HTTP server
178+
// sStart the HTTP server
174179
let (shutdown_tx, shutdown_rx) = oneshot::channel();
175180
let routes = dependencies_builder
176181
.create_http_routes()
@@ -279,14 +284,16 @@ impl ServeCommand {
279284
.send(())
280285
.map_err(|e| anyhow!("Metrics server shutdown signal could not be sent: {e:?}"))?;
281286

282-
// stop servers
287+
// Stop servers
283288
join_set.shutdown().await;
284289
let _ = shutdown_tx.send(());
285290

291+
// Send the stop signal to all services
292+
let _ = stop_tx.send(());
293+
286294
if !preload_task.is_finished() {
287295
preload_task.abort();
288296
}
289-
signature_processor.stop().await?;
290297

291298
info!(root_logger, "Event store is finishing...");
292299
event_store_thread.await.unwrap();

0 commit comments

Comments
 (0)