Skip to content

Commit 75ab5b0

Browse files
authored
Merge pull request #191 from AdExNetwork/issue-189-channel-load
Issue #189 Channel load
2 parents d52f7eb + 2703d3c commit 75ab5b0

File tree

12 files changed

+224
-28
lines changed

12 files changed

+224
-28
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

primitives/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ version = "0.1.0"
44
authors = ["Lachezar Lechev <[email protected]>, Omidiora Samuel <[email protected]>"]
55
edition = "2018"
66

7+
[features]
8+
postgres = ["postgres-types", "bytes"]
9+
710
[dependencies]
811
# Futures
912
futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"]}
@@ -33,5 +36,8 @@ num-derive = "0.2"
3336
# Fixtures
3437
fake = { version = "^1.3", features = ["chrono"] }
3538
rand = { version = "^0.6" }
39+
# postgres feature
40+
postgres-types = {version = "0.1.0-alpha.1", optional = true}
41+
bytes = { version = "0.4.12", optional = true}
3642
# Other
3743
lazy_static = "1.4.0"

primitives/src/big_num.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,55 @@ where
222222
serializer.serialize_str(&num.to_str_radix(10))
223223
}
224224

225+
#[cfg(feature = "postgres")]
226+
pub mod postgres {
227+
use super::BigNum;
228+
use bytes::BytesMut;
229+
use postgres_types::{FromSql, IsNull, ToSql, Type};
230+
use std::error::Error;
231+
232+
impl<'a> FromSql<'a> for BigNum {
233+
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<BigNum, Box<dyn Error + Sync + Send>> {
234+
use std::convert::TryInto;
235+
236+
let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
237+
238+
Ok(str_slice.try_into()?)
239+
}
240+
241+
fn accepts(ty: &Type) -> bool {
242+
match *ty {
243+
Type::TEXT | Type::VARCHAR => true,
244+
_ => false,
245+
}
246+
}
247+
}
248+
249+
impl ToSql for BigNum {
250+
fn to_sql(
251+
&self,
252+
ty: &Type,
253+
w: &mut BytesMut,
254+
) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
255+
<String as ToSql>::to_sql(&self.0.to_string(), ty, w)
256+
}
257+
258+
fn accepts(ty: &Type) -> bool {
259+
match *ty {
260+
Type::TEXT | Type::VARCHAR => true,
261+
_ => false,
262+
}
263+
}
264+
265+
fn to_sql_checked(
266+
&self,
267+
ty: &Type,
268+
out: &mut BytesMut,
269+
) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
270+
<String as ToSql>::to_sql_checked(&self.0.to_string(), ty, out)
271+
}
272+
}
273+
}
225274
#[cfg(test)]
226275
mod test {
227276
use super::*;

primitives/src/channel.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,47 @@ use serde_hex::{SerHex, StrictPfx};
99
use crate::big_num::BigNum;
1010
use crate::util::serde::ts_milliseconds_option;
1111
use crate::{AdUnit, EventSubmission, TargetingTag, ValidatorDesc, ValidatorId};
12+
use hex::{FromHex, FromHexError};
13+
use std::ops::Deref;
14+
15+
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Copy, Clone)]
16+
#[serde(transparent)]
17+
pub struct ChannelId(#[serde(with = "SerHex::<StrictPfx>")] [u8; 32]);
18+
19+
impl Deref for ChannelId {
20+
type Target = [u8];
21+
22+
fn deref(&self) -> &[u8] {
23+
&self.0
24+
}
25+
}
26+
27+
impl From<[u8; 32]> for ChannelId {
28+
fn from(array: [u8; 32]) -> Self {
29+
Self(array)
30+
}
31+
}
32+
33+
impl AsRef<[u8]> for ChannelId {
34+
fn as_ref(&self) -> &[u8] {
35+
&self.0
36+
}
37+
}
38+
39+
impl FromHex for ChannelId {
40+
type Error = FromHexError;
41+
42+
fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Self::Error> {
43+
let array = hex::FromHex::from_hex(hex.as_ref())?;
44+
45+
Ok(Self(array))
46+
}
47+
}
1248

1349
#[derive(Serialize, Deserialize, Debug, Clone)]
1450
#[serde(rename_all = "camelCase")]
1551
pub struct Channel {
16-
#[serde(with = "SerHex::<StrictPfx>")]
17-
pub id: [u8; 32],
52+
pub id: ChannelId,
1853
pub creator: String,
1954
pub deposit_asset: String,
2055
pub deposit_amount: BigNum,
@@ -158,3 +193,29 @@ impl Error for ChannelError {
158193
None
159194
}
160195
}
196+
197+
#[cfg(feature = "postgres")]
198+
pub mod postgres {
199+
use super::ChannelId;
200+
use hex::FromHex;
201+
use postgres_types::{FromSql, Type};
202+
use std::error::Error;
203+
204+
impl<'a> FromSql<'a> for ChannelId {
205+
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
206+
let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
207+
208+
// FromHex::from_hex for fixed-sized arrays will guard against the length of the string!
209+
let id: [u8; 32] = <[u8; 32] as FromHex>::from_hex(str_slice)?;
210+
211+
Ok(ChannelId(id))
212+
}
213+
214+
fn accepts(ty: &Type) -> bool {
215+
match *ty {
216+
Type::TEXT | Type::VARCHAR => true,
217+
_ => false,
218+
}
219+
}
220+
}
221+
}

