diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 5cca4ddddf..5337216085 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -129,17 +129,23 @@ pub struct PrimarySender { pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature)>, pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data>)>, pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data>)>, - pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID, Data>, oneshot::Sender>)>, - pub tx_unconfirmed_transaction: mpsc::Sender<(N::TransactionID, Data>, oneshot::Sender>)>, + pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID, Data>, oneshot::Sender>)>, + pub tx_unconfirmed_transaction: + mpsc::Sender<(N::TransactionID, Data>, oneshot::Sender>)>, } impl PrimarySender { /// Sends the unconfirmed solution to the primary. + /// + /// # Returns + /// - `Ok(true)` if the solution was added to the ready queue. + /// - `Ok(false)` if the solution was valid but already exists in the ready queue. + /// - `Err(anyhow::Error)` if the solution was invalid. pub async fn send_unconfirmed_solution( &self, solution_id: SolutionID, solution: Data>, - ) -> Result<()> { + ) -> Result { // Initialize a callback sender and receiver. let (callback_sender, callback_receiver) = oneshot::channel(); // Send the unconfirmed solution to the primary. @@ -149,11 +155,16 @@ impl PrimarySender { } /// Sends the unconfirmed transaction to the primary. + /// + /// # Returns + /// - `Ok(true)` if the transaction was added to the ready queue. + /// - `Ok(false)` if the transaction was valid but already exists in the ready queue. + /// - `Err(anyhow::Error)` if the transaction was invalid. pub async fn send_unconfirmed_transaction( &self, transaction_id: N::TransactionID, transaction: Data>, - ) -> Result<()> { + ) -> Result { // Initialize a callback sender and receiver. let (callback_sender, callback_receiver) = oneshot::channel(); // Send the unconfirmed transaction to the primary. @@ -169,9 +180,9 @@ pub struct PrimaryReceiver { pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature)>, pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data>)>, pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data>)>, - pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID, Data>, oneshot::Sender>)>, + pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID, Data>, oneshot::Sender>)>, pub rx_unconfirmed_transaction: - mpsc::Receiver<(N::TransactionID, Data>, oneshot::Sender>)>, + mpsc::Receiver<(N::TransactionID, Data>, oneshot::Sender>)>, } /// Initializes the primary channels. diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index a4c2fda37e..2ca99e35d1 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -340,11 +340,16 @@ impl Worker { /// Handles the incoming unconfirmed solution. /// Note: This method assumes the incoming solution is valid and does not exist in the ledger. + /// + /// # Returns + /// - `Ok(true)` if the solution was added to the ready queue. + /// - `Ok(false)` if the solution was valid but already exists in the ready queue. + /// - `Err(anyhow::Error)` if the solution is invalid. pub(crate) async fn process_unconfirmed_solution( &self, solution_id: SolutionID, solution: Data>, - ) -> Result<()> { + ) -> Result { // Construct the transmission. let transmission = Transmission::Solution(solution.clone()); // Compute the checksum. @@ -355,7 +360,7 @@ impl Worker { self.pending.remove(transmission_id, Some(transmission.clone())); // Check if the solution exists. if self.contains_transmission(transmission_id) { - bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed()); + return Ok(false); } // Check that the solution is well-formed and unique. self.ledger.check_solution_basic(solution_id, solution).await?; @@ -368,15 +373,20 @@ impl Worker { fmt_id(checksum).dimmed() ); } - Ok(()) + Ok(true) } /// Handles the incoming unconfirmed transaction. + /// + /// # Returns + /// - `Ok(true)` if the transaction was added to the ready queue. + /// - `Ok(false)` if the transaction was valid but already exists in the ready queue. + /// - `Err(anyhow::Error)` if the transaction was invalid. pub(crate) async fn process_unconfirmed_transaction( &self, transaction_id: N::TransactionID, transaction: Data>, - ) -> Result<()> { + ) -> Result { // Construct the transmission. let transmission = Transmission::Transaction(transaction.clone()); // Compute the checksum. @@ -387,7 +397,7 @@ impl Worker { self.pending.remove(transmission_id, Some(transmission.clone())); // Check if the transaction ID exists. if self.contains_transmission(transmission_id) { - bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed()); + return Ok(false); } // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error. let transaction = spawn_blocking!({ @@ -408,7 +418,7 @@ impl Worker { fmt_id(checksum).dimmed() ); } - Ok(()) + Ok(true) } } diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index e7a8877543..a8d62ee9ed 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -50,10 +50,11 @@ use snarkvm::{ puzzle::{Solution, SolutionID}, }, prelude::*, + utilities::flatten_error, }; use aleo_std::StorageMode; -use anyhow::Result; +use anyhow::{Context, Result}; use colored::Colorize; use indexmap::IndexMap; #[cfg(feature = "locktick")] @@ -347,13 +348,21 @@ impl Consensus { let solution_id = solution.id(); trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id)); // Send the unconfirmed solution to the primary. - if let Err(e) = self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await { - // If the BFT is synced, then log the warning. - if self.bft.is_synced() { - // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore. - if self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 { - warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id)) - }; + match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await { + Ok(true) => {} + Ok(false) => debug!( + "Unable to add unconfirmed solution '{}' to the memory pool. Already exists.", + fmt_id(solution_id) + ), + Err(err) => { + // If the node is synced and the occurs after the first 10 blocks of an epoch, log it as a warning, otherwise ignore. + if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 { + let err = err.context(format!( + "Unable to add unconfirmed solution '{}' to the memory pool", + fmt_id(solution_id) + )); + warn!("{}", flatten_error(err)); + } } } } @@ -446,15 +455,21 @@ impl Consensus { }; trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id)); // Send the unconfirmed transaction to the primary. - if let Err(e) = - self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await - { - // If the BFT is synced, then log the warning. - if self.bft.is_synced() { - warn!( - "Failed to add unconfirmed {tx_type_str} transaction '{}' to the memory pool - {e}", - fmt_id(transaction_id) - ); + match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await { + Ok(true) => {} + Ok(false) => debug!( + "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.", + fmt_id(transaction_id) + ), + Err(err) => { + // If the BFT is synced, then log the warning. + if self.bft.is_synced() { + let err = err.context(format!( + "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool", + fmt_id(transaction_id) + )); + warn!("{}", flatten_error(err)); + } } } } @@ -508,11 +523,11 @@ impl Consensus { // Try to advance to the next block. let self_ = self.clone(); let transmissions_ = transmissions.clone(); - let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) }; + let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") }; // If the block failed to advance, reinsert the transmissions into the memory pool. if let Err(e) = &result { - error!("Unable to advance to the next block - {e}"); + error!("{}", flatten_error(e)); // On failure, reinsert the transmissions into the memory pool. self.reinsert_transmissions(transmissions).await; } @@ -611,27 +626,41 @@ impl Consensus { // Iterate over the transmissions. for (transmission_id, transmission) in transmissions.into_iter() { // Reinsert the transmission into the memory pool. - if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await { - warn!( - "Unable to reinsert transmission {}.{} into the memory pool - {e}", + match self.reinsert_transmission(transmission_id, transmission).await { + Ok(true) => {} + Ok(false) => debug!( + "Unable to reinsert transmission {}.{} into the memory pool. Already exists.", fmt_id(transmission_id), fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed() - ); + ), + Err(err) => { + let err = err.context(format!( + "Unable to reinsert transmission {}.{} into the memory pool", + fmt_id(transmission_id), + fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed() + )); + warn!("{}", flatten_error(err)); + } } } } /// Reinserts the given transmission into the memory pool. + /// + /// # Returns + /// - `Ok(true)` if the transmission was added to the memory pool. + /// - `Ok(false)` if the transmission was valid but already exists in the memory pool. + /// - `Err(anyhow::Error)` if the transmission was invalid. async fn reinsert_transmission( &self, transmission_id: TransmissionID, transmission: Transmission, - ) -> Result<()> { + ) -> Result { // Initialize a callback sender and receiver. let (callback, callback_receiver) = oneshot::channel(); // Send the transmission to the primary. match (transmission_id, transmission) { - (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()), + (TransmissionID::Ratification, Transmission::Ratification) => return Ok(true), (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => { // Send the solution to the primary. self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?; diff --git a/node/router/tests/cleanups.rs b/node/router/tests/cleanups.rs index 4d5e05ae9c..74a529c0f0 100644 --- a/node/router/tests/cleanups.rs +++ b/node/router/tests/cleanups.rs @@ -39,10 +39,10 @@ async fn test_connection_cleanups() { // Create 2 routers of random types. let mut nodes = Vec::with_capacity(2); for _ in 0..2 { - let node = match rng.gen_range(0..3) % 3 { + let node = match rng.gen_range(0..=1) { 0 => client(0, 1, &mut rng).await, 1 => prover(0, 1, &mut rng).await, - 2 => validator(0, 1, &[], false, &mut rng).await, + // TODO => validator(0, 1, &[], false, &mut rng).await, _ => unreachable!(), }; diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index 30c1e62969..1017298448 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -23,7 +23,11 @@ use snarkos_node_cdn::CdnBlockSync; use snarkos_node_network::NodeType; use snarkos_node_rest::Rest; use snarkos_node_router::{ - Heartbeat, Inbound, Outbound, Router, Routing, + Heartbeat, + Inbound, + Outbound, + Router, + Routing, messages::{Message, UnconfirmedSolution, UnconfirmedTransaction}, }; use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators}; @@ -58,7 +62,8 @@ use std::{ sync::{ Arc, atomic::{ - AtomicBool, AtomicUsize, + AtomicBool, + AtomicUsize, Ordering::{Acquire, Relaxed}, }, }, diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index af4ddddcd3..7fb4507db9 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -24,7 +24,11 @@ use snarkos_node_consensus::Consensus; use snarkos_node_network::{NodeType, PeerPoolHandling}; use snarkos_node_rest::Rest; use snarkos_node_router::{ - Heartbeat, Inbound, Outbound, Router, Routing, + Heartbeat, + Inbound, + Outbound, + Router, + Routing, messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction}, }; use snarkos_node_sync::{BlockSync, Ping}; @@ -33,7 +37,8 @@ use snarkos_node_tcp::{ protocols::{Disconnect, Handshake, OnConnect, Reading}, }; use snarkvm::prelude::{ - Ledger, Network, + Ledger, + Network, block::{Block, Header}, puzzle::Solution, store::ConsensusStorage, @@ -483,7 +488,8 @@ impl> NodeInterface for Validator { mod tests { use super::*; use snarkvm::prelude::{ - MainnetV0, VM, + MainnetV0, + VM, store::{ConsensusStore, helpers::memory::ConsensusMemory}, };