Skip to content

Commit 1c825a8

Browse files
committed
issue #123 producer
1 parent 36fc543 commit 1c825a8

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

validator_worker/src/core/events.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use primitives::sentry::{AggregateEvents, EventAggregate};
99
use primitives::validator::Accounting;
1010
use primitives::{BalancesMap, BigNum, Channel, DomainError};
1111

12-
#[allow(dead_code)]
13-
fn merge_aggrs(
12+
pub(crate) fn merge_aggrs(
1413
accounting: &Accounting,
1514
aggregates: &[EventAggregate],
1615
channel: &Channel,

validator_worker/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
pub mod error;
77
pub mod follower;
88
pub mod leader;
9+
pub mod producer;
910
pub mod sentry_interface;
1011

1112
pub use self::follower::Follower;

validator_worker/src/producer.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::error::Error;
2+
3+
use chrono::{TimeZone, Utc};
4+
5+
use primitives::{BalancesMap, Channel};
6+
use primitives::adapter::Adapter;
7+
use primitives::validator::{Accounting, MessageTypes};
8+
9+
use crate::core::events::merge_aggrs;
10+
use crate::sentry_interface::SentryApi;
11+
12+
pub type Result = std::result::Result<(BalancesMap, Option<MessageTypes>), Box<dyn Error>>;
13+
14+
pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result {
15+
let validator_msg_resp = await!(iface.get_our_latest_msg("Accounting".to_owned()))?;
16+
17+
let accounting = validator_msg_resp.msg.get(0).and_then(|accounting| match accounting {
18+
MessageTypes::Accounting(accounting) => Some(accounting.to_owned()),
19+
_ => None,
20+
}).unwrap_or_else(|| {
21+
Accounting {
22+
last_event_aggregate: Utc.timestamp(0, 0),
23+
balances_before_fees: Default::default(),
24+
balances: Default::default(),
25+
}
26+
});
27+
28+
let aggrs = await!(iface.get_event_aggregates(accounting.last_event_aggregate))?;
29+
30+
if aggrs.events.len() > 0 {
31+
// TODO: Log the merge
32+
let (balances, new_accounting) = merge_aggrs(&accounting, &aggrs.events, &iface.channel)?;
33+
34+
let message_types = MessageTypes::Accounting(new_accounting);
35+
iface.propagate(vec![message_types.clone()]);
36+
37+
Ok((balances, Some(message_types)))
38+
} else {
39+
Ok((accounting.balances.clone(), None))
40+
}
41+
}

0 commit comments

Comments
 (0)