primitives/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub mod validator;
2929
pub use self::ad_unit::AdUnit;
3030
pub use self::balances_map::BalancesMap;
3131
pub use self::big_num::BigNum;
32-
pub use self::channel::{Channel, ChannelSpec, SpecValidator, SpecValidators};
32+
pub use self::channel::{Channel, ChannelId, ChannelSpec, SpecValidator, SpecValidators};
3333
pub use self::config::Config;
3434
pub use self::event_submission::EventSubmission;
3535
pub use self::targeting_tag::TargetingTag;

primitives/src/sentry.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use crate::validator::{ApproveState, Heartbeat, MessageTypes, NewState};
2-
use crate::{BigNum, Channel};
2+
use crate::{BigNum, Channel, ChannelId};
33
use chrono::{DateTime, Utc};
44
use serde::{Deserialize, Serialize};
5-
use serde_hex::{SerHex, StrictPfx};
65
use std::collections::HashMap;
76

87
#[derive(Serialize, Deserialize, Debug)]
@@ -63,8 +62,7 @@ pub struct Earner {
6362
#[derive(Debug, Serialize, Deserialize)]
6463
#[serde(rename_all = "camelCase")]
6564
pub struct EventAggregate {
66-
#[serde(with = "SerHex::<StrictPfx>")]
67-
pub channel_id: [u8; 32],
65+
pub channel_id: ChannelId,
6866
pub created: DateTime<Utc>,
6967
pub events: HashMap<String, AggregateEvents>,
7068
}

primitives/src/util/tests/prep_db.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
2-
BigNum, Channel, ChannelSpec, EventSubmission, SpecValidators, ValidatorDesc, ValidatorId,
2+
BigNum, Channel, ChannelId, ChannelSpec, EventSubmission, SpecValidators, ValidatorDesc,
3+
ValidatorId,
34
};
45
use chrono::{TimeZone, Utc};
56
use fake::faker::{Faker, Number};
@@ -56,7 +57,7 @@ lazy_static! {
5657
let nonce = BigNum::from(<Faker as Number>::between(100_000_000, 999_999_999));
5758

5859
Channel {
59-
id: <[u8; 32]>::from_hex("061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088").expect("prep_db: failed to deserialize channel id"),
60+
id: ChannelId::from_hex("061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088").expect("prep_db: failed to deserialize channel id"),
6061
creator: "0x033ed90e0fec3f3ea1c9b005c724d704501e0196".to_string(),
6162
deposit_asset: "0x89d24A6b4CcB1B6fAA2625fE562bDD9a23260359".to_string(),
6263
deposit_amount: 1_000.into(),

sentry/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ edition = "2018"
88
# Futures
99
futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"]}
1010
# Primitives
11-
primitives = {path = "../primitives"}
11+
primitives = {path = "../primitives", features = ["postgres"]}
1212
adapter = { version = "0.1", path = "../adapter" }
1313
chrono = { version = "0.4", features = ["serde"] }
1414
hex = "0.3.2"
@@ -21,7 +21,7 @@ regex = "1"
2121
# Database
2222
redis = {version = "0.13.1-alpha.0", features = ["tokio-executor"]}
2323
bb8 = {git = "https://github.com/djc/bb8", branch = "async-await"}
24-
bb8-postgres = {git = "https://github.com/djc/bb8", branch = "async-await"}
24+
bb8-postgres = {git = "https://github.com/djc/bb8", branch = "async-await", features = ["with-chrono-0_4", "with-serde_json-1"]}
2525
# Logger
2626
slog = { version = "^2.2.3" , features = ["max_level_trace"] }
2727
# Serde

sentry/src/db.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use redis::aio::MultiplexedConnection;
22
use redis::RedisError;
33

44
use bb8::Pool;
5-
use bb8_postgres::{
6-
tokio_postgres::{Error as PostgresError, NoTls},
7-
PostgresConnectionManager,
8-
};
5+
use bb8_postgres::tokio_postgres::NoTls;
6+
use bb8_postgres::PostgresConnectionManager;
97
use lazy_static::lazy_static;
108

9+
pub type DbPool = Pool<PostgresConnectionManager<NoTls>>;
10+
1111
lazy_static! {
1212
static ref REDIS_URL: String =
1313
std::env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1:6379"));
@@ -20,8 +20,7 @@ pub async fn redis_connection() -> Result<MultiplexedConnection, RedisError> {
2020
client.get_multiplexed_tokio_connection().await
2121
}
2222

23-
pub async fn postgres_connection() -> Result<Pool<PostgresConnectionManager<NoTls>>, PostgresError>
24-
{
23+
pub async fn postgres_connection() -> Result<DbPool, bb8_postgres::tokio_postgres::Error> {
2524
let pg_mgr = PostgresConnectionManager::new_from_stringlike(POSTGRES_URL.as_str(), NoTls)?;
2625

2726
Pool::builder().build(pg_mgr).await

sentry/src/lib.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
#![deny(clippy::all)]
22
#![deny(rust_2018_idioms)]
33

4+
use crate::db::DbPool;
45
use crate::middleware::auth;
56
use crate::middleware::cors::{cors, Cors};
6-
use bb8::Pool;
7-
use bb8_postgres::{tokio_postgres::NoTls, PostgresConnectionManager};
87
use hyper::service::{make_service_fn, service_fn};
98
use hyper::{Body, Error, Method, Request, Response, Server, StatusCode};
109
use primitives::adapter::Adapter;
@@ -14,8 +13,10 @@ use slog::{error, info, Logger};
1413

1514
pub mod middleware {
1615
pub mod auth;
16+
pub mod channel;
1717
pub mod cors;
1818
}
19+
1920
pub mod routes {
2021
pub mod channel;
2122
pub mod cfg {
@@ -43,7 +44,7 @@ pub struct Application<A: Adapter> {
4344
adapter: A,
4445
logger: Logger,
4546
redis: MultiplexedConnection,
46-
_postgres: Pool<PostgresConnectionManager<NoTls>>,
47+
pool: DbPool,
4748
_clustered: bool,
4849
port: u16,
4950
config: Config,
@@ -55,7 +56,7 @@ impl<A: Adapter + 'static> Application<A> {
5556
config: Config,
5657
logger: Logger,
5758
redis: MultiplexedConnection,
58-
postgres: Pool<PostgresConnectionManager<NoTls>>,
59+
pool: DbPool,
5960
clustered: bool,
6061
port: u16,
6162
) -> Self {
@@ -64,7 +65,7 @@ impl<A: Adapter + 'static> Application<A> {
6465
config,
6566
logger,
6667
redis,
67-
_postgres: postgres,
68+
pool,
6869
_clustered: clustered,
6970
port,
7071
}
@@ -79,17 +80,19 @@ impl<A: Adapter + 'static> Application<A> {
7980
let adapter_config = (self.adapter.clone(), self.config.clone());
8081
let redis = self.redis.clone();
8182
let logger = self.logger.clone();
83+
let pool = self.pool.clone();
8284
async move {
8385
Ok::<_, Error>(service_fn(move |req| {
8486
let adapter_config = adapter_config.clone();
8587
let redis = redis.clone();
8688
let logger = logger.clone();
89+
let pool = pool.clone();
8790
async move {
8891
Ok::<_, Error>(
8992
handle_routing(
9093
req,
9194
(&adapter_config.0, &adapter_config.1),
92-
redis,
95+
(pool, redis),
9396
&logger,
9497
)
9598
.await,
@@ -125,7 +128,7 @@ where
125128
async fn handle_routing(
126129
req: Request<Body>,
127130
(adapter, config): (&impl Adapter, &Config),
128-
redis: MultiplexedConnection,
131+
(pool, redis): (DbPool, MultiplexedConnection),
129132
logger: &Logger,
130133
) -> Response<Body> {
131134
let headers = match cors(&req) {
@@ -147,7 +150,7 @@ async fn handle_routing(
147150
let mut response = match (req.uri().path(), req.method()) {
148151
("/cfg", &Method::GET) => crate::routes::cfg::return_config(&config),
149152
(route, _) if route.starts_with("/channel") => {
150-
crate::routes::channel::handle_channel_routes(req, adapter).await
153+
crate::routes::channel::handle_channel_routes(req, (&pool, adapter)).await
151154
}
152155
_ => Err(ResponseError::NotFound),
153156
}
@@ -172,7 +175,8 @@ pub fn not_found() -> Response<Body> {
172175
response
173176
}
174177

175-
pub fn bad_request(_: Box<dyn std::error::Error>) -> Response<Body> {
178+
pub fn bad_request(err: Box<dyn std::error::Error>) -> Response<Body> {
179+
println!("{:#?}", err);
176180
let body = Body::from("Bad Request: try again later");
177181
let mut response = Response::new(body);
178182
let status = response.status_mut();

0 commit comments

Comments
 (0)