diff --git a/.sqlx/query-9a11e8e814e42113c91050acf54c2e847f24a2e086abcbe8e7be618f84d58f78.json b/.sqlx/query-9a11e8e814e42113c91050acf54c2e847f24a2e086abcbe8e7be618f84d58f78.json new file mode 100644 index 000000000..134eef5e2 --- /dev/null +++ b/.sqlx/query-9a11e8e814e42113c91050acf54c2e847f24a2e086abcbe8e7be618f84d58f78.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE tap_horizon_ravs\n SET last = true\n WHERE \n allocation_id = $1\n AND payer = $2\n AND service_provider = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Bpchar" + ] + }, + "nullable": [] + }, + "hash": "9a11e8e814e42113c91050acf54c2e847f24a2e086abcbe8e7be618f84d58f78" +} diff --git a/.sqlx/query-b467894d499541f2bca15965f38ee551ece8619860f3fbcb184ebded33898cbd.json b/.sqlx/query-b467894d499541f2bca15965f38ee551ece8619860f3fbcb184ebded33898cbd.json new file mode 100644 index 000000000..76efd9841 --- /dev/null +++ b/.sqlx/query-b467894d499541f2bca15965f38ee551ece8619860f3fbcb184ebded33898cbd.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n MAX(id),\n SUM(value),\n COUNT(*)\n FROM\n tap_horizon_receipts_invalid\n WHERE\n allocation_id = $1\n AND signer_address IN (SELECT unnest($2::text[]))\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "max", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "sum", + "type_info": "Numeric" + }, + { + "ordinal": 2, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "TextArray" + ] + }, + "nullable": [ + null, + null, + null + ] + }, + "hash": "b467894d499541f2bca15965f38ee551ece8619860f3fbcb184ebded33898cbd" +} diff --git a/.sqlx/query-f65b5a6c2149155d69fb1f187ebb4e24c08d5763d97da4e2ec759c30d6170a68.json b/.sqlx/query-f65b5a6c2149155d69fb1f187ebb4e24c08d5763d97da4e2ec759c30d6170a68.json new file mode 100644 index 000000000..71f5c8e0b --- /dev/null +++ b/.sqlx/query-f65b5a6c2149155d69fb1f187ebb4e24c08d5763d97da4e2ec759c30d6170a68.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO tap_horizon_receipts_invalid (\n signer_address,\n signature,\n allocation_id,\n payer,\n data_service,\n service_provider,\n timestamp_ns,\n nonce,\n value,\n error_log\n ) SELECT * FROM UNNEST(\n $1::CHAR(40)[],\n $2::BYTEA[],\n $3::CHAR(40)[],\n $4::CHAR(40)[],\n $5::CHAR(40)[],\n $6::CHAR(40)[],\n $7::NUMERIC(20)[],\n $8::NUMERIC(20)[],\n $9::NUMERIC(40)[],\n $10::TEXT[]\n )", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "BpcharArray", + "ByteaArray", + "BpcharArray", + "BpcharArray", + "BpcharArray", + "BpcharArray", + "NumericArray", + "NumericArray", + "NumericArray", + "TextArray" + ] + }, + "nullable": [] + }, + "hash": "f65b5a6c2149155d69fb1f187ebb4e24c08d5763d97da4e2ec759c30d6170a68" +} diff --git a/.sqlx/query-fa31900971e23fdf98dd48fcd649e04328fb1e758a0b538319c5fb132251a7ed.json b/.sqlx/query-fa31900971e23fdf98dd48fcd649e04328fb1e758a0b538319c5fb132251a7ed.json new file mode 100644 index 000000000..e87b866a2 --- /dev/null +++ b/.sqlx/query-fa31900971e23fdf98dd48fcd649e04328fb1e758a0b538319c5fb132251a7ed.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n MAX(id),\n SUM(value),\n COUNT(*)\n FROM\n tap_horizon_receipts\n WHERE\n allocation_id = $1\n AND service_provider = $2\n AND id <= $3\n AND signer_address IN (SELECT unnest($4::text[]))\n AND timestamp_ns > $5\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "max", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "sum", + "type_info": "Numeric" + }, + { + "ordinal": 2, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Int8", + "TextArray", + "Numeric" + ] + }, + "nullable": [ + null, + null, + null + ] + }, + "hash": "fa31900971e23fdf98dd48fcd649e04328fb1e758a0b538319c5fb132251a7ed" +} diff --git a/Cargo.lock b/Cargo.lock index a73f9dd93..da53e80c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3958,6 +3958,7 @@ dependencies = [ "indexer-receipt", "indexer-tap-agent", "indexer-watcher", + "itertools 0.14.0", "jsonrpsee", "lazy_static", "prometheus", diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index 6fd099070..48859968f 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -52,6 +52,7 @@ futures = { version = "0.3.30", default-features = false } bon = "3.3" test-assets = { path = "../test-assets", optional = true } rand = { version = "0.8", optional = true } +itertools = "0.14.0" [dev-dependencies] # Release-please breaks with cyclical dependencies if dev-dependencies diff --git a/crates/tap-agent/src/agent/sender_allocation.rs b/crates/tap-agent/src/agent/sender_allocation.rs index 4a4e2e149..75b542547 100644 --- a/crates/tap-agent/src/agent/sender_allocation.rs +++ b/crates/tap-agent/src/agent/sender_allocation.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ + future::Future, marker::PhantomData, sync::Arc, time::{Duration, Instant}, @@ -10,6 +11,7 @@ use std::{ use anyhow::{anyhow, ensure}; use bigdecimal::{num_bigint::BigInt, ToPrimitive}; use indexer_monitor::{EscrowAccounts, SubgraphClient}; +use itertools::{Either, Itertools}; use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; use ractor::{Actor, ActorProcessingErr, ActorRef}; use sqlx::{types::BigDecimal, PgPool}; @@ -39,7 +41,7 @@ use crate::{ tap::{ context::{ checks::{AllocationId, Signature}, - NetworkVersion, TapAgentContext, + Horizon, Legacy, NetworkVersion, TapAgentContext, }, signers_trimmed, TapReceipt, }, @@ -136,6 +138,8 @@ pub struct SenderAllocationState { allocation_id: Address, /// Address of the sender responsible for this [SenderAllocation] sender: Address, + /// Address of the indexer + indexer_address: Address, /// Watcher containing the escrow accounts escrow_accounts: Receiver, @@ -233,6 +237,7 @@ pub enum SenderAllocationMessage { #[async_trait::async_trait] impl Actor for SenderAllocation where + SenderAllocationState: DatabaseInteractions, T: NetworkVersion, for<'a> &'a Eip712SignedMessage: Into, TapAgentContext: @@ -431,6 +436,7 @@ where T: NetworkVersion, TapAgentContext: RavRead + RavStore + ReceiptDelete + ReceiptRead, + SenderAllocationState: DatabaseInteractions, { /// Helper function to create a [SenderAllocationState] /// given [SenderAllocationArgs] @@ -484,6 +490,7 @@ where sender, escrow_accounts, domain_separator, + indexer_address: config.indexer_address, sender_account_ref: sender_account_ref.clone(), unaggregated_fees: UnaggregatedReceipts::default(), invalid_receipts_fees: UnaggregatedReceipts::default(), @@ -503,106 +510,6 @@ where .await } - /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager - /// with the latest unaggregated fees from the database. - async fn calculate_fee_until_last_id( - &self, - last_id: i64, - ) -> anyhow::Result { - tracing::trace!("calculate_unaggregated_fee()"); - self.tap_manager.remove_obsolete_receipts().await?; - - let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; - let res = sqlx::query!( - r#" - SELECT - MAX(id), - SUM(value), - COUNT(*) - FROM - scalar_tap_receipts - WHERE - allocation_id = $1 - AND id <= $2 - AND signer_address IN (SELECT unnest($3::text[])) - AND timestamp_ns > $4 - "#, - self.allocation_id.encode_hex(), - last_id, - &signers, - BigDecimal::from( - self.latest_rav - .as_ref() - .map(|rav| rav.message.timestamp_ns()) - .unwrap_or_default() - ), - ) - .fetch_one(&self.pgpool) - .await?; - - ensure!( - res.sum.is_none() == res.max.is_none(), - "Exactly one of SUM(value) and MAX(id) is null. This should not happen." - ); - - Ok(UnaggregatedReceipts { - last_id: res.max.unwrap_or(0).try_into()?, - value: res - .sum - .unwrap_or(BigDecimal::from(0)) - .to_string() - .parse::()?, - counter: res - .count - .unwrap_or(0) - .to_u64() - .expect("default value exists, this shouldn't be empty"), - }) - } - - async fn calculate_invalid_receipts_fee(&self) -> anyhow::Result { - tracing::trace!("calculate_invalid_receipts_fee()"); - let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; - - // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? - let res = sqlx::query!( - r#" - SELECT - MAX(id), - SUM(value), - COUNT(*) - FROM - scalar_tap_receipts_invalid - WHERE - allocation_id = $1 - AND signer_address IN (SELECT unnest($2::text[])) - "#, - self.allocation_id.encode_hex(), - &signers - ) - .fetch_one(&self.pgpool) - .await?; - - ensure!( - res.sum.is_none() == res.max.is_none(), - "Exactly one of SUM(value) and MAX(id) is null. This should not happen." - ); - - Ok(UnaggregatedReceipts { - last_id: res.max.unwrap_or(0).try_into()?, - value: res - .sum - .unwrap_or(BigDecimal::from(0)) - .to_string() - .parse::()?, - counter: res - .count - .unwrap_or(0) - .to_u64() - .expect("default value exists, this shouldn't be empty"), - }) - } - async fn request_rav(&mut self) -> anyhow::Result<()> { match self.rav_requester_single().await { Ok(rav) => { @@ -657,8 +564,6 @@ where self.allocation_id, self.sender ); - self.store_invalid_receipts(invalid_receipts.as_slice()) - .await?; // Obtain min/max timestamps to define query let min_timestamp = invalid_receipts .iter() @@ -670,21 +575,11 @@ where .map(|receipt| receipt.signed_receipt().timestamp_ns()) .max() .expect("invalid receipts should not be empty"); + + self.store_invalid_receipts(invalid_receipts).await?; let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; - sqlx::query!( - r#" - DELETE FROM scalar_tap_receipts - WHERE timestamp_ns BETWEEN $1 AND $2 - AND allocation_id = $3 - AND signer_address IN (SELECT unnest($4::text[])); - "#, - BigDecimal::from(min_timestamp), - BigDecimal::from(max_timestamp), - self.allocation_id.encode_hex(), - &signers, - ) - .execute(&self.pgpool) - .await?; + self.delete_receipts_between(&signers, min_timestamp, max_timestamp) + .await?; Err(RavError::AllReceiptsInvalid) } // When it receives both valid and invalid receipts or just valid @@ -716,8 +611,7 @@ where // Save invalid receipts to the database for logs. // TODO: consider doing that in a spawned task? - self.store_invalid_receipts(invalid_receipts.as_slice()) - .await?; + self.store_invalid_receipts(invalid_receipts).await?; } match self @@ -776,47 +670,67 @@ where } } - /// Sends a database query and mark the allocation rav as last - pub async fn mark_rav_last(&self) -> anyhow::Result<()> { - tracing::info!( - sender = %self.sender, - allocation_id = %self.allocation_id, - "Marking rav as last!", + async fn store_invalid_receipts( + &mut self, + receipts: Vec>, + ) -> anyhow::Result<()> { + let fees = receipts + .iter() + .map(|receipt| receipt.signed_receipt().value()) + .sum(); + + let (receipts_v1, receipts_v2): (Vec<_>, Vec<_>) = + receipts.into_iter().partition_map(|r| { + // note: it would be nice if we could get signed_receipt and error by value without + // cloning + let error = r.clone().error().to_string(); + match r.signed_receipt().clone() { + TapReceipt::V1(receipt) => Either::Left((receipt, error)), + TapReceipt::V2(receipt) => Either::Right((receipt, error)), + } + }); + + let (result1, result2) = tokio::join!( + self.store_v1_invalid_receipts(receipts_v1), + self.store_v2_invalid_receipts(receipts_v2), ); - let updated_rows = sqlx::query!( - r#" - UPDATE scalar_tap_ravs - SET last = true - WHERE allocation_id = $1 AND sender_address = $2 - "#, - self.allocation_id.encode_hex(), - self.sender.encode_hex(), - ) - .execute(&self.pgpool) - .await?; + if let Err(err) = result1 { + tracing::error!(%err, "There was an error while storing invalid v1 receipts."); + } - match updated_rows.rows_affected() { - // in case no rav was marked as final - 0 => { - tracing::warn!( - "No RAVs were updated as last for allocation {} and sender {}.", + if let Err(err) = result2 { + tracing::error!(%err, "There was an error while storing invalid v2 receipts."); + } + + self.invalid_receipts_fees.value = self + .invalid_receipts_fees + .value + .checked_add(fees) + .unwrap_or_else(|| { + // This should never happen, but if it does, we want to know about it. + tracing::error!( + "Overflow when adding receipt value {} to invalid receipts fees {} \ + for allocation {} and sender {}. Setting total unaggregated fees to \ + u128::MAX.", + fees, + self.invalid_receipts_fees.value, self.allocation_id, self.sender ); - Ok(()) - } - 1 => Ok(()), - _ => anyhow::bail!( - "Expected exactly one row to be updated in the latest RAVs table, \ - but {} were updated.", - updated_rows.rows_affected() - ), - } + u128::MAX + }); + self.sender_account_ref + .cast(SenderAccountMessage::UpdateInvalidReceiptFees( + self.allocation_id, + self.invalid_receipts_fees, + ))?; + + Ok(()) } - async fn store_invalid_receipts( - &mut self, - receipts: &[ReceiptWithState], + async fn store_v1_invalid_receipts( + &self, + receipts: Vec<(tap_graph::SignedReceipt, String)>, ) -> anyhow::Result<()> { let reciepts_len = receipts.len(); let mut reciepts_signers = Vec::with_capacity(reciepts_len); @@ -827,14 +741,9 @@ where let mut values = Vec::with_capacity(reciepts_len); let mut error_logs = Vec::with_capacity(reciepts_len); - for received_receipt in receipts.iter() { - let receipt = match received_receipt.signed_receipt() { - TapReceipt::V1(receipt) => receipt, - TapReceipt::V2(_) => unimplemented!("V2 not supported"), - }; + for (receipt, receipt_error) in receipts { let allocation_id = receipt.message.allocation_id; let encoded_signature = receipt.signature.as_bytes().to_vec(); - let receipt_error = received_receipt.clone().error().to_string(); let receipt_signer = receipt .recover_signer(&self.domain_separator) .map_err(|e| { @@ -888,66 +797,484 @@ where anyhow!(e) })?; - let fees = receipts - .iter() - .map(|receipt| receipt.signed_receipt().value()) - .sum(); - - self.invalid_receipts_fees.value = self - .invalid_receipts_fees - .value - .checked_add(fees) - .unwrap_or_else(|| { - // This should never happen, but if it does, we want to know about it. - tracing::error!( - "Overflow when adding receipt value {} to invalid receipts fees {} \ - for allocation {} and sender {}. Setting total unaggregated fees to \ - u128::MAX.", - fees, - self.invalid_receipts_fees.value, - self.allocation_id, - self.sender - ); - u128::MAX - }); - self.sender_account_ref - .cast(SenderAccountMessage::UpdateInvalidReceiptFees( - self.allocation_id, - self.invalid_receipts_fees, - ))?; - Ok(()) } - /// Stores a failed Rav, used for logging purposes - async fn store_failed_rav( + async fn store_v2_invalid_receipts( &self, - expected_rav: &T::Rav, - rav: &Eip712SignedMessage, - reason: &str, + receipts: Vec<(tap_graph::v2::SignedReceipt, String)>, ) -> anyhow::Result<()> { - sqlx::query!( - r#" - INSERT INTO scalar_tap_rav_requests_failed ( - allocation_id, - sender_address, - expected_rav, - rav_response, - reason - ) - VALUES ($1, $2, $3, $4, $5) - "#, - self.allocation_id.encode_hex(), - self.sender.encode_hex(), - serde_json::to_value(expected_rav)?, - serde_json::to_value(rav)?, - reason - ) - .execute(&self.pgpool) - .await - .map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?; + let reciepts_len = receipts.len(); + let mut reciepts_signers = Vec::with_capacity(reciepts_len); + let mut encoded_signatures = Vec::with_capacity(reciepts_len); + let mut allocation_ids = Vec::with_capacity(reciepts_len); + let mut payers = Vec::with_capacity(reciepts_len); + let mut data_services = Vec::with_capacity(reciepts_len); + let mut service_providers = Vec::with_capacity(reciepts_len); + let mut timestamps = Vec::with_capacity(reciepts_len); + let mut nonces = Vec::with_capacity(reciepts_len); + let mut values = Vec::with_capacity(reciepts_len); + let mut error_logs = Vec::with_capacity(reciepts_len); - Ok(()) + for (receipt, receipt_error) in receipts { + let allocation_id = receipt.message.allocation_id; + let payer = receipt.message.payer; + let data_service = receipt.message.data_service; + let service_provider = receipt.message.service_provider; + let encoded_signature = receipt.signature.as_bytes().to_vec(); + let receipt_signer = receipt + .recover_signer(&self.domain_separator) + .map_err(|e| { + tracing::error!("Failed to recover receipt signer: {}", e); + anyhow!(e) + })?; + tracing::debug!( + "Receipt for allocation {} and signer {} failed reason: {}", + allocation_id.encode_hex(), + receipt_signer.encode_hex(), + receipt_error + ); + reciepts_signers.push(receipt_signer.encode_hex()); + encoded_signatures.push(encoded_signature); + allocation_ids.push(allocation_id.encode_hex()); + payers.push(payer.encode_hex()); + data_services.push(data_service.encode_hex()); + service_providers.push(service_provider.encode_hex()); + timestamps.push(BigDecimal::from(receipt.message.timestamp_ns)); + nonces.push(BigDecimal::from(receipt.message.nonce)); + values.push(BigDecimal::from(BigInt::from(receipt.message.value))); + error_logs.push(receipt_error); + } + sqlx::query!( + r#"INSERT INTO tap_horizon_receipts_invalid ( + signer_address, + signature, + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns, + nonce, + value, + error_log + ) SELECT * FROM UNNEST( + $1::CHAR(40)[], + $2::BYTEA[], + $3::CHAR(40)[], + $4::CHAR(40)[], + $5::CHAR(40)[], + $6::CHAR(40)[], + $7::NUMERIC(20)[], + $8::NUMERIC(20)[], + $9::NUMERIC(40)[], + $10::TEXT[] + )"#, + &reciepts_signers, + &encoded_signatures, + &allocation_ids, + &payers, + &data_services, + &service_providers, + ×tamps, + &nonces, + &values, + &error_logs + ) + .execute(&self.pgpool) + .await + .map_err(|e| { + tracing::error!("Failed to store invalid receipt: {}", e); + anyhow!(e) + })?; + + Ok(()) + } + + /// Stores a failed Rav, used for logging purposes + async fn store_failed_rav( + &self, + expected_rav: &T::Rav, + rav: &Eip712SignedMessage, + reason: &str, + ) -> anyhow::Result<()> { + // Failed Ravs are stored as json, we don't need to have a copy of the table + // TODO update table name? + sqlx::query!( + r#" + INSERT INTO scalar_tap_rav_requests_failed ( + allocation_id, + sender_address, + expected_rav, + rav_response, + reason + ) + VALUES ($1, $2, $3, $4, $5) + "#, + self.allocation_id.encode_hex(), + self.sender.encode_hex(), + serde_json::to_value(expected_rav)?, + serde_json::to_value(rav)?, + reason + ) + .execute(&self.pgpool) + .await + .map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?; + + Ok(()) + } +} + +/// Interactions with the database that needs some special treatment depending on the NetworkVersion +pub trait DatabaseInteractions { + /// Delete receipts between `min_timestamp` and `max_timestamp` + fn delete_receipts_between( + &self, + signers: &[String], + min_timestamp: u64, + max_timestamp: u64, + ) -> impl Future> + Send; + /// Calculates fees for invalid receipts + fn calculate_invalid_receipts_fee( + &self, + ) -> impl Future> + Send; + + /// Calculates all receipt fees until provided `last_id` + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager + /// with the latest unaggregated fees from the database. + fn calculate_fee_until_last_id( + &self, + last_id: i64, + ) -> impl Future> + Send; + + /// Sends a database query and mark the allocation rav as last + fn mark_rav_last(&self) -> impl Future> + Send; +} + +impl DatabaseInteractions for SenderAllocationState { + async fn delete_receipts_between( + &self, + signers: &[String], + min_timestamp: u64, + max_timestamp: u64, + ) -> anyhow::Result<()> { + sqlx::query!( + r#" + DELETE FROM scalar_tap_receipts + WHERE timestamp_ns BETWEEN $1 AND $2 + AND allocation_id = $3 + AND signer_address IN (SELECT unnest($4::text[])); + "#, + BigDecimal::from(min_timestamp), + BigDecimal::from(max_timestamp), + self.allocation_id.encode_hex(), + &signers, + ) + .execute(&self.pgpool) + .await?; + Ok(()) + } + async fn calculate_invalid_receipts_fee(&self) -> anyhow::Result { + tracing::trace!("calculate_invalid_receipts_fee()"); + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; + + let res = sqlx::query!( + r#" + SELECT + MAX(id), + SUM(value), + COUNT(*) + FROM + scalar_tap_receipts_invalid + WHERE + allocation_id = $1 + AND signer_address IN (SELECT unnest($2::text[])) + "#, + self.allocation_id.encode_hex(), + &signers + ) + .fetch_one(&self.pgpool) + .await?; + + ensure!( + res.sum.is_none() == res.max.is_none(), + "Exactly one of SUM(value) and MAX(id) is null. This should not happen." + ); + + Ok(UnaggregatedReceipts { + last_id: res.max.unwrap_or(0).try_into()?, + value: res + .sum + .unwrap_or(BigDecimal::from(0)) + .to_string() + .parse::()?, + counter: res + .count + .unwrap_or(0) + .to_u64() + .expect("default value exists, this shouldn't be empty"), + }) + } + + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager + /// with the latest unaggregated fees from the database. + async fn calculate_fee_until_last_id( + &self, + last_id: i64, + ) -> anyhow::Result { + tracing::trace!("calculate_unaggregated_fee()"); + self.tap_manager.remove_obsolete_receipts().await?; + + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; + let res = sqlx::query!( + r#" + SELECT + MAX(id), + SUM(value), + COUNT(*) + FROM + scalar_tap_receipts + WHERE + allocation_id = $1 + AND id <= $2 + AND signer_address IN (SELECT unnest($3::text[])) + AND timestamp_ns > $4 + "#, + self.allocation_id.encode_hex(), + last_id, + &signers, + BigDecimal::from( + self.latest_rav + .as_ref() + .map(|rav| rav.message.timestamp_ns()) + .unwrap_or_default() + ), + ) + .fetch_one(&self.pgpool) + .await?; + + ensure!( + res.sum.is_none() == res.max.is_none(), + "Exactly one of SUM(value) and MAX(id) is null. This should not happen." + ); + + Ok(UnaggregatedReceipts { + last_id: res.max.unwrap_or(0).try_into()?, + value: res + .sum + .unwrap_or(BigDecimal::from(0)) + .to_string() + .parse::()?, + counter: res + .count + .unwrap_or(0) + .to_u64() + .expect("default value exists, this shouldn't be empty"), + }) + } + + /// Sends a database query and mark the allocation rav as last + async fn mark_rav_last(&self) -> anyhow::Result<()> { + tracing::info!( + sender = %self.sender, + allocation_id = %self.allocation_id, + "Marking rav as last!", + ); + let updated_rows = sqlx::query!( + r#" + UPDATE scalar_tap_ravs + SET last = true + WHERE allocation_id = $1 AND sender_address = $2 + "#, + self.allocation_id.encode_hex(), + self.sender.encode_hex(), + ) + .execute(&self.pgpool) + .await?; + + match updated_rows.rows_affected() { + // in case no rav was marked as final + 0 => { + tracing::warn!( + "No RAVs were updated as last for allocation {} and sender {}.", + self.allocation_id, + self.sender + ); + Ok(()) + } + 1 => Ok(()), + _ => anyhow::bail!( + "Expected exactly one row to be updated in the latest RAVs table, \ + but {} were updated.", + updated_rows.rows_affected() + ), + } + } +} + +impl DatabaseInteractions for SenderAllocationState { + async fn delete_receipts_between( + &self, + signers: &[String], + min_timestamp: u64, + max_timestamp: u64, + ) -> anyhow::Result<()> { + sqlx::query!( + r#" + DELETE FROM scalar_tap_receipts + WHERE timestamp_ns BETWEEN $1 AND $2 + AND allocation_id = $3 + AND signer_address IN (SELECT unnest($4::text[])); + "#, + BigDecimal::from(min_timestamp), + BigDecimal::from(max_timestamp), + self.allocation_id.encode_hex(), + &signers, + ) + .execute(&self.pgpool) + .await?; + Ok(()) + } + + async fn calculate_invalid_receipts_fee(&self) -> anyhow::Result { + tracing::trace!("calculate_invalid_receipts_fee()"); + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; + + let res = sqlx::query!( + r#" + SELECT + MAX(id), + SUM(value), + COUNT(*) + FROM + tap_horizon_receipts_invalid + WHERE + allocation_id = $1 + AND signer_address IN (SELECT unnest($2::text[])) + "#, + self.allocation_id.encode_hex(), + &signers + ) + .fetch_one(&self.pgpool) + .await?; + + ensure!( + res.sum.is_none() == res.max.is_none(), + "Exactly one of SUM(value) and MAX(id) is null. This should not happen." + ); + + Ok(UnaggregatedReceipts { + last_id: res.max.unwrap_or(0).try_into()?, + value: res + .sum + .unwrap_or(BigDecimal::from(0)) + .to_string() + .parse::()?, + counter: res + .count + .unwrap_or(0) + .to_u64() + .expect("default value exists, this shouldn't be empty"), + }) + } + + async fn calculate_fee_until_last_id( + &self, + last_id: i64, + ) -> anyhow::Result { + tracing::trace!("calculate_unaggregated_fee()"); + self.tap_manager.remove_obsolete_receipts().await?; + + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; + let res = sqlx::query!( + r#" + SELECT + MAX(id), + SUM(value), + COUNT(*) + FROM + tap_horizon_receipts + WHERE + allocation_id = $1 + AND service_provider = $2 + AND id <= $3 + AND signer_address IN (SELECT unnest($4::text[])) + AND timestamp_ns > $5 + "#, + self.allocation_id.encode_hex(), + self.indexer_address.encode_hex(), + last_id, + &signers, + BigDecimal::from( + self.latest_rav + .as_ref() + .map(|rav| rav.message.timestamp_ns()) + .unwrap_or_default() + ), + ) + .fetch_one(&self.pgpool) + .await?; + + ensure!( + res.sum.is_none() == res.max.is_none(), + "Exactly one of SUM(value) and MAX(id) is null. This should not happen." + ); + + Ok(UnaggregatedReceipts { + last_id: res.max.unwrap_or(0).try_into()?, + value: res + .sum + .unwrap_or(BigDecimal::from(0)) + .to_string() + .parse::()?, + counter: res + .count + .unwrap_or(0) + .to_u64() + .expect("default value exists, this shouldn't be empty"), + }) + } + + /// Sends a database query and mark the allocation rav as last + async fn mark_rav_last(&self) -> anyhow::Result<()> { + tracing::info!( + sender = %self.sender, + allocation_id = %self.allocation_id, + "Marking rav as last!", + ); + // TODO add service_provider filter + let updated_rows = sqlx::query!( + r#" + UPDATE tap_horizon_ravs + SET last = true + WHERE + allocation_id = $1 + AND payer = $2 + AND service_provider = $3 + "#, + self.allocation_id.encode_hex(), + self.sender.encode_hex(), + self.indexer_address.encode_hex(), + ) + .execute(&self.pgpool) + .await?; + + match updated_rows.rows_affected() { + // in case no rav was marked as final + 0 => { + tracing::warn!( + "No RAVs were updated as last for allocation {} and sender {}.", + self.allocation_id, + self.sender + ); + Ok(()) + } + 1 => Ok(()), + _ => anyhow::bail!( + "Expected exactly one row to be updated in the latest RAVs table, \ + but {} were updated.", + updated_rows.rows_affected() + ), + } } } @@ -993,6 +1320,7 @@ pub mod tests { agent::{ sender_account::{ReceiptFees, SenderAccountMessage}, sender_accounts_manager::NewReceiptNotification, + sender_allocation::DatabaseInteractions, unaggregated_receipts::UnaggregatedReceipts, }, tap::{context::Legacy, CheckingReceipt}, @@ -1703,7 +2031,7 @@ pub mod tests { let failing_receipts: Vec<_> = join_all(failing_receipts).await; // store the failing receipts - let result = state.store_invalid_receipts(&failing_receipts).await; + let result = state.store_invalid_receipts(failing_receipts).await; // we just store a few and make sure it doesn't fail assert!(result.is_ok()); diff --git a/migrations/20250131122241_tap_horizon_receipts.down.sql b/migrations/20250131122241_tap_horizon_receipts.down.sql index f2f920434..25d3b2965 100644 --- a/migrations/20250131122241_tap_horizon_receipts.down.sql +++ b/migrations/20250131122241_tap_horizon_receipts.down.sql @@ -1,2 +1,3 @@ -- Add down migration script here DROP TABLE IF EXISTS tap_horizon_receipts CASCADE; +DROP TABLE IF EXISTS tap_horizon_receipts_invalid CASCADE; diff --git a/migrations/20250131122241_tap_horizon_receipts.up.sql b/migrations/20250131122241_tap_horizon_receipts.up.sql index 908607306..d5759746c 100644 --- a/migrations/20250131122241_tap_horizon_receipts.up.sql +++ b/migrations/20250131122241_tap_horizon_receipts.up.sql @@ -16,3 +16,22 @@ CREATE TABLE IF NOT EXISTS tap_horizon_receipts ( CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id); CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns); + + +-- This table is used to store invalid receipts (receipts that fail at least one of the checks in the tap-agent). +-- Used for logging and debugging purposes. +CREATE TABLE IF NOT EXISTS tap_horizon_receipts_invalid ( + id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent + signer_address CHAR(40) NOT NULL, + + -- Values below are the individual fields of the EIP-712 receipt + signature BYTEA NOT NULL, + allocation_id CHAR(40) NOT NULL, + payer CHAR(40) NOT NULL, + data_service CHAR(40) NOT NULL, + service_provider CHAR(40) NOT NULL, + timestamp_ns NUMERIC(20) NOT NULL, + nonce NUMERIC(20) NOT NULL, + value NUMERIC(39) NOT NULL, + error_log TEXT NOT NULL DEFAULT '' +);