Skip to content

Commit c8ce73b

Browse files
authored
Merge pull request #281 from AdExNetwork/issue-278-propagation-logging
Issue #278 Propagation logging
2 parents 03ba8dd + 03d4d3b commit c8ce73b

File tree

14 files changed

+250
-149
lines changed

14 files changed

+250
-149
lines changed

adapter/src/ethereum.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ fn hash_message(message: &[u8]) -> [u8; 32] {
341341
}
342342

343343
// Ethereum Web Tokens
344-
#[derive(Clone, Debug, Serialize, Deserialize)]
344+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
345345
pub struct Payload {
346346
pub id: String,
347347
pub era: i64,
@@ -350,7 +350,7 @@ pub struct Payload {
350350
pub identity: Option<ValidatorId>,
351351
}
352352

353-
#[derive(Clone, Debug)]
353+
#[derive(Clone, Debug, PartialEq, Eq)]
354354
pub struct VerifyPayload {
355355
pub from: ValidatorId,
356356
pub payload: Payload,
@@ -704,15 +704,23 @@ mod test {
704704
let expected = "eyJ0eXBlIjoiSldUIiwiYWxnIjoiRVRIIn0.eyJpZCI6ImF3ZXNvbWVWYWxpZGF0b3IiLCJlcmEiOjEwMDAwMCwiYWRkcmVzcyI6IjB4MmJEZUFGQUU1Mzk0MDY2OURhQTZGNTE5MzczZjY4NmMxZjNkMzM5MyJ9.gGw_sfnxirENdcX5KJQWaEt4FVRvfEjSLD4f3OiPrJIltRadeYP2zWy9T2GYcK5xxD96vnqAw4GebAW7rMlz4xw";
705705
assert_eq!(response, expected, "generated wrong ewt signature");
706706

707-
let expected_verification_response = r#"VerifyPayload { from: ValidatorId([43, 222, 175, 174, 83, 148, 6, 105, 218, 166, 245, 25, 55, 63, 104, 108, 31, 61, 51, 147]), payload: Payload { id: "awesomeValidator", era: 100000, address: "0x2bDeAFAE53940669DaA6F519373f686c1f3d3393", identity: None } }"#;
707+
let expected_verification_response = VerifyPayload {
708+
from: ValidatorId::try_from("0x2bdeafae53940669daa6f519373f686c1f3d3393")
709+
.expect("Valid ValidatorId"),
710+
payload: Payload {
711+
id: "awesomeValidator".to_string(),
712+
era: 100000,
713+
address: "0x2bDeAFAE53940669DaA6F519373f686c1f3d3393".to_string(),
714+
identity: None,
715+
},
716+
};
708717

709718
let parts: Vec<&str> = expected.split('.').collect();
710719
let verification =
711720
ewt_verify(parts[0], parts[1], parts[2]).expect("Failed to verify ewt token");
712721

713722
assert_eq!(
714-
expected_verification_response,
715-
format!("{:?}", verification),
723+
expected_verification_response, verification,
716724
"generated wrong verification payload"
717725
);
718726
}

primitives/src/channel.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{AdUnit, EventSubmission, TargetingTag, ValidatorDesc, ValidatorId};
1111
use hex::{FromHex, FromHexError};
1212
use std::ops::Deref;
1313

14-
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Copy, Clone, Hash)]
14+
#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Hash)]
1515
#[serde(transparent)]
1616
pub struct ChannelId(
1717
#[serde(
@@ -21,6 +21,12 @@ pub struct ChannelId(
2121
[u8; 32],
2222
);
2323

24+
impl fmt::Debug for ChannelId {
25+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26+
write!(f, "ChannelId({})", self)
27+
}
28+
}
29+
2430
fn channel_id_from_str<'de, D>(deserializer: D) -> Result<[u8; 32], D::Error>
2531
where
2632
D: Deserializer<'de>,
@@ -151,23 +157,19 @@ pub struct ChannelSpec {
151157
/// A (leader, follower) tuple
152158
pub struct SpecValidators(ValidatorDesc, ValidatorDesc);
153159

160+
#[derive(Debug)]
154161
pub enum SpecValidator<'a> {
155162
Leader(&'a ValidatorDesc),
156163
Follower(&'a ValidatorDesc),
157-
None,
158164
}
159165

160166
impl<'a> SpecValidator<'a> {
161-
pub fn is_some(&self) -> bool {
162-
match &self {
163-
SpecValidator::None => false,
164-
_ => true,
167+
pub fn validator(&self) -> &'a ValidatorDesc {
168+
match self {
169+
SpecValidator::Leader(validator) => validator,
170+
SpecValidator::Follower(validator) => validator,
165171
}
166172
}
167-
168-
pub fn is_none(&self) -> bool {
169-
!self.is_some()
170-
}
171173
}
172174

173175
impl SpecValidators {
@@ -183,13 +185,13 @@ impl SpecValidators {
183185
&self.1
184186
}
185187

186-
pub fn find(&self, validator_id: &ValidatorId) -> SpecValidator<'_> {
188+
pub fn find(&self, validator_id: &ValidatorId) -> Option<SpecValidator<'_>> {
187189
if &self.leader().id == validator_id {
188-
SpecValidator::Leader(&self.leader())
190+
Some(SpecValidator::Leader(&self.leader()))
189191
} else if &self.follower().id == validator_id {
190-
SpecValidator::Follower(&self.follower())
192+
Some(SpecValidator::Follower(&self.follower()))
191193
} else {
192-
SpecValidator::None
194+
None
193195
}
194196
}
195197

primitives/src/channel_validator.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ pub trait ChannelValidator {
1414
) -> Result<(), ChannelError> {
1515
let adapter_channel_validator = match channel.spec.validators.find(validator_identity) {
1616
// check if the channel validators include our adapter identity
17-
SpecValidator::None => return Err(ChannelError::AdapterNotIncluded),
18-
SpecValidator::Leader(validator) | SpecValidator::Follower(validator) => validator,
17+
None => return Err(ChannelError::AdapterNotIncluded),
18+
Some(SpecValidator::Leader(validator)) | Some(SpecValidator::Follower(validator)) => {
19+
validator
20+
}
1921
};
2022

2123
if channel.valid_until < Utc::now() {

primitives/src/validator.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub enum ValidatorError {
1414
InvalidTransition,
1515
}
1616

17-
#[derive(Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
17+
#[derive(Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1818
#[serde(transparent)]
1919
pub struct ValidatorId(
2020
#[serde(
@@ -24,6 +24,12 @@ pub struct ValidatorId(
2424
[u8; 20],
2525
);
2626

27+
impl fmt::Debug for ValidatorId {
28+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29+
write!(f, "ValidatorId({})", self.to_hex_prefix_string())
30+
}
31+
}
32+
2733
fn validator_id_from_str<'de, D>(deserializer: D) -> Result<[u8; 20], D::Error>
2834
where
2935
D: Deserializer<'de>,

sentry/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,6 @@ async fn channels_router<A: Adapter + 'static>(
252252
) -> Result<Response<Body>, ResponseError> {
253253
let (path, method) = (req.uri().path().to_owned(), req.method());
254254

255-
// example with
256-
// @TODO remove later
257255
// regex matching for routes with params
258256
if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) {
259257
let param = RouteParams(vec![caps

sentry/src/routes/channel.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use futures::future::try_join_all;
1212
use hex::FromHex;
1313
use hyper::{Body, Request, Response};
1414
use primitives::adapter::Adapter;
15-
use primitives::channel::SpecValidator;
1615
use primitives::sentry::{Event, LastApproved, LastApprovedResponse, SuccessResponse};
1716
use primitives::validator::MessageTypes;
1817
use primitives::{Channel, ChannelId};
@@ -235,7 +234,7 @@ pub async fn create_validator_messages<A: Adapter + 'static>(
235234
.ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?;
236235

237236
match channel.spec.validators.find(&session.uid) {
238-
SpecValidator::None => Err(ResponseError::Unauthorized),
237+
None => Err(ResponseError::Unauthorized),
239238
_ => {
240239
try_join_all(messages.iter().map(|message| {
241240
insert_validator_messages(&app.pool, &channel, &session.uid, &message)

validator_worker/src/error.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ pub enum TickError {
1111
impl fmt::Display for TickError {
1212
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1313
match self {
14-
TickError::TimedOut(err) => write!(f, "Tick timed out - {}", err),
15-
TickError::Tick(err) => write!(f, "Tick error - {}", err),
14+
TickError::TimedOut(err) => write!(f, "Tick timed out ({})", err),
15+
TickError::Tick(err) => write!(f, "Tick: {}", err),
1616
}
1717
}
1818
}
@@ -31,17 +31,13 @@ impl<AE: AdapterErrorKind> fmt::Display for Error<AE> {
3131
use Error::*;
3232

3333
match self {
34-
SentryApi(err) => write!(f, "Sentry Api error - {}", err),
35-
LeaderTick(channel_id, err) => write!(
36-
f,
37-
"Error for Leader tick of Channel ({}) - {}",
38-
channel_id, err
39-
),
40-
FollowerTick(channel_id, err) => write!(
41-
f,
42-
"Error for Follower tick of Channel ({}) - {}",
43-
channel_id, err
44-
),
34+
SentryApi(err) => write!(f, "Sentry Api: {}", err),
35+
LeaderTick(channel_id, err) => {
36+
write!(f, "Error for Leader tick of {:#?}: {}", channel_id, err)
37+
}
38+
FollowerTick(channel_id, err) => {
39+
write!(f, "Error for Follower tick of {:#?}: {}", channel_id, err)
40+
}
4541
}
4642
}
4743
}

validator_worker/src/follower.rs

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,44 @@
11
use std::error::Error;
22

3-
use primitives::adapter::Adapter;
3+
use primitives::adapter::{Adapter, AdapterErrorKind};
44
use primitives::validator::{ApproveState, MessageTypes, NewState, RejectState};
55
use primitives::BalancesMap;
66

77
use crate::core::follower_rules::{get_health, is_valid_transition};
8-
use crate::heartbeat::heartbeat;
9-
use crate::sentry_interface::SentryApi;
8+
use crate::heartbeat::{heartbeat, HeartbeatStatus};
9+
use crate::sentry_interface::{PropagationResult, SentryApi};
1010
use crate::{get_state_root_hash, producer};
1111
use chrono::Utc;
1212

13-
enum InvalidNewState {
13+
#[derive(Debug)]
14+
pub enum InvalidNewState {
1415
RootHash,
1516
Signature,
1617
Transition,
1718
Health,
1819
}
1920

20-
enum NewStateResult {
21-
Ok,
22-
Err(InvalidNewState),
21+
#[derive(Debug)]
22+
pub enum ApproveStateResult<AE: AdapterErrorKind> {
23+
Sent(Vec<PropagationResult<AE>>),
24+
/// Conditions for handling the new state haven't been met
25+
NotSent,
26+
Err {
27+
new_state: InvalidNewState,
28+
reject_state_propagation: Vec<PropagationResult<AE>>,
29+
},
2330
}
2431

25-
pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<(), Box<dyn Error>> {
32+
#[derive(Debug)]
33+
pub struct TickStatus<AE: AdapterErrorKind> {
34+
pub heartbeat: HeartbeatStatus<AE>,
35+
pub approve_state: ApproveStateResult<AE>,
36+
pub producer_tick: producer::TickStatus<AE>,
37+
}
38+
39+
pub async fn tick<A: Adapter + 'static>(
40+
iface: &SentryApi<A>,
41+
) -> Result<TickStatus<A::AdapterError>, Box<dyn Error>> {
2642
let from = &iface.channel.spec.validators.leader().id;
2743
let new_msg_response = iface.get_latest_msg(from, &["NewState"]).await?;
2844
let new_msg = match new_msg_response {
@@ -42,21 +58,32 @@ pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result<(), Box<
4258

4359
let latest_is_responded_to = match (&new_msg, &our_latest_msg_state_root) {
4460
(Some(new_msg), Some(state_root)) => &new_msg.state_root == state_root,
45-
(_, _) => false,
61+
_ => false,
4662
};
4763

48-
let (balances, _) = producer::tick(&iface).await?;
49-
if let (Some(new_state), false) = (new_msg, latest_is_responded_to) {
50-
on_new_state(&iface, &balances, &new_state).await?;
51-
}
52-
heartbeat(&iface, balances).await.map(|_| ())
64+
let producer_tick = producer::tick(&iface).await?;
65+
let balances = match &producer_tick {
66+
producer::TickStatus::AccountingSent { balances, .. } => balances,
67+
producer::TickStatus::AccountingNotSent(balances) => balances,
68+
};
69+
let approve_state_result = if let (Some(new_state), false) = (new_msg, latest_is_responded_to) {
70+
on_new_state(&iface, &balances, &new_state).await?
71+
} else {
72+
ApproveStateResult::NotSent
73+
};
74+
75+
Ok(TickStatus {
76+
heartbeat: heartbeat(&iface, balances).await?,
77+
approve_state: approve_state_result,
78+
producer_tick,
79+
})
5380
}
5481

5582
async fn on_new_state<'a, A: Adapter + 'static>(
5683
iface: &'a SentryApi<A>,
5784
balances: &'a BalancesMap,
5885
new_state: &'a NewState,
59-
) -> Result<NewStateResult, Box<dyn Error>> {
86+
) -> Result<ApproveStateResult<A::AdapterError>, Box<dyn Error>> {
6087
let proposed_balances = new_state.balances.clone();
6188
let proposed_state_root = new_state.state_root.clone();
6289
if proposed_state_root != hex::encode(get_state_root_hash(&iface, &proposed_balances)?) {
@@ -96,22 +123,22 @@ async fn on_new_state<'a, A: Adapter + 'static>(
96123
let health_threshold = u64::from(iface.config.health_threshold_promilles);
97124
let is_healthy = health >= health_threshold;
98125

99-
iface
126+
let propagation_result = iface
100127
.propagate(&[&MessageTypes::ApproveState(ApproveState {
101128
state_root: proposed_state_root,
102129
signature,
103130
is_healthy,
104131
})])
105132
.await;
106133

107-
Ok(NewStateResult::Ok)
134+
Ok(ApproveStateResult::Sent(propagation_result))
108135
}
109136

110137
async fn on_error<'a, A: Adapter + 'static>(
111138
iface: &'a SentryApi<A>,
112139
new_state: &'a NewState,
113140
status: InvalidNewState,
114-
) -> NewStateResult {
141+
) -> ApproveStateResult<A::AdapterError> {
115142
use InvalidNewState::*;
116143
let reason = match &status {
117144
RootHash => "InvalidRootHash",
@@ -121,7 +148,7 @@ async fn on_error<'a, A: Adapter + 'static>(
121148
}
122149
.to_string();
123150

124-
iface
151+
let reject_state_propagation = iface
125152
.propagate(&[&MessageTypes::RejectState(RejectState {
126153
reason,
127154
state_root: new_state.state_root.clone(),
@@ -132,5 +159,8 @@ async fn on_error<'a, A: Adapter + 'static>(
132159
})])
133160
.await;
134161

135-
NewStateResult::Err(status)
162+
ApproveStateResult::Err {
163+
reject_state_propagation,
164+
new_state: status,
165+
}
136166
}

0 commit comments

Comments
 (0)