Skip to content

Commit 3c240c0

Browse files
authored
refactor: store horizon receipts (#595)
* refactor: store horizon receipts Signed-off-by: Gustavo Inacio <[email protected]> * fix: tap horizon down migration Signed-off-by: Gustavo Inacio <[email protected]> * refactor: return process receipt error Signed-off-by: Gustavo Inacio <[email protected]> --------- Signed-off-by: Gustavo Inacio <[email protected]>
1 parent f071b3b commit 3c240c0

File tree

6 files changed

+243
-33
lines changed

6 files changed

+243
-33
lines changed

.sqlx/query-eef0d4e41d62691f9987cf4a086da379c5d5c363f378b31f226444a6b808a239.json

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ bip39.workspace = true
5959
tower = "0.5.1"
6060
pin-project = "1.1.7"
6161
tonic.workspace = true
62+
itertools = "0.14.0"
6263

6364
[dev-dependencies]
6465
hex-literal = "0.4.1"

crates/service/src/tap/receipt_store.rs

Lines changed: 190 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use anyhow::anyhow;
55
use bigdecimal::num_bigint::BigInt;
6+
use itertools::{Either, Itertools};
67
use sqlx::{types::BigDecimal, PgPool};
78
use tap_core::{manager::adapters::ReceiptStore, receipt::WithValueAndTimestamp};
89
use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain};
@@ -16,8 +17,39 @@ pub struct InnerContext {
1617
pub pgpool: PgPool,
1718
}
1819

20+
#[derive(thiserror::Error, Debug)]
21+
enum ProcessReceiptError {
22+
#[error("Failed to store v1 receipts: {0}")]
23+
V1(anyhow::Error),
24+
#[error("Failed to store v2 receipts: {0}")]
25+
V2(anyhow::Error),
26+
#[error("Failed to receipts for v1 and v2. Error v1: {0}. Error v2: {1}")]
27+
Both(anyhow::Error, anyhow::Error),
28+
}
29+
1930
impl InnerContext {
20-
async fn store_receipts(&self, receipts: Vec<DatabaseReceipt>) -> Result<(), AdapterError> {
31+
async fn process_db_receipts(
32+
&self,
33+
buffer: Vec<DatabaseReceipt>,
34+
) -> Result<(), ProcessReceiptError> {
35+
let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) =
36+
buffer.into_iter().partition_map(|r| match r {
37+
DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1),
38+
DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2),
39+
});
40+
let (insert_v1, insert_v2) = tokio::join!(
41+
self.store_receipts_v1(v1_receipts),
42+
self.store_receipts_v2(v2_receipts)
43+
);
44+
match (insert_v1, insert_v2) {
45+
(Err(e1), Err(e2)) => Err(ProcessReceiptError::Both(e1.into(), e2.into())),
46+
(Err(e1), _) => Err(ProcessReceiptError::V1(e1.into())),
47+
(_, Err(e2)) => Err(ProcessReceiptError::V2(e2.into())),
48+
_ => Ok(()),
49+
}
50+
}
51+
52+
async fn store_receipts_v1(&self, receipts: Vec<DbReceiptV1>) -> Result<(), AdapterError> {
2153
let receipts_len = receipts.len();
2254
let mut signers = Vec::with_capacity(receipts_len);
2355
let mut signatures = Vec::with_capacity(receipts_len);
@@ -66,6 +98,71 @@ impl InnerContext {
6698

6799
Ok(())
68100
}
101+
102+
async fn store_receipts_v2(&self, receipts: Vec<DbReceiptV2>) -> Result<(), AdapterError> {
103+
let receipts_len = receipts.len();
104+
let mut signers = Vec::with_capacity(receipts_len);
105+
let mut signatures = Vec::with_capacity(receipts_len);
106+
let mut allocation_ids = Vec::with_capacity(receipts_len);
107+
let mut payers = Vec::with_capacity(receipts_len);
108+
let mut data_services = Vec::with_capacity(receipts_len);
109+
let mut service_providers = Vec::with_capacity(receipts_len);
110+
let mut timestamps = Vec::with_capacity(receipts_len);
111+
let mut nonces = Vec::with_capacity(receipts_len);
112+
let mut values = Vec::with_capacity(receipts_len);
113+
114+
for receipt in receipts {
115+
signers.push(receipt.signer_address);
116+
signatures.push(receipt.signature);
117+
allocation_ids.push(receipt.allocation_id);
118+
payers.push(receipt.payer);
119+
data_services.push(receipt.data_service);
120+
service_providers.push(receipt.service_provider);
121+
timestamps.push(receipt.timestamp_ns);
122+
nonces.push(receipt.nonce);
123+
values.push(receipt.value);
124+
}
125+
sqlx::query!(
126+
r#"INSERT INTO tap_horizon_receipts (
127+
signer_address,
128+
signature,
129+
allocation_id,
130+
payer,
131+
data_service,
132+
service_provider,
133+
timestamp_ns,
134+
nonce,
135+
value
136+
) SELECT * FROM UNNEST(
137+
$1::CHAR(40)[],
138+
$2::BYTEA[],
139+
$3::CHAR(40)[],
140+
$4::CHAR(40)[],
141+
$5::CHAR(40)[],
142+
$6::CHAR(40)[],
143+
$7::NUMERIC(20)[],
144+
$8::NUMERIC(20)[],
145+
$9::NUMERIC(40)[]
146+
)"#,
147+
&signers,
148+
&signatures,
149+
&allocation_ids,
150+
&payers,
151+
&data_services,
152+
&service_providers,
153+
&timestamps,
154+
&nonces,
155+
&values,
156+
)
157+
.execute(&self.pgpool)
158+
.await
159+
.map_err(|e| {
160+
tracing::error!("Failed to store receipt: {}", e);
161+
anyhow!(e)
162+
})?;
163+
164+
Ok(())
165+
}
69166
}
70167

