Skip to content

Commit deafe77

Browse files
authored
Merge pull request #198 from AdExNetwork/issue-189-channel-if-functions
Issue #189 channel if functions
2 parents ddadaab + d7db62f commit deafe77

File tree

11 files changed

+216
-90
lines changed

11 files changed

+216
-90
lines changed

adapter/src/dummy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl Adapter for DummyAdapter {
6464
}
6565

6666
fn validate_channel(&self, channel: &Channel) -> AdapterResult<bool> {
67-
match DummyAdapter::is_channel_valid(&self.config, channel) {
67+
match DummyAdapter::is_channel_valid(&self.config, self.whoami(), channel) {
6868
Ok(_) => Ok(true),
6969
Err(e) => Err(AdapterError::InvalidChannel(e.to_string())),
7070
}

adapter/src/ethereum.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl Adapter for EthereumAdapter {
123123

124124
fn validate_channel(&self, channel: &Channel) -> AdapterResult<bool> {
125125
// check if channel is valid
126-
if let Err(e) = EthereumAdapter::is_channel_valid(&self.config, channel) {
126+
if let Err(e) = EthereumAdapter::is_channel_valid(&self.config, self.whoami(), channel) {
127127
return Err(AdapterError::InvalidChannel(e.to_string()));
128128
}
129129

primitives/src/channel.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl FromHex for ChannelId {
4040
type Error = FromHexError;
4141

4242
fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Self::Error> {
43-
let array = hex::FromHex::from_hex(hex.as_ref())?;
43+
let array = hex::FromHex::from_hex(hex)?;
4444

4545
Ok(Self(array))
4646
}
@@ -200,24 +200,9 @@ pub mod postgres {
200200
use super::{Channel, ChannelSpec};
201201
use bytes::BytesMut;
202202
use hex::FromHex;
203-
use postgres_types::{FromSql, IsNull, ToSql, Type};
203+
use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, Json, ToSql, Type};
204204
use std::error::Error;
205-
use tokio_postgres::{types::Json, Row};
206-
207-
impl<'a> FromSql<'a> for ChannelId {
208-
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
209-
let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
210-
211-
Ok(ChannelId::from_hex(&str_slice[2..])?)
212-
}
213-
214-
fn accepts(ty: &Type) -> bool {
215-
match *ty {
216-
Type::TEXT | Type::VARCHAR => true,
217-
_ => false,
218-
}
219-
}
220-
}
205+
use tokio_postgres::Row;
221206

222207
impl From<&Row> for Channel {
223208
fn from(row: &Row) -> Self {
@@ -232,6 +217,16 @@ pub mod postgres {
232217
}
233218
}
234219

220+
impl<'a> FromSql<'a> for ChannelId {
221+
fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
222+
let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
223+
224+
Ok(ChannelId::from_hex(&str_slice[2..])?)
225+
}
226+
227+
accepts!(TEXT, VARCHAR);
228+
}
229+
235230
impl ToSql for ChannelId {
236231
fn to_sql(
237232
&self,
@@ -247,14 +242,19 @@ pub mod postgres {
247242
<String as ToSql>::accepts(ty)
248243
}
249244

250-
fn to_sql_checked(
245+
to_sql_checked!();
246+
}
247+
248+
impl ToSql for ChannelSpec {
249+
fn to_sql(
251250
&self,
252251
ty: &Type,
253-
out: &mut BytesMut,
252+
w: &mut BytesMut,
254253
) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
255-
let string = format!("0x{}", hex::encode(self));
256-
257-
<String as ToSql>::to_sql_checked(&string, ty, out)
254+
Json(self).to_sql(ty, w)
258255
}
256+
257+
accepts!(JSONB);
258+
to_sql_checked!();
259259
}
260260
}

primitives/src/channel_validator.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ use crate::channel::{Channel, ChannelError, SpecValidator, SpecValidators};
22
use crate::config::Config;
33
use crate::ValidatorId;
44
use chrono::Utc;
5-
use std::convert::TryFrom;
65

76
pub trait ChannelValidator {
8-
fn is_channel_valid(config: &Config, channel: &Channel) -> Result<(), ChannelError> {
9-
let identity = &config.clone().identity.unwrap_or_else(|| "".to_string());
10-
let validator_identity = ValidatorId::try_from(identity).map_err(|_| {
11-
ChannelError::InvalidArgument("Failed to deserialize identity".to_string())
12-
})?;
13-
let adapter_channel_validator = match channel.spec.validators.find(&validator_identity) {
7+
fn is_channel_valid(
8+
config: &Config,
9+
validator_identity: &ValidatorId,
10+
channel: &Channel,
11+
) -> Result<(), ChannelError> {
12+
let adapter_channel_validator = match channel.spec.validators.find(validator_identity) {
1413
// check if the channel validators include our adapter identity
1514
SpecValidator::None => return Err(ChannelError::AdapterNotIncluded),
1615
SpecValidator::Leader(validator) | SpecValidator::Follower(validator) => validator,

primitives/src/config.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ lazy_static! {
1717
#[derive(Serialize, Deserialize, Debug, Clone)]
1818
#[serde(rename_all(serialize = "SCREAMING_SNAKE_CASE"))]
1919
pub struct Config {
20-
pub identity: Option<String>, // should not be here maybe?
2120
pub max_channels: u32,
2221
pub wait_time: u32,
2322
pub aggr_throttle: u32,

sentry/src/chain.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
1-
use crate::ResponseError;
1+
use crate::{Application, ResponseError};
22
use hyper::{Body, Request};
3+
use primitives::adapter::Adapter;
34
use std::future::Future;
45

56
// chain middleware function calls
67
//
78
// function signature
89
// fn middleware(mut req: Request) -> Result<Request, ResponseError>
910

10-
pub async fn chain<M, MF>(
11+
pub async fn chain<'a, A, M, MF>(
1112
req: Request<Body>,
13+
app: &'a Application<A>,
1214
middlewares: Vec<M>,
1315
) -> Result<Request<Body>, ResponseError>
1416
where
17+
A: Adapter,
1518
MF: Future<Output = Result<Request<Body>, ResponseError>> + Send,
16-
M: FnMut(Request<Body>) -> MF,
19+
M: FnMut(Request<Body>, &'a Application<A>) -> MF,
1720
{
1821
let mut req = Ok(req);
1922

2023
for mut mw in middlewares.into_iter() {
21-
match mw(req.unwrap()).await {
24+
match mw(req.unwrap(), app).await {
2225
Ok(r) => {
2326
req = Ok(r);
2427
}

sentry/src/db.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ use std::env;
77

88
use lazy_static::lazy_static;
99

10+
mod channel;
11+
pub use self::channel::*;
12+
1013
pub type DbPool = Pool<PostgresConnectionManager<NoTls>>;
1114

1215
lazy_static! {

sentry/src/db/channel.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use crate::db::DbPool;
2+
use bb8::RunError;
3+
use primitives::{Channel, ChannelId, ValidatorId};
4+
use std::str::FromStr;
5+
6+
pub async fn get_channel_by_id(
7+
pool: &DbPool,
8+
id: &ChannelId,
9+
) -> Result<Option<Channel>, RunError<bb8_postgres::tokio_postgres::Error>> {
10+
pool
11+
.run(move |connection| {
12+
async move {
13+
match connection.prepare("SELECT id, creator, deposit_asset, deposit_amount, valid_until, spec FROM channels WHERE id = $1 LIMIT 1").await {
14+
Ok(select) => match connection.query(&select, &[&id]).await {
15+
Ok(results) => Ok((results.get(0).map(Channel::from), connection)),
16+
Err(e) => Err((e, connection)),
17+
},
18+
Err(e) => Err((e, connection)),
19+
}
20+
}
21+
})
22+
.await
23+
}
24+
25+
pub async fn get_channel_by_id_and_validator(
26+
pool: &DbPool,
27+
id: &ChannelId,
28+
validator_id: &ValidatorId,
29+
) -> Result<Option<Channel>, RunError<bb8_postgres::tokio_postgres::Error>> {
30+
pool
31+
.run(move |connection| {
32+
async move {
33+
let validator = serde_json::Value::from_str(&format!(r#"[{{"id": "{}"}}]"#, validator_id)).expect("Not a valid json");
34+
let query = "SELECT id, creator, deposit_asset, deposit_amount, valid_until, spec FROM channels WHERE id = $1 AND spec->'validators' @> $2 LIMIT 1";
35+
match connection.prepare(query).await {
36+
Ok(select) => {
37+
match connection.query(&select, &[&id, &validator]).await {
38+
Ok(results) => Ok((results.get(0).map(Channel::from), connection)),
39+
Err(e) => Err((e, connection)),
40+
}
41+
},
42+
Err(e) => Err((e, connection)),
43+
}
44+
}
45+
})
46+
.await
47+
}
48+
49+
pub async fn insert_channel(
50+
pool: &DbPool,
51+
channel: &Channel,
52+
) -> Result<bool, RunError<bb8_postgres::tokio_postgres::Error>> {
53+
pool
54+
.run(move |connection| {
55+
async move {
56+
match connection.prepare("INSERT INTO channels (id, creator, deposit_asset, deposit_amount, valid_until, spec) values ($1, $2, $3, $4, $5, $6)").await {
57+
Ok(stmt) => match connection.execute(&stmt, &[&channel.id, &channel.creator, &channel.deposit_asset, &channel.deposit_amount, &channel.valid_until, &channel.spec]).await {
58+
Ok(row) => {
59+
let inserted = row == 1;
60+
Ok((inserted, connection))
61+
},
62+
Err(e) => Err((e, connection)),
63+
},
64+
Err(e) => Err((e, connection)),
65+
}
66+
}
67+
})
68+
.await
69+
}

sentry/src/lib.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
use crate::chain::chain;
55
use crate::db::DbPool;
66
use crate::middleware::auth;
7+
use crate::middleware::channel::channel_load;
78
use crate::middleware::cors::{cors, Cors};
9+
use crate::routes::channel::channel_status;
810
use hyper::{Body, Method, Request, Response, StatusCode};
911
use lazy_static::lazy_static;
1012
use primitives::adapter::Adapter;
@@ -40,7 +42,10 @@ lazy_static! {
4042
// @TODO define other regex routes
4143
}
4244

43-
async fn config_middleware(req: Request<Body>) -> Result<Request<Body>, ResponseError> {
45+
async fn config_middleware<A: Adapter>(
46+
req: Request<Body>,
47+
_: &Application<A>,
48+
) -> Result<Request<Body>, ResponseError> {
4449
Ok(req)
4550
}
4651

@@ -121,14 +126,30 @@ impl<A: Adapter + 'static> Application<A> {
121126

122127
// example with middleware
123128
// @TODO remove later
124-
let req = match chain(req, vec![config_middleware]).await {
129+
let req = match chain(req, &self, vec![config_middleware]).await {
125130
Ok(req) => req,
126131
Err(error) => {
127132
return map_response_error(error);
128133
}
129134
};
130135

131136
last_approved(req, &self).await
137+
} else if let (Some(caps), &Method::GET) =
138+
(CHANNEL_STATUS_BY_CHANNEL_ID.captures(route), method)
139+
{
140+
let param = RouteParams(vec![caps
141+
.get(1)
142+
.map_or("".to_string(), |m| m.as_str().to_string())]);
143+
req.extensions_mut().insert(param);
144+
145+
let req = match chain(req, &self, vec![channel_load]).await {
146+
Ok(req) => req,
147+
Err(error) => {
148+
return map_response_error(error);
149+
}
150+
};
151+
152+
channel_status(req, &self).await
132153
} else {
133154
Err(ResponseError::NotFound)
134155
}

sentry/src/middleware/channel.rs

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,61 @@
1-
use crate::db::DbPool;
2-
use crate::ResponseError;
3-
use bb8::RunError;
1+
use crate::db::{get_channel_by_id, get_channel_by_id_and_validator};
2+
use crate::{Application, ResponseError, RouteParams};
3+
use hex::FromHex;
44
use hyper::{Body, Request};
5-
use primitives::{Channel, ChannelId};
5+
use primitives::adapter::Adapter;
6+
use primitives::{ChannelId, ValidatorId};
7+
use std::convert::TryFrom;
68

7-
pub async fn channel_load(
9+
/// channel_load & channel_if_exist
10+
pub async fn channel_load<A: Adapter>(
811
mut req: Request<Body>,
9-
(pool, id): (&DbPool, &ChannelId),
12+
app: &Application<A>,
1013
) -> Result<Request<Body>, ResponseError> {
11-
let channel = get_channel(pool, id)
14+
let id = req
15+
.extensions()
16+
.get::<RouteParams>()
17+
.ok_or_else(|| ResponseError::BadRequest("Route params not found".to_string()))?
18+
.get(0)
19+
.ok_or_else(|| ResponseError::BadRequest("No id".to_string()))?;
20+
21+
let channel_id = ChannelId::from_hex(id)
22+
.map_err(|_| ResponseError::BadRequest("Wrong Channel Id".to_string()))?;
23+
let channel = get_channel_by_id(&app.pool, &channel_id)
1224
.await?
13-
.ok_or(ResponseError::NotFound)?;
25+
.ok_or_else(|| ResponseError::NotFound)?;
1426

1527
req.extensions_mut().insert(channel);
1628

1729
Ok(req)
1830
}
1931

20-
// @TODO: Maybe move this to more generic place?
21-
pub async fn get_channel(
22-
pool: &DbPool,
23-
id: &ChannelId,
24-
) -> Result<Option<Channel>, RunError<bb8_postgres::tokio_postgres::Error>> {
25-
pool
26-
.run(move |connection| {
27-
async move {
28-
match connection.prepare("SELECT id, creator, deposit_asset, deposit_amount, valid_until, spec FROM channels WHERE id = $1 LIMIT 1").await {
29-
Ok(select) => match connection.query(&select, &[&id]).await {
30-
Ok(results) => Ok(( results.get(0).map(Channel::from) , connection)),
31-
Err(e) => Err((e, connection)),
32-
},
33-
Err(e) => Err((e, connection)),
34-
}
35-
}
36-
})
37-
.await
32+
pub async fn channel_if_active<A: Adapter>(
33+
mut req: Request<Body>,
34+
app: &Application<A>,
35+
) -> Result<Request<Body>, ResponseError> {
36+
let route_params = req
37+
.extensions()
38+
.get::<RouteParams>()
39+
.ok_or_else(|| ResponseError::BadRequest("Route params not found".to_string()))?;
40+
41+
let id = route_params
42+
.get(0)
43+
.ok_or_else(|| ResponseError::BadRequest("No id".to_string()))?;
44+
45+
let channel_id = ChannelId::from_hex(id)
46+
.map_err(|_| ResponseError::BadRequest("Wrong Channel Id".to_string()))?;
47+
48+
let validator_id = route_params
49+
.get(1)
50+
.ok_or_else(|| ResponseError::BadRequest("No Validator Id".to_string()))?;
51+
let validator_id = ValidatorId::try_from(&validator_id)
52+
.map_err(|_| ResponseError::BadRequest("Wrong Validator Id".to_string()))?;
53+
54+
let channel = get_channel_by_id_and_validator(&app.pool, &channel_id, &validator_id)
55+
.await?
56+
.ok_or_else(|| ResponseError::NotFound)?;
57+
58+
req.extensions_mut().insert(channel);
59+
60+
Ok(req)
3861
}

0 commit comments

Comments
 (0)