Skip to content

Commit 44d52ed

Browse files
authored
Merge branch 'dev' into issue-113-remove-Distribution-struct
2 parents f0b6a35 + 6c427a0 commit 44d52ed

File tree

3 files changed

+45
-2
lines changed

3 files changed

+45
-2
lines changed

validator_worker/src/core/events.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use primitives::sentry::{AggregateEvents, EventAggregate};
99
use primitives::validator::Accounting;
1010
use primitives::{BalancesMap, BigNum, Channel, DomainError};
1111

12-
#[allow(dead_code)]
13-
fn merge_aggrs(
12+
pub(crate) fn merge_aggrs(
1413
accounting: &Accounting,
1514
aggregates: &[EventAggregate],
1615
channel: &Channel,

validator_worker/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
pub mod error;
77
pub mod follower;
88
pub mod leader;
9+
pub mod producer;
910
pub mod sentry_interface;
1011

1112
pub use self::follower::Follower;

validator_worker/src/producer.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use std::error::Error;
2+
3+
use chrono::{TimeZone, Utc};
4+
5+
use primitives::adapter::Adapter;
6+
use primitives::validator::{Accounting, MessageTypes};
7+
use primitives::{BalancesMap, Channel};
8+
9+
use crate::core::events::merge_aggrs;
10+
use crate::sentry_interface::SentryApi;
11+
12+
pub type Result = std::result::Result<(BalancesMap, Option<MessageTypes>), Box<dyn Error>>;
13+
14+
pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result {
15+
let validator_msg_resp = await!(iface.get_our_latest_msg("Accounting".to_owned()))?;
16+
17+
let accounting = validator_msg_resp
18+
.msg
19+
.get(0)
20+
.and_then(|accounting| match accounting {
21+
MessageTypes::Accounting(accounting) => Some(accounting.to_owned()),
22+
_ => None,
23+
})
24+
.unwrap_or_else(|| Accounting {
25+
last_event_aggregate: Utc.timestamp(0, 0),
26+
balances_before_fees: Default::default(),
27+
balances: Default::default(),
28+
});
29+
30+
let aggrs = await!(iface.get_event_aggregates(accounting.last_event_aggregate))?;
31+
32+
if !aggrs.events.is_empty() {
33+
// TODO: Log the merge
34+
let (balances, new_accounting) = merge_aggrs(&accounting, &aggrs.events, &iface.channel)?;
35+
36+
let message_types = MessageTypes::Accounting(new_accounting);
37+
iface.propagate(vec![message_types.clone()]);
38+
39+
Ok((balances, Some(message_types)))
40+
} else {
41+
Ok((accounting.balances.clone(), None))
42+
}
43+
}

0 commit comments

Comments
 (0)