71168
impl IndexerTapContext {
@@ -81,8 +178,8 @@ impl IndexerTapContext {
81178
tokio::select! {
82179
biased;
83180
_ = receiver.recv_many(&mut buffer, BUFFER_SIZE) => {
84-
if let Err(e) = inner_context.store_receipts(buffer).await {
85-
tracing::error!("Failed to store receipts: {}", e);
181+
if let Err(e) = inner_context.process_db_receipts(buffer).await {
182+
tracing::error!("{e}");
86183
}
87184
}
88185
_ = cancelation_token.cancelled() => { break },
@@ -108,7 +205,21 @@ impl ReceiptStore<TapReceipt> for IndexerTapContext {
108205
}
109206
}
110207

111-
pub struct DatabaseReceipt {
208+
pub enum DatabaseReceipt {
209+
V1(DbReceiptV1),
210+
V2(DbReceiptV2),
211+
}
212+
213+
impl DatabaseReceipt {
214+
fn from_receipt(receipt: CheckingReceipt, separator: &Eip712Domain) -> anyhow::Result<Self> {
215+
Ok(match receipt.signed_receipt() {
216+
TapReceipt::V1(receipt) => Self::V1(DbReceiptV1::from_receipt(receipt, separator)?),
217+
TapReceipt::V2(receipt) => Self::V2(DbReceiptV2::from_receipt(receipt, separator)?),
218+
})
219+
}
220+
}
221+
222+
pub struct DbReceiptV1 {
112223
signer_address: String,
113224
signature: Vec<u8>,
114225
allocation_id: String,
@@ -117,34 +228,80 @@ pub struct DatabaseReceipt {
117228
value: BigDecimal,
118229
}
119230

120-
impl DatabaseReceipt {
121-
fn from_receipt(receipt: CheckingReceipt, separator: &Eip712Domain) -> anyhow::Result<Self> {
122-
match receipt.signed_receipt() {
123-
TapReceipt::V1(receipt) => {
124-
let allocation_id = receipt.message.allocation_id.encode_hex();
125-
let signature = receipt.signature.as_bytes().to_vec();
126-
127-
let signer_address = receipt
128-
.recover_signer(separator)
129-
.map_err(|e| {
130-
tracing::error!("Failed to recover receipt signer: {}", e);
131-
anyhow!(e)
132-
})?
133-
.encode_hex();
134-
135-
let timestamp_ns = BigDecimal::from(receipt.timestamp_ns());
136-
let nonce = BigDecimal::from(receipt.message.nonce);
137-
let value = BigDecimal::from(BigInt::from(receipt.value()));
138-
Ok(Self {
139-
allocation_id,
140-
nonce,
141-
signature,
142-
signer_address,
143-
timestamp_ns,
144-
value,
145-
})
146-
}
147-
TapReceipt::V2(_) => unimplemented!("Horizon Receipts are not supported yet."),
148-
}
231+
impl DbReceiptV1 {
232+
fn from_receipt(
233+
receipt: &tap_graph::SignedReceipt,
234+
separator: &Eip712Domain,
235+
) -> anyhow::Result<Self> {
236+
let allocation_id = receipt.message.allocation_id.encode_hex();
237+
let signature = receipt.signature.as_bytes().to_vec();
238+
239+
let signer_address = receipt
240+
.recover_signer(separator)
241+
.map_err(|e| {
242+
tracing::error!("Failed to recover receipt signer: {}", e);
243+
anyhow!(e)
244+
})?
245+
.encode_hex();
246+
247+
let timestamp_ns = BigDecimal::from(receipt.timestamp_ns());
248+
let nonce = BigDecimal::from(receipt.message.nonce);
249+
let value = BigDecimal::from(BigInt::from(receipt.value()));
250+
Ok(Self {
251+
allocation_id,
252+
nonce,
253+
signature,
254+
signer_address,
255+
timestamp_ns,
256+
value,
257+
})
258+
}
259+
}
260+
261+
pub struct DbReceiptV2 {
262+
signer_address: String,
263+
signature: Vec<u8>,
264+
allocation_id: String,
265+
payer: String,
266+
data_service: String,
267+
service_provider: String,
268+
timestamp_ns: BigDecimal,
269+
nonce: BigDecimal,
270+
value: BigDecimal,
271+
}
272+
273+
impl DbReceiptV2 {
274+
fn from_receipt(
275+
receipt: &tap_graph::v2::SignedReceipt,
276+
separator: &Eip712Domain,
277+
) -> anyhow::Result<Self> {
278+
let allocation_id = receipt.message.allocation_id.encode_hex();
279+
let payer = receipt.message.payer.encode_hex();
280+
let data_service = receipt.message.data_service.encode_hex();
281+
let service_provider = receipt.message.service_provider.encode_hex();
282+
let signature = receipt.signature.as_bytes().to_vec();
283+
284+
let signer_address = receipt
285+
.recover_signer(separator)
286+
.map_err(|e| {
287+
tracing::error!("Failed to recover receipt signer: {}", e);
288+
anyhow!(e)
289+
})?
290+
.encode_hex();
291+
292+
let timestamp_ns = BigDecimal::from(receipt.timestamp_ns());
293+
let nonce = BigDecimal::from(receipt.message.nonce);
294+
let value = BigDecimal::from(BigInt::from(receipt.value()));
295+
Ok(Self {
296+
allocation_id,
297+
payer,
298+
data_service,
299+
service_provider,
300+
nonce,
301+
signature,
302+
signer_address,
303+
timestamp_ns,
304+
value,
305+
})
149306
}
150307
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Add down migration script here
2+
DROP TABLE IF EXISTS tap_horizon_receipts CASCADE;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
-- Add up migration script here
2+
CREATE TABLE IF NOT EXISTS tap_horizon_receipts (
3+
id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent
4+
signer_address CHAR(40) NOT NULL,
5+
6+
-- Values below are the individual fields of the EIP-712 receipt
7+
signature BYTEA NOT NULL,
8+
allocation_id CHAR(40) NOT NULL,
9+
payer CHAR(40) NOT NULL,
10+
data_service CHAR(40) NOT NULL,
11+
service_provider CHAR(40) NOT NULL,
12+
timestamp_ns NUMERIC(20) NOT NULL,
13+
nonce NUMERIC(20) NOT NULL,
14+
value NUMERIC(39) NOT NULL
15+
);
16+
17+
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id);
18+
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns);

0 commit comments

Comments
 (0)