From 9315908e68d4baaf05c981f683e33c0b1ec3621b Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 24 Sep 2024 11:04:32 +0530 Subject: [PATCH 1/4] Bulk insert of failed receipts --- tap-agent/src/agent/sender_allocation.rs | 62 +++++++++++++++--------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index a25b6df71..575af231e 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -685,7 +685,32 @@ impl SenderAllocationState { &mut self, receipts: &[ReceiptWithState], ) -> Result<()> { - for received_receipt in receipts.iter() { + let mut query_str = String::from( + "INSERT INTO scalar_tap_receipts_invalid ( + signer_address, + signature, + allocation_id, + timestamp_ns, + nonce, + value + ) + VALUES "); + for i in 0..receipts.len() { + query_str = query_str +"(" + +"$"+&(i*6+1).to_string()+", " + +"$"+&(i*6+2).to_string()+", " + +"$"+&(i*6+3).to_string()+", " + +"$"+&(i*6+4).to_string()+", " + +"$"+&(i*6+5).to_string()+", " + +"$"+&(i*6+6).to_string() + +")"; + if i!=receipts.len()-1 { + query_str = query_str +" , " + } + } + query_str = query_str+";"; + let mut query = sqlx::query(&query_str); + for received_receipt in receipts.iter(){ let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; let encoded_signature = receipt.signature.as_bytes().to_vec(); @@ -696,31 +721,20 @@ impl SenderAllocationState { error!("Failed to recover receipt signer: {}", e); anyhow!(e) })?; - sqlx::query!( - r#" - INSERT INTO scalar_tap_receipts_invalid ( - signer_address, - signature, - allocation_id, - timestamp_ns, - nonce, - value, - error_log - ) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - receipt_signer.encode_hex(), - encoded_signature, + debug!( + "Receipt for allocation {} and signer {} failed reason: {}", allocation_id.encode_hex(), - BigDecimal::from(receipt.message.timestamp_ns), - BigDecimal::from(receipt.message.nonce), - BigDecimal::from(BigInt::from(receipt.message.value)), + receipt_signer.encode_hex(), receipt_error - ) - .execute(&self.pgpool) - .await - .map_err(|e| anyhow!("Failed to store invalid receipt: {:?}", e))?; - } + ); + query = query.bind(receipt_signer.encode_hex()); + query = query.bind(encoded_signature); + query = query.bind(allocation_id.encode_hex()); + query = query.bind(BigDecimal::from(receipt.message.timestamp_ns)); + query = query.bind(BigDecimal::from(receipt.message.nonce)); + query = query.bind(BigDecimal::from(BigInt::from(receipt.message.value))); + } + query.execute(&self.pgpool).await?; let fees = receipts .iter() .map(|receipt| receipt.signed_receipt().message.value) From 5bcbcd86783a014ec6e4db1c3159a7162dd72cb1 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Tue, 24 Sep 2024 22:22:42 +0530 Subject: [PATCH 2/4] query macro --- tap-agent/src/agent/sender_allocation.rs | 77 ++++++++++++++---------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 575af231e..85a66bcf6 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -685,31 +685,14 @@ impl SenderAllocationState { &mut self, receipts: &[ReceiptWithState], ) -> Result<()> { - let mut query_str = String::from( - "INSERT INTO scalar_tap_receipts_invalid ( - signer_address, - signature, - allocation_id, - timestamp_ns, - nonce, - value - ) - VALUES "); - for i in 0..receipts.len() { - query_str = query_str +"(" - +"$"+&(i*6+1).to_string()+", " - +"$"+&(i*6+2).to_string()+", " - +"$"+&(i*6+3).to_string()+", " - +"$"+&(i*6+4).to_string()+", " - +"$"+&(i*6+5).to_string()+", " - +"$"+&(i*6+6).to_string() - +")"; - if i!=receipts.len()-1 { - query_str = query_str +" , " - } - } - query_str = query_str+";"; - let mut query = sqlx::query(&query_str); + let reciepts_len = receipts.len(); + let mut reciepts_signers = Vec::with_capacity(reciepts_len); + let mut encoded_signatures = Vec::with_capacity(reciepts_len); + let mut allocation_ids = Vec::with_capacity(reciepts_len); + let mut timestamps = Vec::with_capacity(reciepts_len); + let mut nounces = Vec::with_capacity(reciepts_len); + let mut values = Vec::with_capacity(reciepts_len); + for received_receipt in receipts.iter(){ let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; @@ -727,14 +710,42 @@ impl SenderAllocationState { receipt_signer.encode_hex(), receipt_error ); - query = query.bind(receipt_signer.encode_hex()); - query = query.bind(encoded_signature); - query = query.bind(allocation_id.encode_hex()); - query = query.bind(BigDecimal::from(receipt.message.timestamp_ns)); - query = query.bind(BigDecimal::from(receipt.message.nonce)); - query = query.bind(BigDecimal::from(BigInt::from(receipt.message.value))); - } - query.execute(&self.pgpool).await?; + reciepts_signers.push(receipt_signer.encode_hex()); + encoded_signatures.push(encoded_signature); + allocation_ids.push(allocation_id.encode_hex()); + timestamps.push(BigDecimal::from(receipt.message.timestamp_ns)); + nounces.push(BigDecimal::from(receipt.message.nonce)); + values.push(BigDecimal::from(BigInt::from(receipt.message.value))); + } + sqlx::query!( + r#"INSERT INTO scalar_tap_receipts_invalid ( + signer_address, + signature, + allocation_id, + timestamp_ns, + nonce, + value + ) SELECT * FROM UNNEST( + $1::CHAR(40)[], + $2::BYTEA[], + $3::CHAR(40)[], + $4::NUMERIC(20)[], + $5::NUMERIC(20)[], + $6::NUMERIC(40)[] + )"#, + &signers, + &signatures, + &allocation_ids, + ×tamps, + &nonces, + &values, + ).execute(&self.pgpool) + .await + .map_err(|e| { + error!("Failed to store receipt: {}", e); + anyhow!(e) + })?; + let fees = receipts .iter() .map(|receipt| receipt.signed_receipt().message.value) From f98aaf370474db15fda6c838b744c27467c0ca90 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Fri, 27 Sep 2024 23:39:44 +0530 Subject: [PATCH 3/4] sqlx bulk insert after running compiling codes --- .env | 1 + ...18688afa91dc0b8eeb8e551b271fc73c1157e.json | 19 ++++++++++++++++++ ...d07007990cc244f598b763ec5470515efe019.json | 6 ------ ...fe7861ff17c6b8de4365a39f28099f5711613.json | 20 ------------------- migrations/20230915230734_tap_ravs.down.sql | 4 ++-- tap-agent/src/agent/sender_allocation.rs | 17 ++++++---------- 6 files changed, 28 insertions(+), 39 deletions(-) create mode 100644 .env create mode 100644 .sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json delete mode 100644 .sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json diff --git a/.env b/.env new file mode 100644 index 000000000..4b5dbc3c9 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +DATABASE_URL = postgres://taslim:6318@localhost:5432/test \ No newline at end of file diff --git a/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json b/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json new file mode 100644 index 000000000..4ebee3864 --- /dev/null +++ b/.sqlx/query-0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO scalar_tap_receipts_invalid (\n signer_address,\n signature,\n allocation_id,\n timestamp_ns,\n nonce,\n value\n ) SELECT * FROM UNNEST(\n $1::CHAR(40)[],\n $2::BYTEA[],\n $3::CHAR(40)[],\n $4::NUMERIC(20)[],\n $5::NUMERIC(20)[],\n $6::NUMERIC(40)[]\n )", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "BpcharArray", + "ByteaArray", + "BpcharArray", + "NumericArray", + "NumericArray", + "NumericArray" + ] + }, + "nullable": [] + }, + "hash": "0d4e055d87b4496202041f3c28618688afa91dc0b8eeb8e551b271fc73c1157e" +} diff --git a/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json b/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json index 698533680..ff0584398 100644 --- a/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json +++ b/.sqlx/query-56c3678866ffe0ec2eed7290394d07007990cc244f598b763ec5470515efe019.json @@ -37,11 +37,6 @@ "ordinal": 6, "name": "value", "type_info": "Numeric" - }, - { - "ordinal": 7, - "name": "error_log", - "type_info": "Text" } ], "parameters": { @@ -54,7 +49,6 @@ false, false, false, - false, false ] }, diff --git a/.sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json b/.sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json deleted file mode 100644 index 1e45b7ef7..000000000 --- a/.sqlx/query-6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts_invalid (\n signer_address,\n signature,\n allocation_id,\n timestamp_ns,\n nonce,\n value,\n error_log\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bpchar", - "Bytea", - "Bpchar", - "Numeric", - "Numeric", - "Numeric", - "Text" - ] - }, - "nullable": [] - }, - "hash": "6c365bc1b0728ec8d9f1239d7bffe7861ff17c6b8de4365a39f28099f5711613" -} diff --git a/migrations/20230915230734_tap_ravs.down.sql b/migrations/20230915230734_tap_ravs.down.sql index 0ad115b3b..a411faed2 100644 --- a/migrations/20230915230734_tap_ravs.down.sql +++ b/migrations/20230915230734_tap_ravs.down.sql @@ -1,2 +1,2 @@ -DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; -DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE; +-- DROP TABLE IF EXISTS scalar_tap_ravs CASCADE; +-- DROP TABLE IF EXISTS scalar_tap_rav_requests_failed CASCADE; diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 85a66bcf6..f7b069faa 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -693,7 +693,7 @@ impl SenderAllocationState { let mut nounces = Vec::with_capacity(reciepts_len); let mut values = Vec::with_capacity(reciepts_len); - for received_receipt in receipts.iter(){ + for received_receipt in receipts.iter() { let receipt = received_receipt.signed_receipt(); let allocation_id = receipt.message.allocation_id; let encoded_signature = receipt.signature.as_bytes().to_vec(); @@ -704,12 +704,6 @@ impl SenderAllocationState { error!("Failed to recover receipt signer: {}", e); anyhow!(e) })?; - debug!( - "Receipt for allocation {} and signer {} failed reason: {}", - allocation_id.encode_hex(), - receipt_signer.encode_hex(), - receipt_error - ); reciepts_signers.push(receipt_signer.encode_hex()); encoded_signatures.push(encoded_signature); allocation_ids.push(allocation_id.encode_hex()); @@ -733,13 +727,14 @@ impl SenderAllocationState { $5::NUMERIC(20)[], $6::NUMERIC(40)[] )"#, - &signers, - &signatures, + &reciepts_signers, + &encoded_signatures, &allocation_ids, ×tamps, - &nonces, + &nounces, &values, - ).execute(&self.pgpool) + ) + .execute(&self.pgpool) .await .map_err(|e| { error!("Failed to store receipt: {}", e); From bcaae0d86faf82f1d3246dddbd37779f435fd4ae Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Fri, 27 Sep 2024 23:41:29 +0530 Subject: [PATCH 4/4] sqlx bulk insert after running compiling codes --- .env | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .env diff --git a/.env b/.env deleted file mode 100644 index 4b5dbc3c9..000000000 --- a/.env +++ /dev/null @@ -1 +0,0 @@ -DATABASE_URL = postgres://taslim:6318@localhost:5432/test \ No newline at end of file