Skip to content

Commit c72a519

Browse files
authored
Issue #93 Validator Message propagation (#98)
* validator - message_propagation - MessagePropagator * [test] validator - message_propagation - MessagePropagator * validator - application - HeartbeatSender - use MessagePropagator
1 parent 0b3d0cc commit c72a519

File tree

3 files changed

+162
-5
lines changed

3 files changed

+162
-5
lines changed

validator/src/application.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
pub mod heartbeat;
2+
pub mod message_propagation;
23
pub mod validator;
34
pub mod worker;
5+
6+
pub use self::message_propagation::MessagePropagator;

validator/src/application/heartbeat.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use adapter::{Adapter, AdapterError, BalanceRoot, ChannelId as AdapterChannelId}
77
use domain::validator::message::{Heartbeat, Message, State, TYPE_HEARTBEAT};
88
use domain::{Channel, ChannelId, RepositoryError, ValidatorId};
99

10+
use crate::application::MessagePropagator;
1011
use crate::domain::MessageRepository;
1112

1213
pub struct HeartbeatFactory<A: Adapter> {
@@ -55,6 +56,7 @@ pub struct HeartbeatSender<A: Adapter> {
5556
message_repository: Box<dyn MessageRepository<A::State>>,
5657
adapter: A,
5758
factory: HeartbeatFactory<A>,
59+
propagator: MessagePropagator<A::State>,
5860
// @TODO: Add config value for Heartbeat send frequency
5961
}
6062

@@ -94,12 +96,13 @@ impl<A: Adapter> HeartbeatSender<A> {
9496
// call the HeartbeatFactory and create the new Heartbeat
9597
let heartbeat = await!(self.factory.create(signable_state_root.0))?;
9698

97-
// @TODO: Issue #93 - this should propagate the message to all validators!
98-
// `add()` the heartbeat with the Repository
99+
// Propagate the message to all Validators
100+
// @TODO: Log errors
99101
await!(self
100-
.message_repository
101-
.add(&channel.id, &validator, Message::Heartbeat(heartbeat)))
102-
.map_err(HeartbeatError::Repository)
102+
.propagator
103+
.propagate(&channel, Message::Heartbeat(heartbeat)));
104+
105+
Ok(())
103106
}
104107

105108
fn is_heartbeat_time(&self, latest_heartbeat: &Heartbeat<A::State>) -> bool {
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use std::convert::TryFrom;
2+
use std::error::Error;
3+
use std::fmt;
4+
5+
use domain::validator::message::{Message, State};
6+
use domain::{Channel, RepositoryError, ValidatorId};
7+
8+
use crate::domain::MessageRepository;
9+
10+
pub struct MessagePropagator<S: State> {
11+
pub message_repository: Box<dyn MessageRepository<S>>,
12+
}
13+
14+
#[derive(Debug)]
15+
pub enum PropagationErrorKind {
16+
Repository(RepositoryError),
17+
}
18+
19+
#[derive(Debug)]
20+
pub struct PropagationError {
21+
kind: PropagationErrorKind,
22+
message: String,
23+
}
24+
25+
impl fmt::Display for PropagationError {
26+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27+
write!(f, "{}", self.message)
28+
}
29+
}
30+
31+
impl Error for PropagationError {
32+
fn source(&self) -> Option<&(dyn Error + 'static)> {
33+
match &self.kind {
34+
PropagationErrorKind::Repository(error) => Some(error),
35+
}
36+
}
37+
}
38+
39+
impl From<RepositoryError> for PropagationError {
40+
fn from(error: RepositoryError) -> Self {
41+
Self {
42+
kind: PropagationErrorKind::Repository(error),
43+
message: "Repository call for propagating the message failed".to_string(),
44+
}
45+
}
46+
}
47+
48+
impl<S: State> MessagePropagator<S> {
49+
// @TODO: Make sure we have information for logging the results for particular Validator
50+
pub async fn propagate<'a>(
51+
&'a self,
52+
channel: &'a Channel,
53+
message: Message<S>,
54+
) -> Vec<Result<(), PropagationError>> {
55+
let mut results = Vec::default();
56+
57+
for validator in channel.spec.validators.into_iter() {
58+
// @TODO: Remove once we have ValidatorId in ValidatorDesc
59+
let validator_id = ValidatorId::try_from(validator.id.as_str()).unwrap();
60+
let add_result =
61+
await!(self
62+
.message_repository
63+
.add(&channel.id, &validator_id, message.clone()))
64+
.map_err(Into::into);
65+
results.push(add_result);
66+
}
67+
68+
results
69+
}
70+
}
71+
72+
#[cfg(test)]
73+
mod test {
74+
use std::cell::RefCell;
75+
76+
use futures::future::{ready, FutureExt};
77+
78+
use domain::channel::fixtures::get_channel;
79+
use domain::validator::message::fixtures::{get_reject_state, DummyState};
80+
use domain::validator::message::{Message, MessageType};
81+
use domain::{ChannelId, ValidatorId};
82+
use domain::{RepositoryError, RepositoryFuture};
83+
84+
use crate::application::MessagePropagator;
85+
use crate::domain::MessageRepository;
86+
87+
struct MockMessageRepository<I>
88+
where
89+
I: Iterator<Item = Result<(), RepositoryError>>,
90+
{
91+
add_results: RefCell<I>,
92+
}
93+
94+
impl<I> MessageRepository<DummyState> for MockMessageRepository<I>
95+
where
96+
I: Iterator<Item = Result<(), RepositoryError>>,
97+
{
98+
fn add(
99+
&self,
100+
_channel: &ChannelId,
101+
_validator: &ValidatorId,
102+
_message: Message<DummyState>,
103+
) -> RepositoryFuture<()> {
104+
let result = self
105+
.add_results
106+
.borrow_mut()
107+
.next()
108+
.expect("Whoops, you called add() more than the provided results");
109+
ready(result).boxed()
110+
}
111+
112+
fn latest(
113+
&self,
114+
_channel: &ChannelId,
115+
_from: &ValidatorId,
116+
_types: Option<&[&MessageType]>,
117+
) -> RepositoryFuture<Option<Message<DummyState>>> {
118+
unimplemented!("No need for latest in this Mock")
119+
}
120+
}
121+
122+
#[test]
123+
fn propagates_and_returns_vector_of_results() {
124+
futures::executor::block_on(async {
125+
let add_error = RepositoryError::User;
126+
127+
let iterator = vec![Ok(()), Err(add_error)].into_iter();
128+
let message_repository = MockMessageRepository {
129+
add_results: RefCell::new(iterator),
130+
};
131+
let propagator = MessagePropagator {
132+
message_repository: Box::new(message_repository),
133+
};
134+
135+
let message = get_reject_state(None);
136+
let channel = get_channel("id", &None, None);
137+
138+
let result = await!(propagator.propagate(&channel, Message::RejectState(message)));
139+
140+
assert_eq!(2, result.len());
141+
assert!(result[0].is_ok());
142+
match &result[1] {
143+
Ok(_) => panic!("It should be an error"),
144+
Err(error) => assert_eq!(
145+
"Repository call for propagating the message failed",
146+
error.message
147+
),
148+
}
149+
})
150+
}
151+
}

0 commit comments

Comments
 (0)