Skip to content

Commit f2598f1

Browse files
authored
Merge branch 'dev' into validator-worker
2 parents 3234fcb + b893bbd commit f2598f1

File tree

24 files changed

+826
-115
lines changed

24 files changed

+826
-115
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ you need to be running those in order to run the automated tests:
4545

4646
TODO
4747

48+
### Bug
49+
50+
51+
52+
4853
### Run Validator Worker
4954

5055
TODO

adapter/src/ethereum.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,10 @@ mod test {
598598
token_contract
599599
.call(
600600
"setBalanceTo",
601-
(Token::Address(leader_account), Token::Uint(2000.into())),
601+
(
602+
Token::Address(leader_account),
603+
Token::Uint(U256::from(2000 as u64)),
604+
),
602605
leader_account,
603606
Options::default(),
604607
)
@@ -649,6 +652,7 @@ mod test {
649652
nonce: None,
650653
withdraw_period_start: Utc::now() + Duration::days(1),
651654
ad_units: vec![],
655+
pricing_bounds: None,
652656
},
653657
};
654658

primitives/src/analytics.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,42 @@
1+
use crate::ChannelId;
12
use crate::DomainError;
23
use serde::{Deserialize, Serialize};
34

45
pub const ANALYTICS_QUERY_LIMIT: u32 = 200;
56

7+
#[derive(Debug, Serialize, Deserialize)]
8+
#[serde(rename_all = "camelCase")]
9+
pub struct AnalyticsData {
10+
pub time: f64,
11+
pub value: String,
12+
#[serde(default, skip_serializing_if = "Option::is_none")]
13+
pub channel_id: Option<ChannelId>,
14+
}
15+
616
#[derive(Debug, Serialize, Deserialize)]
717
pub struct AnalyticsResponse {
8-
time: u32,
9-
value: String,
18+
pub aggr: Vec<AnalyticsData>,
19+
pub limit: u32,
1020
}
1121

