Skip to content

Commit f86b213

Browse files
committed
feat: add tap v2 receipt parser
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 83334d5 commit f86b213

File tree

8 files changed

+334
-81
lines changed

8 files changed

+334
-81
lines changed

Cargo.lock

Lines changed: 152 additions & 64 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,15 @@ uuid = { version = "1.11.0", features = ["v7"] }
5252
tracing = { version = "0.1.40", default-features = false }
5353
bigdecimal = "0.4.3"
5454
build-info = "0.0.39"
55-
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false }
56-
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false }
55+
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "3c56018", default-features = false }
56+
tap_core_v2 = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1dada3e", package = "tap_core" }
57+
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "3c56018", default-features = false }
5758
tracing-subscriber = { version = "0.3", features = [
5859
"json",
5960
"env-filter",
6061
"ansi",
6162
], default-features = false }
62-
thegraph-core = { version = "0.9.6", features = [
63+
thegraph-core = { git = "https://github.com/edgeandnode/toolshed", rev= "d710e05", features = [
6364
"attestation",
6465
"alloy-eip712",
6566
"alloy-sol-types",

crates/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ async-graphql-axum = "7.0.11"
3737
base64.workspace = true
3838
graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0" }
3939
tap_core.workspace = true
40+
tap_core_v2.workspace = true
4041
uuid.workspace = true
4142
typed-builder.workspace = true
4243
tower_governor = { version = "0.5.0", features = ["axum"] }

crates/service/src/middleware/tap_receipt.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@ use crate::service::TapReceipt;
1414
///
1515
/// This is useful to not deserialize multiple times the same receipt
1616
pub async fn receipt_middleware(mut request: Request, next: Next) -> Response {
17-
if let Ok(TypedHeader(TapReceipt(receipt))) =
18-
request.extract_parts::<TypedHeader<TapReceipt>>().await
19-
{
20-
request.extensions_mut().insert(receipt);
17+
if let Ok(TypedHeader(receipt)) = request.extract_parts::<TypedHeader<TapReceipt>>().await {
18+
match receipt {
19+
TapReceipt::V1(receipt) => {
20+
request.extensions_mut().insert(receipt);
21+
}
22+
TapReceipt::V2(receipt) => {
23+
request.extensions_mut().insert(receipt);
24+
}
25+
}
2126
}
2227
next.run(request).await
2328
}

crates/service/src/service/tap_receipt_header.rs

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,81 @@
44
use axum_extra::headers::{self, Header, HeaderName, HeaderValue};
55
use lazy_static::lazy_static;
66
use prometheus::{register_counter, Counter};
7-
use tap_core::receipt::SignedReceipt;
7+
use serde::de;
8+
use serde_json::Value;
9+
use tap_core::receipt::SignedReceipt as SignedReceiptV1;
10+
use tap_core_v2::receipt::SignedReceipt as SignedReceiptV2;
811

9-
#[derive(Debug, PartialEq)]
10-
pub struct TapReceipt(pub SignedReceipt);
12+
#[derive(Debug, PartialEq, Clone, serde::Serialize)]
13+
#[serde(untagged)]
14+
pub enum TapReceipt {
15+
V1(SignedReceiptV1),
16+
V2(SignedReceiptV2),
17+
}
18+
19+
impl<'de> serde::Deserialize<'de> for TapReceipt {
20+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
21+
where
22+
D: serde::Deserializer<'de>,
23+
{
24+
let temp = Value::deserialize(deserializer)?;
25+
26+
let is_v1 = temp
27+
.as_object()
28+
.ok_or(de::Error::custom("Didn't receive an object"))?
29+
.get("message")
30+
.ok_or(de::Error::custom("There's no message in the object"))?
31+
.as_object()
32+
.ok_or(de::Error::custom("Message is not an object"))?
33+
.contains_key("allocation_id");
34+
35+
if is_v1 {
36+
// Try V1 first
37+
serde_json::from_value::<SignedReceiptV1>(temp)
38+
.map(TapReceipt::V1)
39+
.map_err(de::Error::custom)
40+
} else {
41+
// Fall back to V2
42+
serde_json::from_value::<SignedReceiptV2>(temp)
43+
.map(TapReceipt::V2)
44+
.map_err(de::Error::custom)
45+
}
46+
}
47+
}
48+
49+
impl From<SignedReceiptV1> for TapReceipt {
50+
fn from(value: SignedReceiptV1) -> Self {
51+
Self::V1(value)
52+
}
53+
}
54+
55+
impl From<SignedReceiptV2> for TapReceipt {
56+
fn from(value: SignedReceiptV2) -> Self {
57+
Self::V2(value)
58+
}
59+
}
60+
61+
impl TryFrom<TapReceipt> for SignedReceiptV1 {
62+
type Error = anyhow::Error;
63+
64+
fn try_from(value: TapReceipt) -> Result<Self, Self::Error> {
65+
match value {
66+
TapReceipt::V2(_) => Err(anyhow::anyhow!("TapReceipt is V2")),
67+
TapReceipt::V1(receipt) => Ok(receipt),
68+
}
69+
}
70+
}
71+
72+
impl TryFrom<TapReceipt> for SignedReceiptV2 {
73+
type Error = anyhow::Error;
74+
75+
fn try_from(value: TapReceipt) -> Result<Self, Self::Error> {
76+
match value {
77+
TapReceipt::V1(_) => Err(anyhow::anyhow!("TapReceipt is V1")),
78+
TapReceipt::V2(receipt) => Ok(receipt),
79+
}
80+
}
81+
}
1182

1283
lazy_static! {
1384
static ref TAP_RECEIPT: HeaderName = HeaderName::from_static("tap-receipt");
@@ -30,9 +101,9 @@ impl Header for TapReceipt {
30101
let raw_receipt = raw_receipt
31102
.to_str()
32103
.map_err(|_| headers::Error::invalid())?;
33-
let parsed_receipt =
104+
let parsed_receipt: TapReceipt =
34105
serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?;
35-
Ok(TapReceipt(parsed_receipt))
106+
Ok(parsed_receipt)
36107
};
37108
execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc())
38109
}
@@ -49,20 +120,54 @@ impl Header for TapReceipt {
49120
mod test {
50121
use axum::http::HeaderValue;
51122
use axum_extra::headers::Header;
52-
use test_assets::{create_signed_receipt, SignedReceiptRequest};
123+
use test_assets::{
124+
create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest,
125+
SignedReceiptV2Request,
126+
};
53127

54128
use super::TapReceipt;
55129

56130
#[tokio::test]
57131
async fn test_decode_valid_tap_receipt_header() {
58132
let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
59133
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
60-
let header_value = HeaderValue::from_str(&serialized_receipt).unwrap();
134+
135+
let original_receipt_v1: TapReceipt = original_receipt.clone().into();
136+
let serialized_receipt_v1 = serde_json::to_string(&original_receipt_v1).unwrap();
137+
138+
assert_eq!(serialized_receipt, serialized_receipt_v1);
139+
140+
println!("Was able to serialize properly: {serialized_receipt_v1:?}");
141+
let deserialized: TapReceipt = serde_json::from_str(&serialized_receipt_v1).unwrap();
142+
println!("Was able to deserialize properly: {deserialized:?}");
143+
let header_value = HeaderValue::from_str(&serialized_receipt_v1).unwrap();
144+
let header_values = vec![&header_value];
145+
let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter())
146+
.expect("tap receipt header value should be valid");
147+
148+
assert_eq!(decoded_receipt, original_receipt.into());
149+
}
150+
151+
#[tokio::test]
152+
async fn test_decode_valid_tap_v2_receipt_header() {
153+
let original_receipt =
154+
create_signed_receipt_v2(SignedReceiptV2Request::builder().build()).await;
155+
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
156+
157+
let original_receipt_v1: TapReceipt = original_receipt.clone().into();
158+
let serialized_receipt_v1 = serde_json::to_string(&original_receipt_v1).unwrap();
159+
160+
assert_eq!(serialized_receipt, serialized_receipt_v1);
161+
162+
println!("Was able to serialize properly: {serialized_receipt_v1:?}");
163+
let deserialized: TapReceipt = serde_json::from_str(&serialized_receipt_v1).unwrap();
164+
println!("Was able to deserialize properly: {deserialized:?}");
165+
let header_value = HeaderValue::from_str(&serialized_receipt_v1).unwrap();
61166
let header_values = vec![&header_value];
62167
let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter())
63168
.expect("tap receipt header value should be valid");
64169

65-
assert_eq!(decoded_receipt, TapReceipt(original_receipt));
170+
assert_eq!(decoded_receipt, original_receipt.into());
66171
}
67172

68173
#[test]

crates/tap-agent/src/agent/sender_allocation.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,8 +1258,7 @@ pub mod tests {
12581258
));
12591259

12601260
// Stop the TAP aggregator server.
1261-
handle.stop().unwrap();
1262-
handle.stopped().await;
1261+
handle.abort();
12631262
}
12641263

12651264
#[sqlx::test(migrations = "../../migrations")]

crates/test-assets/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ indexer-allocation = { path = "../allocation" }
88
bip39 = "2.0.0"
99
lazy_static.workspace = true
1010
tap_core.workspace = true
11+
tap_core_v2.workspace = true
1112
thegraph-core.workspace = true
1213
typed-builder.workspace = true
1314
tokio.workspace = true

crates/test-assets/src/lib.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ use tap_core::{
1515
signed_message::EIP712SignedMessage,
1616
tap_eip712_domain,
1717
};
18+
19+
use tap_core_v2::{
20+
receipt::{Receipt as ReceiptV2, SignedReceipt as SignedReceiptV2},
21+
signed_message::EIP712SignedMessage as EIP712SignedMessageV2,
22+
// tap_eip712_domain as tap_eip712_domain_v2,
23+
};
1824
use thegraph_core::{
1925
alloy::{
2026
primitives::{address, Address, U256},
@@ -353,6 +359,53 @@ pub async fn create_signed_receipt(
353359
.unwrap()
354360
}
355361

362+
#[derive(TypedBuilder)]
363+
pub struct SignedReceiptV2Request {
364+
#[builder(default = Address::ZERO)]
365+
payer: Address,
366+
#[builder(default = Address::ZERO)]
367+
data_service: Address,
368+
#[builder(default = Address::ZERO)]
369+
service_provider: Address,
370+
#[builder(default)]
371+
nonce: u64,
372+
#[builder(default_code = r#"SystemTime::now()
373+
.duration_since(UNIX_EPOCH)
374+
.unwrap()
375+
.as_nanos() as u64"#)]
376+
timestamp_ns: u64,
377+
#[builder(default = 1)]
378+
value: u128,
379+
}
380+
381+
pub async fn create_signed_receipt_v2(
382+
SignedReceiptV2Request {
383+
payer,
384+
data_service,
385+
service_provider,
386+
nonce,
387+
timestamp_ns,
388+
value,
389+
}: SignedReceiptV2Request,
390+
) -> SignedReceiptV2 {
391+
let (wallet, _) = &*self::TAP_SIGNER;
392+
393+
EIP712SignedMessageV2::new(
394+
&self::TAP_EIP712_DOMAIN,
395+
ReceiptV2 {
396+
payer,
397+
data_service,
398+
service_provider,
399+
nonce,
400+
timestamp_ns,
401+
value,
402+
},
403+
wallet,
404+
)
405+
.unwrap()
406+
}
407+
408+
356409
pub async fn flush_messages(notify: &Notify) {
357410
loop {
358411
if tokio::time::timeout(Duration::from_millis(10), notify.notified())

0 commit comments

Comments
 (0)