Skip to content

Commit e927bcf

Browse files
authored
refactor: add horizon support to service (#593)
* refactor: add horizon support WIP: needs receipt parsing Signed-off-by: Gustavo Inacio <[email protected]> * refactor: add unimplemented in store Signed-off-by: Gustavo Inacio <[email protected]> * refactor: use import from tap Signed-off-by: Gustavo Inacio <[email protected]> * chore: clippy Signed-off-by: Gustavo Inacio <[email protected]> --------- Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 6bab2ef commit e927bcf

20 files changed

+233
-128
lines changed

Cargo.lock

Lines changed: 8 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ bigdecimal = "0.4.3"
5454
build-info = "0.0.39"
5555
tap_core = { version = "3.0.0", default-features = false }
5656
tap_aggregator = { version = "0.4.0", default-features = false }
57-
tap_graph = { version = "0.1.0", default-features = false }
57+
tap_graph = { version = "0.2.0", features = ["v2"] }
5858
tracing-subscriber = { version = "0.3", features = [
5959
"json",
6060
"env-filter",
@@ -81,3 +81,15 @@ prost = "0.13.4"
8181
prost-types = "0.13.3"
8282
dipper-rpc = { git = "https://github.com/edgeandnode/dipper/", rev = "c8700e2", default-features = false }
8383
tonic-build = "0.12.3"
84+
85+
[patch.crates-io.tap_core]
86+
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
87+
rev = "dbae001"
88+
89+
[patch.crates-io.tap_aggregator]
90+
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
91+
rev = "dbae001"
92+
93+
[patch.crates-io.tap_graph]
94+
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
95+
rev = "dbae001"

crates/dips/src/server.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33

44
use std::{sync::Arc, time::Duration};
55

6+
use anyhow::anyhow;
7+
use async_trait::async_trait;
8+
use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address};
9+
use tonic::{Request, Response, Status};
10+
use uuid::Uuid;
11+
612
use crate::{
713
proto::indexer::graphprotocol::indexer::dips::{
814
dips_service_server::DipsService, CancelAgreementRequest, CancelAgreementResponse,
@@ -11,11 +17,6 @@ use crate::{
1117
store::AgreementStore,
1218
validate_and_cancel_agreement, validate_and_create_agreement, DipsError,
1319
};
14-
use anyhow::anyhow;
15-
use async_trait::async_trait;
16-
use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address};
17-
use tonic::{Request, Response, Status};
18-
use uuid::Uuid;
1920

2021
#[derive(Debug)]
2122
pub struct DipsServer {

crates/service/src/middleware/auth.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ mod tests {
2828

2929
use crate::{
3030
middleware::auth::{self, Bearer, OrExt},
31-
tap::IndexerTapContext,
31+
tap::{IndexerTapContext, TapReceipt},
3232
};
3333

3434
const BEARER_TOKEN: &str = "test";
@@ -104,7 +104,7 @@ mod tests {
104104

105105
// check with receipt
106106
let mut req = Request::new(Default::default());
107-
req.extensions_mut().insert(receipt);
107+
req.extensions_mut().insert(TapReceipt::V1(receipt));
108108
let res = service.call(req).await.unwrap();
109109
assert_eq!(res.status(), StatusCode::OK);
110110

crates/service/src/middleware/auth/tap.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,20 @@ use tap_core::{
2020
manager::{adapters::ReceiptStore, Manager},
2121
receipt::Context,
2222
};
23-
use tap_graph::{ReceiptAggregateVoucher, SignedReceipt};
23+
use tap_graph::ReceiptAggregateVoucher;
2424
use tower_http::auth::AsyncAuthorizeRequest;
2525

26-
use crate::{error::IndexerServiceError, middleware::prometheus_metrics::MetricLabels};
26+
use crate::{
27+
error::IndexerServiceError, middleware::prometheus_metrics::MetricLabels, tap::TapReceipt,
28+
};
2729

2830
/// Middleware to verify and store TAP receipts
2931
///
3032
/// It also optionally updates a failed receipt metric if Labels are provided
3133
///
32-
/// Requires SignedReceipt, MetricLabels and Arc<Context> extensions
34+
/// Requires TapReceipt, MetricLabels and Arc<Context> extensions
3335
pub fn tap_receipt_authorize<T, B>(
34-
tap_manager: Arc<Manager<T, SignedReceipt, ReceiptAggregateVoucher>>,
36+
tap_manager: Arc<Manager<T, TapReceipt, ReceiptAggregateVoucher>>,
3537
failed_receipt_metric: &'static prometheus::CounterVec,
3638
) -> impl AsyncAuthorizeRequest<
3739
B,
@@ -41,11 +43,11 @@ pub fn tap_receipt_authorize<T, B>(
4143
> + Clone
4244
+ Send
4345
where
44-
T: ReceiptStore<SignedReceipt> + Sync + Send + 'static,
46+
T: ReceiptStore<TapReceipt> + Sync + Send + 'static,
4547
B: Send,
4648
{
47-
move |request: Request<B>| {
48-
let receipt = request.extensions().get::<SignedReceipt>().cloned();
49+
move |mut request: Request<B>| {
50+
let receipt = request.extensions_mut().remove::<TapReceipt>();
4951
// load labels from previous middlewares
5052
let labels = request.extensions().get::<MetricLabels>().cloned();
5153
// load context from previous middlewares
@@ -91,7 +93,6 @@ mod tests {
9193
manager::Manager,
9294
receipt::checks::{Check, CheckError, CheckList, CheckResult},
9395
};
94-
use tap_graph::SignedReceipt;
9596
use test_assets::{
9697
assert_while_retry, create_signed_receipt, SignedReceiptRequest, TAP_EIP712_DOMAIN,
9798
};
@@ -103,7 +104,7 @@ mod tests {
103104
auth::tap_receipt_authorize,
104105
prometheus_metrics::{MetricLabelProvider, MetricLabels},
105106
},
106-
tap::{CheckingReceipt, IndexerTapContext},
107+
tap::{CheckingReceipt, IndexerTapContext, TapReceipt},
107108
};
108109

109110
#[fixture]
@@ -131,13 +132,13 @@ mod tests {
131132

132133
struct MyCheck;
133134
#[async_trait::async_trait]
134-
impl Check<SignedReceipt> for MyCheck {
135+
impl Check<TapReceipt> for MyCheck {
135136
async fn check(
136137
&self,
137138
_: &tap_core::receipt::Context,
138139
receipt: &CheckingReceipt,
139140
) -> CheckResult {
140-
if receipt.signed_receipt().message.nonce == FAILED_NONCE {
141+
if receipt.signed_receipt().nonce() == FAILED_NONCE {
141142
Err(CheckError::Failed(anyhow::anyhow!("Failed")))
142143
} else {
143144
Ok(())
@@ -175,7 +176,7 @@ mod tests {
175176

176177
// check with receipt
177178
let mut req = Request::new(Body::default());
178-
req.extensions_mut().insert(receipt);
179+
req.extensions_mut().insert(TapReceipt::V1(receipt));
179180
let res = service.call(req).await.unwrap();
180181
assert_eq!(res.status(), StatusCode::OK);
181182

@@ -214,7 +215,7 @@ mod tests {
214215
// change the nonce to make the receipt invalid
215216
receipt.message.nonce = FAILED_NONCE;
216217
let mut req = Request::new(Body::default());
217-
req.extensions_mut().insert(receipt);
218+
req.extensions_mut().insert(TapReceipt::V1(receipt));
218219
req.extensions_mut().insert(labels);
219220
let response = service.call(req);
220221

crates/service/src/middleware/sender.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use axum::{
77
response::Response,
88
};
99
use indexer_monitor::EscrowAccounts;
10-
use tap_graph::SignedReceipt;
1110
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
1211
use tokio::sync::watch;
1312

14-
use crate::error::IndexerServiceError;
13+
use crate::{error::IndexerServiceError, tap::TapReceipt};
1514

1615
/// Stated used by sender middleware
1716
#[derive(Clone)]
@@ -44,7 +43,7 @@ pub async fn sender_middleware(
4443
mut request: Request,
4544
next: Next,
4645
) -> Result<Response, IndexerServiceError> {
47-
if let Some(receipt) = request.extensions().get::<SignedReceipt>() {
46+
if let Some(receipt) = request.extensions().get::<TapReceipt>() {
4847
let signer = receipt.recover_signer(&state.domain_separator)?;
4948
let sender = state
5049
.escrow_accounts
@@ -75,7 +74,7 @@ mod tests {
7574
use tower::ServiceExt;
7675

7776
use super::{sender_middleware, Sender};
78-
use crate::middleware::sender::SenderState;
77+
use crate::{middleware::sender::SenderState, tap::TapReceipt};
7978

8079
#[tokio::test]
8180
async fn test_sender_middleware() {
@@ -105,7 +104,7 @@ mod tests {
105104
.oneshot(
106105
Request::builder()
107106
.uri("/")
108-
.extension(receipt)
107+
.extension(TapReceipt::V1(receipt))
109108
.body(Body::empty())
110109
.unwrap(),
111110
)

crates/service/src/middleware/tap_receipt.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use axum::{extract::Request, middleware::Next, response::Response, RequestExt};
55
use axum_extra::TypedHeader;
66

7-
use crate::service::TapReceipt;
7+
use crate::service::TapHeader;
88

99
/// Injects tap receipts in the extensions
1010
///
@@ -14,8 +14,8 @@ use crate::service::TapReceipt;
1414
///
1515
/// This is useful to not deserialize multiple times the same receipt
1616
pub async fn receipt_middleware(mut request: Request, next: Next) -> Response {
17-
if let Ok(TypedHeader(TapReceipt(receipt))) =
18-
request.extract_parts::<TypedHeader<TapReceipt>>().await
17+
if let Ok(TypedHeader(TapHeader(receipt))) =
18+
request.extract_parts::<TypedHeader<TapHeader>>().await
1919
{
2020
request.extensions_mut().insert(receipt);
2121
}
@@ -33,11 +33,10 @@ mod tests {
3333
};
3434
use axum_extra::headers::Header;
3535
use reqwest::StatusCode;
36-
use tap_graph::SignedReceipt;
3736
use test_assets::{create_signed_receipt, SignedReceiptRequest};
3837
use tower::ServiceExt;
3938

40-
use crate::{middleware::tap_receipt::receipt_middleware, service::TapReceipt};
39+
use crate::{middleware::tap_receipt::receipt_middleware, service::TapHeader, tap::TapReceipt};
4140

4241
#[tokio::test]
4342
async fn test_receipt_middleware() {
@@ -46,12 +45,13 @@ mod tests {
4645
let receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
4746
let receipt_json = serde_json::to_string(&receipt).unwrap();
4847

48+
let receipt = TapReceipt::V1(receipt);
49+
4950
let handle = move |extensions: Extensions| async move {
5051
let received_receipt = extensions
51-
.get::<SignedReceipt>()
52+
.get::<TapReceipt>()
5253
.expect("Should decode tap receipt");
53-
assert_eq!(received_receipt.message, receipt.message);
54-
assert_eq!(received_receipt.signature, receipt.signature);
54+
assert_eq!(*received_receipt, receipt);
5555
Body::empty()
5656
};
5757

@@ -61,7 +61,7 @@ mod tests {
6161
.oneshot(
6262
Request::builder()
6363
.uri("/")
64-
.header(TapReceipt::name(), receipt_json)
64+
.header(TapHeader::name(), receipt_json)
6565
.body(Body::empty())
6666
.unwrap(),
6767
)

crates/service/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ mod router;
3333
mod tap_receipt_header;
3434

3535
pub use router::ServiceRouter;
36-
pub use tap_receipt_header::TapReceipt;
36+
pub use tap_receipt_header::TapHeader;
3737

3838
#[derive(Clone)]
3939
pub struct GraphNodeState {

crates/service/src/service/tap_receipt_header.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,18 @@ use lazy_static::lazy_static;
66
use prometheus::{register_counter, Counter};
77
use tap_graph::SignedReceipt;
88

9+
use crate::tap::TapReceipt;
10+
911
#[derive(Debug, PartialEq)]
10-
pub struct TapReceipt(pub SignedReceipt);
12+
pub struct TapHeader(pub TapReceipt);
1113

1214
lazy_static! {
1315
static ref TAP_RECEIPT: HeaderName = HeaderName::from_static("tap-receipt");
1416
pub static ref TAP_RECEIPT_INVALID: Counter =
1517
register_counter!("indexer_tap_invalid_total", "Invalid tap receipt decode",).unwrap();
1618
}
1719

18-
impl Header for TapReceipt {
20+
impl Header for TapHeader {
1921
fn name() -> &'static HeaderName {
2022
&TAP_RECEIPT
2123
}
@@ -30,9 +32,9 @@ impl Header for TapReceipt {
3032
let raw_receipt = raw_receipt
3133
.to_str()
3234
.map_err(|_| headers::Error::invalid())?;
33-
let parsed_receipt =
35+
let parsed_receipt: SignedReceipt =
3436
serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?;
35-
Ok(TapReceipt(parsed_receipt))
37+
Ok(TapHeader(crate::tap::TapReceipt::V1(parsed_receipt)))
3638
};
3739
execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc())
3840
}
@@ -51,25 +53,26 @@ mod test {
5153
use axum_extra::headers::Header;
5254
use test_assets::{create_signed_receipt, SignedReceiptRequest};
5355

54-
use super::TapReceipt;
56+
use super::TapHeader;
57+
use crate::tap::TapReceipt;
5558

5659
#[tokio::test]
5760
async fn test_decode_valid_tap_receipt_header() {
5861
let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
5962
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
6063
let header_value = HeaderValue::from_str(&serialized_receipt).unwrap();
6164
let header_values = vec![&header_value];
62-
let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter())
65+
let decoded_receipt = TapHeader::decode(&mut header_values.into_iter())
6366
.expect("tap receipt header value should be valid");
6467

65-
assert_eq!(decoded_receipt, TapReceipt(original_receipt));
68+
assert_eq!(decoded_receipt, TapHeader(TapReceipt::V1(original_receipt)));
6669
}
6770

6871
#[test]
6972
fn test_decode_non_string_tap_receipt_header() {
7073
let header_value = HeaderValue::from_static("123");
7174
let header_values = vec![&header_value];
72-
let result = TapReceipt::decode(&mut header_values.into_iter());
75+
let result = TapHeader::decode(&mut header_values.into_iter());
7376

7477
assert!(result.is_err());
7578
}
@@ -78,7 +81,7 @@ mod test {
7881
fn test_decode_invalid_tap_receipt_header() {
7982
let header_value = HeaderValue::from_bytes(b"invalid").unwrap();
8083
let header_values = vec![&header_value];
81-
let result = TapReceipt::decode(&mut header_values.into_iter());
84+
let result = TapHeader::decode(&mut header_values.into_iter());
8285

8386
assert!(result.is_err());
8487
}

0 commit comments

Comments
 (0)