1222
#[cfg(feature = "postgres")]
1323
pub mod postgres {
14-
use super::AnalyticsResponse;
24+
use super::AnalyticsData;
1525
use tokio_postgres::Row;
1626

17-
impl From<&Row> for AnalyticsResponse {
27+
impl From<&Row> for AnalyticsData {
1828
fn from(row: &Row) -> Self {
1929
Self {
2030
time: row.get("time"),
2131
value: row.get("value"),
32+
channel_id: row.try_get("channel_id").ok(),
2233
}
2334
}
2435
}
2536
}
2637

2738
#[derive(Debug, Deserialize)]
39+
#[serde(rename_all = "camelCase")]
2840
pub struct AnalyticsQuery {
2941
#[serde(default = "default_limit")]
3042
pub limit: u32,
@@ -34,6 +46,7 @@ pub struct AnalyticsQuery {
3446
pub metric: String,
3547
#[serde(default = "default_timeframe")]
3648
pub timeframe: String,
49+
pub segment_by_channel: Option<String>,
3750
}
3851

3952
impl AnalyticsQuery {

primitives/src/big_num.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,19 @@ use num_derive::{Num, NumOps, One, Zero};
1010
use serde::{Deserialize, Deserializer, Serialize, Serializer};
1111

1212
#[derive(
13-
Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, NumOps, One, Zero, Num,
13+
Serialize,
14+
Deserialize,
15+
Debug,
16+
Clone,
17+
PartialEq,
18+
Eq,
19+
PartialOrd,
20+
Ord,
21+
NumOps,
22+
One,
23+
Zero,
24+
Num,
25+
Default,
1426
)]
1527
pub struct BigNum(
1628
#[serde(

primitives/src/channel.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,19 @@ pub struct Channel {
6363
pub spec: ChannelSpec,
6464
}
6565

66+
#[derive(Serialize, Deserialize, Debug, Clone)]
67+
pub struct Pricing {
68+
pub max: BigNum,
69+
pub min: BigNum,
70+
}
71+
72+
#[derive(Serialize, Deserialize, Debug, Clone)]
73+
#[serde(rename_all = "UPPERCASE")]
74+
pub struct PricingBounds {
75+
pub impression: Option<Pricing>,
76+
pub click: Option<Pricing>,
77+
}
78+
6679
#[derive(Serialize, Deserialize, Debug, Clone)]
6780
#[serde(rename_all = "camelCase")]
6881
pub struct ChannelSpec {
@@ -73,6 +86,8 @@ pub struct ChannelSpec {
7386
pub max_per_impression: BigNum,
7487
/// Minimum payment offered per impression
7588
pub min_per_impression: BigNum,
89+
// Event pricing bounds
90+
pub pricing_bounds: Option<PricingBounds>,
7691
/// An array of TargetingTag (optional)
7792
#[serde(default, skip_serializing_if = "Vec::is_empty")]
7893
pub targeting: Vec<TargetingTag>,
@@ -270,6 +285,12 @@ pub mod postgres {
270285
accepts!(TEXT, VARCHAR);
271286
}
272287

288+
impl From<&Row> for ChannelId {
289+
fn from(row: &Row) -> Self {
290+
row.get("id")
291+
}
292+
}
293+
273294
impl ToSql for ChannelId {
274295
fn to_sql(
275296
&self,

primitives/src/sentry.rs

Lines changed: 135 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,56 @@
1-
use crate::validator::{ApproveState, Heartbeat, MessageTypes, NewState};
1+
use crate::validator::MessageTypes;
22
use crate::{BigNum, Channel, ChannelId, ValidatorId};
33
use chrono::{DateTime, Utc};
44
use serde::{Deserialize, Serialize};
55
use std::collections::HashMap;
6+
use std::fmt;
7+
use std::hash::Hash;
68

79
#[derive(Serialize, Deserialize, Debug)]
810
#[serde(rename_all = "camelCase")]
911
pub struct LastApproved {
1012
/// NewState can be None if the channel is brand new
1113
pub new_state: Option<NewStateValidatorMessage>,
1214
/// ApproveState can be None if the channel is brand new
13-
pub approved_state: Option<ApproveStateValidatorMessage>,
15+
pub approve_state: Option<ApproveStateValidatorMessage>,
1416
}
1517

1618
#[derive(Serialize, Deserialize, Debug)]
1719
pub struct NewStateValidatorMessage {
18-
pub from: String,
20+
pub from: ValidatorId,
1921
pub received: DateTime<Utc>,
20-
pub msg: NewState,
22+
pub msg: MessageTypes,
2123
}
2224

2325
#[derive(Serialize, Deserialize, Debug)]
2426
pub struct ApproveStateValidatorMessage {
25-
pub from: String,
27+
pub from: ValidatorId,
28+
pub received: DateTime<Utc>,
29+
pub msg: MessageTypes,
30+
}
31+
32+
#[derive(Serialize, Deserialize, Debug)]
33+
pub struct HeartbeatValidatorMessage {
34+
pub from: ValidatorId,
2635
pub received: DateTime<Utc>,
27-
pub msg: ApproveState,
36+
pub msg: MessageTypes,
2837
}
2938

3039
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
31-
#[derive(Serialize, Deserialize)]
40+
#[derive(Serialize, Deserialize, Clone, Debug)]
3241
pub enum Event {
3342
#[serde(rename_all = "camelCase")]
3443
Impression {
3544
publisher: ValidatorId,
3645
ad_unit: Option<String>,
46+
ad_slot: Option<String>,
47+
referrer: Option<String>,
3748
},
3849
Click {
39-
publisher: String,
50+
publisher: ValidatorId,
51+
ad_unit: Option<String>,
52+
ad_slot: Option<String>,
53+
referrer: Option<String>,
4054
},
4155
ImpressionWithCommission {
4256
earners: Vec<Earner>,
@@ -55,7 +69,37 @@ pub enum Event {
5569
Close,
5670
}
5771

58-
#[derive(Serialize, Deserialize)]
72+
impl Event {
73+
pub fn is_click_event(&self) -> bool {
74+
match *self {
75+
Event::Click { .. } => true,
76+
_ => false,
77+
}
78+
}
79+
80+
pub fn is_impression_event(&self) -> bool {
81+
match *self {
82+
Event::Impression { .. } => true,
83+
_ => false,
84+
}
85+
}
86+
}
87+
88+
impl fmt::Display for Event {
89+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90+
match *self {
91+
Event::Impression { .. } => write!(f, "IMPRESSION"),
92+
Event::Click { .. } => write!(f, "CLICK"),
93+
Event::ImpressionWithCommission { .. } => write!(f, "IMPRESSION_WITH_COMMMISION"),
94+
Event::UpdateImpressionPrice { .. } => write!(f, "UPDATE_IMPRESSION_PRICE"),
95+
Event::Pay { .. } => write!(f, "PAY"),
96+
Event::PauseChannel => write!(f, "PAUSE_CHANNEL"),
97+
Event::Close => write!(f, "CLOSE"),
98+
}
99+
}
100+
}
101+
102+
#[derive(Serialize, Deserialize, Clone, Debug)]
59103
pub struct Earner {
60104
#[serde(rename = "publisher")]
61105
pub address: String,
@@ -89,7 +133,7 @@ pub struct ChannelListResponse {
89133
#[serde(rename_all = "camelCase")]
90134
pub struct LastApprovedResponse {
91135
pub last_approved: Option<LastApproved>,
92-
pub heartbeats: Option<Vec<Heartbeat>>,
136+
pub heartbeats: Option<Vec<HeartbeatValidatorMessage>>,
93137
}
94138

95139
#[derive(Serialize, Deserialize, Debug)]
@@ -116,9 +160,59 @@ pub struct EventAggregateResponse {
116160
pub events: Vec<EventAggregate>,
117161
}
118162

163+
#[derive(Serialize, Deserialize)]
164+
#[serde(rename_all = "camelCase")]
165+
pub struct AdvancedAnalyticsResponse {
166+
pub by_channel_stats: HashMap<ChannelId, HashMap<ChannelReport, HashMap<String, f64>>>,
167+
pub publisher_stats: HashMap<PublisherReport, HashMap<String, f64>>,
168+
}
169+
170+
#[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone)]
171+
#[serde(rename_all = "camelCase")]
172+
pub enum PublisherReport {
173+
AdUnit,
174+
AdSlot,
175+
AdSlotPay,
176+
Country,
177+
Hostname,
178+
}
179+
180+
impl fmt::Display for PublisherReport {
181+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182+
match *self {
183+
PublisherReport::AdUnit => write!(f, "reportPublisherToAdUnit"),
184+
PublisherReport::AdSlot => write!(f, "reportPublisherToAdSlot"),
185+
PublisherReport::AdSlotPay => write!(f, "reportPublisherToAdSlotPay"),
186+
PublisherReport::Country => write!(f, "reportPublisherToCountry"),
187+
PublisherReport::Hostname => write!(f, "reportPublisherToHostname"),
188+
}
189+
}
190+
}
191+
192+
#[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone)]
193+
#[serde(rename_all = "camelCase")]
194+
pub enum ChannelReport {
195+
AdUnit,
196+
Hostname,
197+
HostnamePay,
198+
}
199+
200+
impl fmt::Display for ChannelReport {
201+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202+
match *self {
203+
ChannelReport::AdUnit => write!(f, "reportPublisherToAdUnit"),
204+
ChannelReport::Hostname => write!(f, "reportChannelToHostname"),
205+
ChannelReport::HostnamePay => write!(f, "reportChannelToHostnamePay"),
206+
}
207+
}
208+
}
209+
119210
#[cfg(feature = "postgres")]
120211
mod postgres {
121-
use super::ValidatorMessage;
212+
use super::{
213+
ApproveStateValidatorMessage, HeartbeatValidatorMessage, NewStateValidatorMessage,
214+
ValidatorMessage,
215+
};
122216
use crate::sentry::EventAggregate;
123217
use crate::validator::MessageTypes;
124218
use bytes::BytesMut;
@@ -146,6 +240,36 @@ mod postgres {
146240
}
147241
}
148242

243+
impl From<&Row> for ApproveStateValidatorMessage {
244+
fn from(row: &Row) -> Self {
245+
Self {
246+
from: row.get("from"),
247+
received: row.get("received"),
248+
msg: row.get::<_, Json<MessageTypes>>("msg").0,
249+
}
250+
}
251+
}
252+
253+
impl From<&Row> for NewStateValidatorMessage {
254+
fn from(row: &Row) -> Self {
255+
Self {
256+
from: row.get("from"),
257+
received: row.get("received"),
258+
msg: row.get::<_, Json<MessageTypes>>("msg").0,
259+
}
260+
}
261+
}
262+
263+
impl From<&Row> for HeartbeatValidatorMessage {
264+
fn from(row: &Row) -> Self {
265+
Self {
266+
from: row.get("from"),
267+
received: row.get("received"),
268+
msg: row.get::<_, Json<MessageTypes>>("msg").0,
269+
}
270+
}
271+
}
272+
149273
impl ToSql for MessageTypes {
150274
fn to_sql(
151275
&self,

primitives/src/util/tests/prep_db.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::{
2+
channel::{Pricing, PricingBounds},
23
BigNum, Channel, ChannelId, ChannelSpec, EventSubmission, SpecValidators, ValidatorDesc,
34
ValidatorId,
45
};
@@ -69,7 +70,7 @@ lazy_static! {
6970
title: None,
7071
validators: SpecValidators::new(DUMMY_VALIDATOR_LEADER.clone(), DUMMY_VALIDATOR_FOLLOWER.clone()),
7172
max_per_impression: 10.into(),
72-
min_per_impression: 10.into(),
73+
min_per_impression: 1.into(),
7374
targeting: vec![],
7475
min_targeting_score: None,
7576
event_submission: Some(EventSubmission { allow: vec![] }),
@@ -79,6 +80,7 @@ lazy_static! {
7980
nonce: Some(nonce),
8081
withdraw_period_start: Utc.timestamp_millis(4_073_414_400_000),
8182
ad_units: vec![],
83+
pricing_bounds: Some(PricingBounds {impression: None, click: Some(Pricing { max: 0.into(), min: 0.into()})}),
8284
},
8385
}
8486
};

0 commit comments

Comments
 (0)