Skip to content

Commit 4bf4a2d

Browse files
authored
Issue #74 Validator MemoryMessageRepository (#92)
* validator - main - use the MemoryChannelRepository by default, but leave the API instantiation as well * domain - validator - message - make all fields `pub` & add todo for fixture * [test] validator - persistence - validator - memory - `add()` & 1 case of `latest()` * [test] validator - persistence - validator - memory - `latest` filtering by `type` & `from`
1 parent 21287ed commit 4bf4a2d

File tree

5 files changed

+171
-23
lines changed

5 files changed

+171
-23
lines changed

domain/src/validator/message.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,23 @@ pub const ALL_TYPES: [&MessageType; 5] = [
6060
#[derive(Serialize, Deserialize, Debug, Clone)]
6161
#[serde(rename_all = "camelCase")]
6262
pub struct ApproveState<S: State> {
63-
state_root: S::StateRoot,
64-
signature: S::Signature,
65-
is_healthy: bool,
63+
pub state_root: S::StateRoot,
64+
pub signature: S::Signature,
65+
pub is_healthy: bool,
6666
}
6767

6868
#[derive(Serialize, Deserialize, Debug, Clone)]
6969
#[serde(rename_all = "camelCase")]
7070
pub struct NewState<S: State> {
71-
state_root: S::StateRoot,
72-
signature: S::Signature,
73-
balances: BalancesMap,
71+
pub state_root: S::StateRoot,
72+
pub signature: S::Signature,
73+
pub balances: BalancesMap,
7474
}
7575

7676
#[derive(Serialize, Deserialize, Debug, Clone)]
7777
#[serde(rename_all = "camelCase")]
7878
pub struct RejectState {
79-
reason: String,
79+
pub reason: String,
8080
}
8181

8282
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -105,10 +105,10 @@ impl<S: State> Heartbeat<S> {
105105
#[serde(rename_all = "camelCase")]
106106
pub struct Accounting {
107107
#[serde(rename = "last_ev_aggr")]
108-
last_event_aggregate: DateTime<Utc>,
108+
pub last_event_aggregate: DateTime<Utc>,
109109
#[serde(rename = "balances_pre_fees")]
110-
pre_fees: BalancesMap,
111-
balances: BalancesMap,
110+
pub pre_fees: BalancesMap,
111+
pub balances: BalancesMap,
112112
}
113113

114114
#[cfg(any(test, feature = "fixtures"))]

validator/src/application/heartbeat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl<A: Adapter + State> HeartbeatSender<A> {
6262
let validator = ValidatorId::try_from(self.adapter.config().identity.as_ref()).unwrap();
6363
let latest_future =
6464
self.message_repository
65-
.latest(channel.id, validator, Some(&[&TYPE_HEARTBEAT]));
65+
.latest(&channel.id, &validator, Some(&[&TYPE_HEARTBEAT]));
6666
let latest_heartbeat = await!(latest_future)
6767
.map_err(HeartbeatError::Repository)?
6868
.map(|heartbeat_msg| match heartbeat_msg {

validator/src/domain/validator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub mod repository {
2727

2828
fn latest(
2929
&self,
30-
channel_id: ChannelId,
31-
from: ValidatorId,
30+
channel_id: &ChannelId,
31+
from: &ValidatorId,
3232
types: Option<&[&MessageType]>,
3333
) -> RepositoryFuture<Option<Message<S>>>;
3434
}

validator/src/infrastructure/persistence/validator/memory.rs

Lines changed: 153 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ impl State for MemoryState {
2020
#[derive(Clone)]
2121
pub struct MemoryMessage {
2222
pub message: Message<MemoryState>,
23-
pub channel_id: ChannelId,
24-
pub validator_id: ValidatorId,
23+
pub channel: ChannelId,
24+
pub owner: ValidatorId,
2525
}
2626

2727
pub struct MemoryMessageRepository {
@@ -46,8 +46,8 @@ impl MessageRepository<MemoryState> for MemoryMessageRepository {
4646
fn add(&self, channel_id: ChannelId, message: Message<MemoryState>) -> RepositoryFuture<()> {
4747
let message = MemoryMessage {
4848
message,
49-
channel_id,
50-
validator_id: self.self_validator_id.clone(),
49+
channel: channel_id,
50+
owner: self.self_validator_id.clone(),
5151
};
5252
// this should never match against the new record, that's why always pass false.
5353
ready(self.inner.add(&false, message).map_err(Into::into)).boxed()
@@ -58,15 +58,15 @@ impl MessageRepository<MemoryState> for MemoryMessageRepository {
5858
/// If not `types` are provided, it will match against all types.
5959
fn latest(
6060
&self,
61-
channel_id: ChannelId,
62-
from: ValidatorId,
61+
channel_id: &ChannelId,
62+
from: &ValidatorId,
6363
types: Option<&[&MessageType]>,
6464
) -> RepositoryFuture<Option<Message<MemoryState>>> {
6565
let latest = self
6666
.inner
6767
.list_all(|mem_msg| {
68-
let is_from = mem_msg.validator_id == from;
69-
let is_channel_id = mem_msg.channel_id == channel_id;
68+
let is_from = &mem_msg.owner == from;
69+
let is_channel_id = &mem_msg.channel == channel_id;
7070
// if there are no types provided, it should match every type, i.e. default `true` for `None`
7171
let is_in_types = types.map_or(true, |message_types| {
7272
mem_msg.message.is_types(message_types)
@@ -82,3 +82,148 @@ impl MessageRepository<MemoryState> for MemoryMessageRepository {
8282
ready(latest.map_err(Into::into)).boxed()
8383
}
8484
}
85+
86+
#[cfg(test)]
87+
mod test {
88+
use std::convert::TryFrom;
89+
90+
use domain::fixtures::get_channel_id;
91+
use domain::validator::message::fixtures::{get_heartbeat, get_reject_state};
92+
93+
use super::*;
94+
use domain::validator::message::TYPE_REJECT;
95+
96+
fn get_reject_memory_message(
97+
channel: &ChannelId,
98+
owner: &ValidatorId,
99+
reject_reason: Option<String>,
100+
) -> MemoryMessage {
101+
MemoryMessage {
102+
channel: channel.clone(),
103+
owner: owner.clone(),
104+
message: Message::RejectState(get_reject_state(reject_reason)),
105+
}
106+
}
107+
108+
#[test]
109+
fn adds_message_with_the_self_validator_id() {
110+
futures::executor::block_on(async {
111+
let validator_id = ValidatorId::try_from("identity").expect("ValidatorId failed");
112+
let repo = MemoryMessageRepository::new(&[], validator_id.clone());
113+
114+
let message = get_reject_state(None);
115+
let channel_id = get_channel_id("channel id");
116+
117+
await!(repo.add(channel_id, Message::RejectState(message)))
118+
.expect("Adding a message failed");
119+
120+
let list_all = repo
121+
.inner
122+
.list_all(|m| Some(m.clone()))
123+
.expect("Listing all Messages failed");
124+
125+
assert_eq!(1, list_all.len());
126+
assert_eq!(validator_id, list_all[0].owner);
127+
assert_eq!(channel_id, list_all[0].channel);
128+
})
129+
}
130+
131+
#[test]
132+
fn getting_latest_message_with_self_validator_id() {
133+
futures::executor::block_on(async {
134+
let validator_id = ValidatorId::try_from("identity").expect("ValidatorId failed");
135+
let channel_id = get_channel_id("channel id");
136+
137+
let repo = MemoryMessageRepository::new(&[], validator_id.clone());
138+
// add an initial Reject message for checking latest ordering
139+
let init_message =
140+
Message::RejectState(get_reject_state(Some("Initial Message".to_string())));
141+
await!(repo.add(channel_id.clone(), init_message))
142+
.expect("Adding the initial message failed");
143+
144+
let new_message = Message::RejectState(get_reject_state(Some("my reason".to_string())));
145+
await!(repo.add(channel_id.clone(), new_message)).expect("Adding a message failed");
146+
147+
let latest_any = await!(repo.latest(&channel_id, &validator_id, None))
148+
.expect("Getting latest Message failed");
149+
150+
match latest_any.expect("There was no latest message returned") {
151+
Message::RejectState(reject_state) => assert_eq!("my reason", reject_state.reason),
152+
_ => panic!("A Reject state message was not returned as latest message!"),
153+
}
154+
})
155+
}
156+
157+
#[test]
158+
fn getting_latest_message_filters_by_from() {
159+
futures::executor::block_on(async {
160+
let self_validator_id = ValidatorId::try_from("identity").expect("ValidatorId failed");
161+
let channel = get_channel_id("channel id");
162+
163+
let from =
164+
ValidatorId::try_from("another validator").expect("ValidatorId for form failed");
165+
let init_message = get_reject_memory_message(&channel, &self_validator_id, None);
166+
167+
let repo = MemoryMessageRepository::new(&[init_message], self_validator_id.clone());
168+
169+
let result =
170+
await!(repo.latest(&channel, &from, None)).expect("Fetching latest message failed");
171+
172+
assert!(
173+
result.is_none(),
174+
"A latest message was found, even though it has different ValidatorId"
175+
);
176+
})
177+
}
178+
179+
#[test]
180+
fn getting_latest_message_filters_by_channel_id() {
181+
futures::executor::block_on(async {
182+
let self_validator_id = ValidatorId::try_from("identity").expect("ValidatorId failed");
183+
let channel = get_channel_id("channel 1");
184+
let from_channel = get_channel_id("channel 2");
185+
186+
let init_message = get_reject_memory_message(&channel, &self_validator_id, None);
187+
188+
let repo = MemoryMessageRepository::new(&[init_message], self_validator_id.clone());
189+
190+
let result = await!(repo.latest(&from_channel, &self_validator_id, None))
191+
.expect("Fetching latest message failed");
192+
193+
assert!(
194+
result.is_none(),
195+
"A latest message was found, even though it has different ChannelId"
196+
);
197+
})
198+
}
199+
200+
#[test]
201+
fn getting_latest_message_filters_by_types() {
202+
futures::executor::block_on(async {
203+
let self_validator_id = ValidatorId::try_from("identity").expect("ValidatorId failed");
204+
let channel = get_channel_id("channel 1");
205+
206+
let init_messages = [
207+
get_reject_memory_message(&channel, &self_validator_id, Some("reason".to_string())),
208+
MemoryMessage {
209+
message: Message::Heartbeat(get_heartbeat::<MemoryState>(
210+
"state".to_string(),
211+
"signature".to_string(),
212+
)),
213+
channel: channel.clone(),
214+
owner: self_validator_id.clone(),
215+
},
216+
];
217+
218+
let repo = MemoryMessageRepository::new(&init_messages, self_validator_id.clone());
219+
220+
let result = await!(repo.latest(&channel, &self_validator_id, Some(&[&TYPE_REJECT])))
221+
.expect("Fetching latest message failed");
222+
223+
match result.expect("There was no latest message returned") {
224+
Message::RejectState(reject_state) => assert_eq!("reason", reject_state.reason),
225+
_ => panic!("Filtering by type didn't return the expected RejectState message!"),
226+
}
227+
})
228+
}
229+
}

validator/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,18 @@ fn run(is_single_tick: bool, adapter: impl Adapter) {
8282
use validator::application::validator::{Follower, Leader};
8383
use validator::application::worker::{InfiniteWorker, TickWorker};
8484
use validator::domain::worker::Worker;
85-
use validator::infrastructure::persistence::channel::api::ApiChannelRepository;
85+
use validator::infrastructure::persistence::channel::{
86+
ApiChannelRepository, MemoryChannelRepository,
87+
};
8688
use validator::infrastructure::sentry::SentryApi;
8789

8890
let sentry = SentryApi {
8991
client: Client::new(),
9092
sentry_url: CONFIG.sentry_url.clone(),
9193
};
9294

93-
let channel_repository = Arc::new(ApiChannelRepository { sentry });
95+
let _channel_repository = Arc::new(ApiChannelRepository { sentry });
96+
let channel_repository = Arc::new(MemoryChannelRepository::new(&[]));
9497

9598
let tick_worker = TickWorker {
9699
leader: Leader {},

0 commit comments

Comments
 (0)