Skip to content

Commit 82589be

Browse files
committed
refactor(aggregator): add a stop signal channel in DI
1 parent 27adb8f commit 82589be

File tree

4 files changed

+28
-1
lines changed

4 files changed

+28
-1
lines changed

mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,11 @@ impl DependenciesBuilder {
8181

8282
/// Builds a [SignatureProcessor]
8383
pub async fn create_signature_processor(&mut self) -> Result<Arc<dyn SignatureProcessor>> {
84+
let (_stop_tx, stop_rx) = self.get_stop_signal_channel().await?;
8485
let signature_processor = SequentialSignatureProcessor::new(
8586
self.build_signature_consumer().await?,
8687
self.get_certifier_service().await?,
88+
stop_rx,
8789
self.root_logger(),
8890
);
8991

mithril-aggregator/src/dependency_injection/builder/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{path::PathBuf, sync::Arc};
88
use tokio::{
99
sync::{
1010
mpsc::{UnboundedReceiver, UnboundedSender},
11-
Mutex,
11+
watch, Mutex,
1212
},
1313
time::Duration,
1414
};
@@ -273,6 +273,9 @@ pub struct DependenciesBuilder {
273273

274274
/// Protocol parameters retriever
275275
pub protocol_parameters_retriever: Option<Arc<dyn ProtocolParametersRetriever>>,
276+
277+
/// Stop signal channel
278+
pub stop_signal_channel: Option<(watch::Sender<()>, watch::Receiver<()>)>,
276279
}
277280

278281
impl DependenciesBuilder {
@@ -335,6 +338,7 @@ impl DependenciesBuilder {
335338
metrics_service: None,
336339
leader_aggregator_client: None,
337340
protocol_parameters_retriever: None,
341+
stop_signal_channel: None,
338342
}
339343
}
340344

mithril-aggregator/src/dependency_injection/builder/support/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
mod compatibility;
66
mod observability;
7+
mod signal;
78
mod sqlite;
89
mod stores;
910
mod upkeep;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use tokio::sync::watch;
2+
3+
use crate::dependency_injection::{DependenciesBuilder, Result};
4+
use crate::get_dependency;
5+
6+
impl DependenciesBuilder {
7+
/// Builds a stop signal channel
8+
pub async fn build_stop_signal_channel(
9+
&mut self,
10+
) -> Result<(watch::Sender<()>, watch::Receiver<()>)> {
11+
Ok(watch::channel(()))
12+
}
13+
14+
/// Get the stop signal channel
15+
pub async fn get_stop_signal_channel(
16+
&mut self,
17+
) -> Result<(watch::Sender<()>, watch::Receiver<()>)> {
18+
get_dependency!(self.stop_signal_channel)
19+
}
20+
}

0 commit comments

Comments
 (0)