Skip to content

Commit d44367d

Browse files
committed
feat(aggregator): implement a 'SequentialSignatureProcessor' for 'SignatureProcessor' trait
1 parent 852df5b commit d44367d

File tree

1 file changed

+191
-1
lines changed

1 file changed

+191
-1
lines changed

mithril-aggregator/src/services/signature_processor.rs

Lines changed: 191 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
use mithril_common::StdResult;
1+
use std::sync::Arc;
2+
3+
use slog::{error, warn, Logger};
4+
5+
use mithril_common::{logging::LoggerExtensions, StdResult};
6+
use tokio::sync::Mutex;
7+
8+
use super::{CertifierService, SignatureConsumer};
29

310
/// A signature processor which receives signature and processes them.
411
#[cfg_attr(test, mockall::automock)]
@@ -17,3 +24,186 @@ pub trait SignatureProcessor: Sync + Send {
1724
/// Stops the processor. This method should be called to gracefully shut down the processor.
1825
async fn stop(&self) -> StdResult<()>;
1926
}
27+
28+
/// A sequential signature processor receives messages and processes them sequentially
29+
pub struct SequentialSignatureProcessor {
30+
consumer: Arc<dyn SignatureConsumer>,
31+
certifier: Arc<dyn CertifierService>,
32+
logger: Logger,
33+
stop: Mutex<bool>,
34+
}
35+
36+
impl SequentialSignatureProcessor {
37+
/// Creates a new `SignatureProcessor` instance.
38+
pub fn new(
39+
consumer: Arc<dyn SignatureConsumer>,
40+
certifier: Arc<dyn CertifierService>,
41+
logger: Logger,
42+
) -> Self {
43+
Self {
44+
consumer,
45+
certifier,
46+
logger: logger.new_with_component_name::<Self>(),
47+
stop: Mutex::new(false),
48+
}
49+
}
50+
}
51+
52+
#[async_trait::async_trait]
53+
impl SignatureProcessor for SequentialSignatureProcessor {
54+
async fn process_signatures(&self) -> StdResult<()> {
55+
if *self.stop.lock().await {
56+
warn!(self.logger, "Stoped signature processor");
57+
return Ok(());
58+
}
59+
60+
match self.consumer.get_signatures().await {
61+
Ok(signatures) => {
62+
for (signature, signed_entity_type) in signatures {
63+
if let Err(e) = self
64+
.certifier
65+
.register_single_signature(&signed_entity_type, &signature)
66+
.await
67+
{
68+
error!(self.logger, "Error dispatching single signature"; "error" => ?e);
69+
}
70+
}
71+
}
72+
Err(e) => {
73+
error!(self.logger, "Error consuming single signatures"; "error" => ?e);
74+
}
75+
}
76+
77+
Ok(())
78+
}
79+
80+
async fn stop(&self) -> StdResult<()> {
81+
warn!(self.logger, "Stopping signature processor...");
82+
*self.stop.lock().await = true;
83+
84+
Ok(())
85+
}
86+
}
87+
88+
#[cfg(test)]
89+
mod tests {
90+
use anyhow::anyhow;
91+
use mithril_common::{
92+
entities::{Epoch, SignedEntityType},
93+
test_utils::fake_data,
94+
};
95+
use mockall::predicate::eq;
96+
use tokio::time::{sleep, Duration};
97+
98+
use crate::{
99+
services::{MockCertifierService, MockSignatureConsumer, SignatureRegistrationStatus},
100+
test_tools::TestLogger,
101+
};
102+
103+
use super::*;
104+
105+
#[tokio::test]
106+
async fn processor_process_signatures_succeeds() {
107+
let logger = TestLogger::stdout();
108+
let mock_consumer = {
109+
let mut mock_consumer = MockSignatureConsumer::new();
110+
mock_consumer
111+
.expect_get_signatures()
112+
.returning(|| {
113+
Ok(vec![
114+
(
115+
fake_data::single_signature(vec![1, 2, 3]),
116+
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
117+
),
118+
(
119+
fake_data::single_signature(vec![4, 5, 6]),
120+
SignedEntityType::MithrilStakeDistribution(Epoch(2)),
121+
),
122+
])
123+
})
124+
.times(1);
125+
mock_consumer
126+
};
127+
let mock_certifier = {
128+
let mut mock_certifier = MockCertifierService::new();
129+
mock_certifier
130+
.expect_register_single_signature()
131+
.with(
132+
eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
133+
eq(fake_data::single_signature(vec![1, 2, 3])),
134+
)
135+
.returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
136+
.times(1);
137+
mock_certifier
138+
.expect_register_single_signature()
139+
.with(
140+
eq(SignedEntityType::MithrilStakeDistribution(Epoch(2))),
141+
eq(fake_data::single_signature(vec![4, 5, 6])),
142+
)
143+
.returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
144+
.times(1);
145+
146+
mock_certifier
147+
};
148+
let processor = SequentialSignatureProcessor::new(
149+
Arc::new(mock_consumer),
150+
Arc::new(mock_certifier),
151+
logger,
152+
);
153+
154+
processor
155+
.process_signatures()
156+
.await
157+
.expect("Failed to process signatures");
158+
}
159+
160+
#[tokio::test]
161+
async fn processor_run_succeeds() {
162+
let logger = TestLogger::stdout();
163+
let mock_consumer = {
164+
let mut mock_consumer = MockSignatureConsumer::new();
165+
mock_consumer
166+
.expect_get_signatures()
167+
.returning(|| Err(anyhow!("Error consuming signatures")))
168+
.times(1);
169+
mock_consumer
170+
.expect_get_signatures()
171+
.returning(|| {
172+
Ok(vec![(
173+
fake_data::single_signature(vec![1, 2, 3]),
174+
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
175+
)])
176+
})
177+
.times(1);
178+
mock_consumer
179+
.expect_get_signatures()
180+
.returning(|| Ok(vec![]));
181+
mock_consumer
182+
};
183+
let mock_certifier = {
184+
let mut mock_certifier = MockCertifierService::new();
185+
mock_certifier
186+
.expect_register_single_signature()
187+
.with(
188+
eq(SignedEntityType::MithrilStakeDistribution(Epoch(1))),
189+
eq(fake_data::single_signature(vec![1, 2, 3])),
190+
)
191+
.returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
192+
.times(1);
193+
194+
mock_certifier
195+
};
196+
let processor = SequentialSignatureProcessor::new(
197+
Arc::new(mock_consumer),
198+
Arc::new(mock_certifier),
199+
logger,
200+
);
201+
202+
tokio::select!(
203+
_res = processor.run() => {},
204+
_res = sleep(Duration::from_millis(10)) => {
205+
processor.stop().await.expect("Failed to stop processor");
206+
},
207+
);
208+
}
209+
}

0 commit comments

Comments
 (0)