Skip to content

Commit 27adb8f

Browse files
committed
refactor(aggregator): remove stop function in 'SignatureProcessor' trait
And use a watch channel to be notified in the 'SequentialSignatureProcessor' implementation instead.
1 parent af16236 commit 27adb8f

File tree

1 file changed

+39
-45
lines changed

1 file changed

+39
-45
lines changed

mithril-aggregator/src/services/signature_processor.rs

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use slog::{error, warn, Logger};
44

55
use mithril_common::{logging::LoggerExtensions, StdResult};
6-
use tokio::sync::Mutex;
6+
use tokio::{select, sync::watch::Receiver};
77

88
use super::{CertifierService, SignatureConsumer};
99

@@ -15,48 +15,37 @@ pub trait SignatureProcessor: Sync + Send {
1515
async fn process_signatures(&self) -> StdResult<()>;
1616

1717
/// Starts the processor, which will run indefinitely, processing signatures as they arrive.
18-
async fn run(&self) -> StdResult<()> {
19-
loop {
20-
self.process_signatures().await?;
21-
}
22-
}
23-
24-
/// Stops the processor. This method should be called to gracefully shut down the processor.
25-
async fn stop(&self) -> StdResult<()>;
18+
async fn run(&self) -> StdResult<()>;
2619
}
2720

2821
/// A sequential signature processor receives messages and processes them sequentially
2922
pub struct SequentialSignatureProcessor {
3023
consumer: Arc<dyn SignatureConsumer>,
3124
certifier: Arc<dyn CertifierService>,
25+
stop_rx: Receiver<()>,
3226
logger: Logger,
33-
stop: Mutex<bool>,
3427
}
3528

3629
impl SequentialSignatureProcessor {
3730
/// Creates a new `SignatureProcessor` instance.
3831
pub fn new(
3932
consumer: Arc<dyn SignatureConsumer>,
4033
certifier: Arc<dyn CertifierService>,
34+
stop_rx: Receiver<()>,
4135
logger: Logger,
4236
) -> Self {
4337
Self {
4438
consumer,
4539
certifier,
40+
stop_rx,
4641
logger: logger.new_with_component_name::<Self>(),
47-
stop: Mutex::new(false),
4842
}
4943
}
5044
}
5145

5246
#[async_trait::async_trait]
5347
impl SignatureProcessor for SequentialSignatureProcessor {
5448
async fn process_signatures(&self) -> StdResult<()> {
55-
if *self.stop.lock().await {
56-
warn!(self.logger, "Stopped signature processor");
57-
return Ok(());
58-
}
59-
6049
match self.consumer.get_signatures().await {
6150
Ok(signatures) => {
6251
for (signature, signed_entity_type) in signatures {
@@ -77,11 +66,18 @@ impl SignatureProcessor for SequentialSignatureProcessor {
7766
Ok(())
7867
}
7968

80-
async fn stop(&self) -> StdResult<()> {
81-
warn!(self.logger, "Stopping signature processor...");
82-
*self.stop.lock().await = true;
69+
async fn run(&self) -> StdResult<()> {
70+
loop {
71+
let mut stop_rx = self.stop_rx.clone();
72+
select! {
73+
_ = stop_rx.changed() => {
74+
warn!(self.logger, "Stopping signature processor...");
8375

84-
Ok(())
76+
return Ok(());
77+
}
78+
_ = self.process_signatures() => {}
79+
}
80+
}
8581
}
8682
}
8783

@@ -93,10 +89,16 @@ mod tests {
9389
test_utils::fake_data,
9490
};
9591
use mockall::predicate::eq;
96-
use tokio::time::{sleep, Duration};
92+
use tokio::{
93+
sync::watch::channel,
94+
time::{sleep, Duration},
95+
};
9796

9897
use crate::{
99-
services::{MockCertifierService, MockSignatureConsumer, SignatureRegistrationStatus},
98+
services::{
99+
FakeSignatureConsumer, MockCertifierService, MockSignatureConsumer,
100+
SignatureRegistrationStatus,
101+
},
100102
test_tools::TestLogger,
101103
};
102104

@@ -145,9 +147,11 @@ mod tests {
145147

146148
mock_certifier
147149
};
150+
let (_stop_tx, stop_rx) = channel(());
148151
let processor = SequentialSignatureProcessor::new(
149152
Arc::new(mock_consumer),
150153
Arc::new(mock_certifier),
154+
stop_rx,
151155
logger,
152156
);
153157

@@ -160,26 +164,13 @@ mod tests {
160164
#[tokio::test]
161165
async fn processor_run_succeeds() {
162166
let logger = TestLogger::stdout();
163-
let mock_consumer = {
164-
let mut mock_consumer = MockSignatureConsumer::new();
165-
mock_consumer
166-
.expect_get_signatures()
167-
.returning(|| Err(anyhow!("Error consuming signatures")))
168-
.times(1);
169-
mock_consumer
170-
.expect_get_signatures()
171-
.returning(|| {
172-
Ok(vec![(
173-
fake_data::single_signature(vec![1, 2, 3]),
174-
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
175-
)])
176-
})
177-
.times(1);
178-
mock_consumer
179-
.expect_get_signatures()
180-
.returning(|| Ok(vec![]));
181-
mock_consumer
182-
};
167+
let fake_consumer = FakeSignatureConsumer::new(vec![
168+
Err(anyhow!("Error consuming signatures")),
169+
Ok(vec![(
170+
fake_data::single_signature(vec![1, 2, 3]),
171+
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
172+
)]),
173+
]);
183174
let mock_certifier = {
184175
let mut mock_certifier = MockCertifierService::new();
185176
mock_certifier
@@ -193,16 +184,19 @@ mod tests {
193184

194185
mock_certifier
195186
};
187+
let (stop_tx, stop_rx) = channel(());
196188
let processor = SequentialSignatureProcessor::new(
197-
Arc::new(mock_consumer),
189+
Arc::new(fake_consumer),
198190
Arc::new(mock_certifier),
191+
stop_rx,
199192
logger,
200193
);
201194

202195
tokio::select!(
203-
_res = processor.run() => {},
196+
_res = processor.run() => {},
204197
_res = sleep(Duration::from_millis(10)) => {
205-
processor.stop().await.expect("Failed to stop processor");
198+
println!("Stopping signature processor...");
199+
stop_tx.send(()).unwrap();
206200
},
207201
);
208202
}

0 commit comments

Comments
 (0)