From 9315908e68d4baaf05c981f683e33c0b1ec3621b Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 24 Sep 2024 11:04:32 +0530 Subject: [PATCH 1/2] Bulk insert of failed receipts --- tap-agent/src/agent/sender_allocation.rs | 62 +++++++++++++++--------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index a25b6df71..575af231e 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -685,7 +685,32 @@ impl SenderAllocationState { &mut self, receipts: &[ReceiptWithState], ) -> Result<()> { - for received_receipt in receipts.iter() { + let mut query_str = String::from( + "INSERT INTO scalar_tap_receipts_invalid ( + signer_address, + signature, + allocation_id, + timestamp_ns, + nonce, + value + ) + VALUES "); + for i in 0..receipts.len() { + query_str = query_str +"(" + +"$"+&(i*6+1).to_string()+", " + +"$"+&(i*6+2).to_string()+", " + +"$"+&(i*6+3).to_string()+", " + +"$"+&(i*6+4).to_string()+", " + +"$"+&(i*6+5).to_string()+", " + +"$"+&(i*6+6).to_string() + +")"; + if i!=receipts.len()-1 { + query_str = query_str +" , " + } + } + query_str = query_str+";"; + let mut query = sqlx::query(&query_str); + for received_receipt in receipts.iter(){ let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; let encoded_signature = receipt.signature.as_bytes().to_vec(); @@ -696,31 +721,20 @@ impl SenderAllocationState { error!("Failed to recover receipt signer: {}", e); anyhow!(e) })?; - sqlx::query!( - r#" - INSERT INTO scalar_tap_receipts_invalid ( - signer_address, - signature, - allocation_id, - timestamp_ns, - nonce, - value, - error_log - ) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - receipt_signer.encode_hex(), - encoded_signature, + debug!( + "Receipt for allocation {} and signer {} failed reason: {}", allocation_id.encode_hex(), - BigDecimal::from(receipt.message.timestamp_ns), - BigDecimal::from(receipt.message.nonce), - BigDecimal::from(BigInt::from(receipt.message.value)), + receipt_signer.encode_hex(), receipt_error - ) - .execute(&self.pgpool) - .await - .map_err(|e| anyhow!("Failed to store invalid receipt: {:?}", e))?; - } + ); + query = query.bind(receipt_signer.encode_hex()); + query = query.bind(encoded_signature); + query = query.bind(allocation_id.encode_hex()); + query = query.bind(BigDecimal::from(receipt.message.timestamp_ns)); + query = query.bind(BigDecimal::from(receipt.message.nonce)); + query = query.bind(BigDecimal::from(BigInt::from(receipt.message.value))); + } + query.execute(&self.pgpool).await?; let fees = receipts .iter() .map(|receipt| receipt.signed_receipt().message.value) From 5bcbcd86783a014ec6e4db1c3159a7162dd72cb1 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 24 Sep 2024 22:22:42 +0530 Subject: [PATCH 2/2] query macro --- tap-agent/src/agent/sender_allocation.rs | 77 ++++++++++++++---------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 575af231e..85a66bcf6 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -685,31 +685,14 @@ impl SenderAllocationState { &mut self, receipts: &[ReceiptWithState], ) -> Result<()> { - let mut query_str = String::from( - "INSERT INTO scalar_tap_receipts_invalid ( - signer_address, - signature, - allocation_id, - timestamp_ns, - nonce, - value - ) - VALUES "); - for i in 0..receipts.len() { - query_str = query_str +"(" - +"$"+&(i*6+1).to_string()+", " - +"$"+&(i*6+2).to_string()+", " - +"$"+&(i*6+3).to_string()+", " - +"$"+&(i*6+4).to_string()+", " - +"$"+&(i*6+5).to_string()+", " - +"$"+&(i*6+6).to_string() - +")"; - if i!=receipts.len()-1 { - query_str = query_str +" , " - } - } - query_str = query_str+";"; - let mut query = sqlx::query(&query_str); + let reciepts_len = receipts.len(); + let mut reciepts_signers = Vec::with_capacity(reciepts_len); + let mut encoded_signatures = Vec::with_capacity(reciepts_len); + let mut allocation_ids = Vec::with_capacity(reciepts_len); + let mut timestamps = Vec::with_capacity(reciepts_len); + let mut nounces = Vec::with_capacity(reciepts_len); + let mut values = Vec::with_capacity(reciepts_len); + for received_receipt in receipts.iter(){ let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; @@ -727,14 +710,42 @@ impl SenderAllocationState { receipt_signer.encode_hex(), receipt_error ); - query = query.bind(receipt_signer.encode_hex()); - query = query.bind(encoded_signature); - query = query.bind(allocation_id.encode_hex()); - query = query.bind(BigDecimal::from(receipt.message.timestamp_ns)); - query = query.bind(BigDecimal::from(receipt.message.nonce)); - query = query.bind(BigDecimal::from(BigInt::from(receipt.message.value))); - } - query.execute(&self.pgpool).await?; + reciepts_signers.push(receipt_signer.encode_hex()); + encoded_signatures.push(encoded_signature); + allocation_ids.push(allocation_id.encode_hex()); + timestamps.push(BigDecimal::from(receipt.message.timestamp_ns)); + nounces.push(BigDecimal::from(receipt.message.nonce)); + values.push(BigDecimal::from(BigInt::from(receipt.message.value))); + } + sqlx::query!( + r#"INSERT INTO scalar_tap_receipts_invalid ( + signer_address, + signature, + allocation_id, + timestamp_ns, + nonce, + value + ) SELECT * FROM UNNEST( + $1::CHAR(40)[], + $2::BYTEA[], + $3::CHAR(40)[], + $4::NUMERIC(20)[], + $5::NUMERIC(20)[], + $6::NUMERIC(40)[] + )"#, + &signers, + &signatures, + &allocation_ids, + ×tamps, + &nonces, + &values, + ).execute(&self.pgpool) + .await + .map_err(|e| { + error!("Failed to store receipt: {}", e); + anyhow!(e) + })?; + let fees = receipts .iter() .map(|receipt| receipt.signed_receipt().message.value)