Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions node/bft/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,23 @@ pub struct PrimarySender<N: Network> {
pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature<N>)>,
pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<()>>)>,
pub tx_unconfirmed_transaction: mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<()>>)>,
pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
pub tx_unconfirmed_transaction:
mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
}

impl<N: Network> PrimarySender<N> {
/// Sends the unconfirmed solution to the primary.
///
/// # Returns
/// - `Ok(true)` if the solution was added to the ready queue.
/// - `Ok(false)` if the solution was valid but already exists in the ready queue.
/// - `Err(anyhow::Error)` if the solution was invalid.
pub async fn send_unconfirmed_solution(
&self,
solution_id: SolutionID<N>,
solution: Data<Solution<N>>,
) -> Result<()> {
) -> Result<bool> {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the unconfirmed solution to the primary.
Expand All @@ -149,11 +155,16 @@ impl<N: Network> PrimarySender<N> {
}

/// Sends the unconfirmed transaction to the primary.
///
/// # Returns
/// - `Ok(true)` if the transaction was added to the ready queue.
/// - `Ok(false)` if the transaction was valid but already exists in the ready queue.
/// - `Err(anyhow::Error)` if the transaction was invalid.
pub async fn send_unconfirmed_transaction(
&self,
transaction_id: N::TransactionID,
transaction: Data<Transaction<N>>,
) -> Result<()> {
) -> Result<bool> {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the unconfirmed transaction to the primary.
Expand All @@ -169,9 +180,9 @@ pub struct PrimaryReceiver<N: Network> {
pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>,
pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<()>>)>,
pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
pub rx_unconfirmed_transaction:
mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<()>>)>,
mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
}

/// Initializes the primary channels.
Expand Down
22 changes: 16 additions & 6 deletions node/bft/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,16 @@ impl<N: Network> Worker<N> {

/// Handles the incoming unconfirmed solution.
/// Note: This method assumes the incoming solution is valid and does not exist in the ledger.
///
/// # Returns
/// - `Ok(true)` if the solution was added to the ready queue.
/// - `Ok(false)` if the solution was valid but already exists in the ready queue.
/// - `Err(anyhow::Error)` if the solution is invalid.
pub(crate) async fn process_unconfirmed_solution(
&self,
solution_id: SolutionID<N>,
solution: Data<Solution<N>>,
) -> Result<()> {
) -> Result<bool> {
// Construct the transmission.
let transmission = Transmission::Solution(solution.clone());
// Compute the checksum.
Expand All @@ -355,7 +360,7 @@ impl<N: Network> Worker<N> {
self.pending.remove(transmission_id, Some(transmission.clone()));
// Check if the solution exists.
if self.contains_transmission(transmission_id) {
bail!("Solution '{}.{}' already exists.", fmt_id(solution_id), fmt_id(checksum).dimmed());
return Ok(false);
}
// Check that the solution is well-formed and unique.
self.ledger.check_solution_basic(solution_id, solution).await?;
Expand All @@ -368,15 +373,20 @@ impl<N: Network> Worker<N> {
fmt_id(checksum).dimmed()
);
}
Ok(())
Ok(true)
}

/// Handles the incoming unconfirmed transaction.
///
/// # Returns
/// - `Ok(true)` if the transaction was added to the ready queue.
/// - `Ok(false)` if the transaction was valid but already exists in the ready queue.
/// - `Err(anyhow::Error)` if the transaction was invalid.
pub(crate) async fn process_unconfirmed_transaction(
&self,
transaction_id: N::TransactionID,
transaction: Data<Transaction<N>>,
) -> Result<()> {
) -> Result<bool> {
// Construct the transmission.
let transmission = Transmission::Transaction(transaction.clone());
// Compute the checksum.
Expand All @@ -387,7 +397,7 @@ impl<N: Network> Worker<N> {
self.pending.remove(transmission_id, Some(transmission.clone()));
// Check if the transaction ID exists.
if self.contains_transmission(transmission_id) {
bail!("Transaction '{}.{}' already exists.", fmt_id(transaction_id), fmt_id(checksum).dimmed());
return Ok(false);
}
// Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
let transaction = spawn_blocking!({
Expand All @@ -408,7 +418,7 @@ impl<N: Network> Worker<N> {
fmt_id(checksum).dimmed()
);
}
Ok(())
Ok(true)
}
}

Expand Down
79 changes: 54 additions & 25 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ use snarkvm::{
puzzle::{Solution, SolutionID},
},
prelude::*,
utilities::flatten_error,
};

use aleo_std::StorageMode;
use anyhow::Result;
use anyhow::{Context, Result};
use colored::Colorize;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
Expand Down Expand Up @@ -347,13 +348,21 @@ impl<N: Network> Consensus<N> {
let solution_id = solution.id();
trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
// Send the unconfirmed solution to the primary.
if let Err(e) = self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
// If the BFT is synced, then log the warning.
if self.bft.is_synced() {
// If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
if self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
};
match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
Ok(true) => {}
Ok(false) => debug!(
"Unable to add unconfirmed solution '{}' to the memory pool. Already exists.",
fmt_id(solution_id)
),
Err(err) => {
// If the node is synced and the occurs after the first 10 blocks of an epoch, log it as a warning, otherwise ignore.
if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
let err = err.context(format!(
"Unable to add unconfirmed solution '{}' to the memory pool",
fmt_id(solution_id)
));
warn!("{}", flatten_error(err));
}
}
}
}
Expand Down Expand Up @@ -446,15 +455,21 @@ impl<N: Network> Consensus<N> {
};
trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
// Send the unconfirmed transaction to the primary.
if let Err(e) =
self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
{
// If the BFT is synced, then log the warning.
if self.bft.is_synced() {
warn!(
"Failed to add unconfirmed {tx_type_str} transaction '{}' to the memory pool - {e}",
fmt_id(transaction_id)
);
match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await {
Ok(true) => {}
Ok(false) => debug!(
"Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.",
fmt_id(transaction_id)
),
Err(err) => {
// If the BFT is synced, then log the warning.
if self.bft.is_synced() {
let err = err.context(format!(
"Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool",
fmt_id(transaction_id)
));
warn!("{}", flatten_error(err));
}
}
}
}
Expand Down Expand Up @@ -508,11 +523,11 @@ impl<N: Network> Consensus<N> {
// Try to advance to the next block.
let self_ = self.clone();
let transmissions_ = transmissions.clone();
let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };

// If the block failed to advance, reinsert the transmissions into the memory pool.
if let Err(e) = &result {
error!("Unable to advance to the next block - {e}");
error!("{}", flatten_error(e));
// On failure, reinsert the transmissions into the memory pool.
self.reinsert_transmissions(transmissions).await;
}
Expand Down Expand Up @@ -611,27 +626,41 @@ impl<N: Network> Consensus<N> {
// Iterate over the transmissions.
for (transmission_id, transmission) in transmissions.into_iter() {
// Reinsert the transmission into the memory pool.
if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
warn!(
"Unable to reinsert transmission {}.{} into the memory pool - {e}",
match self.reinsert_transmission(transmission_id, transmission).await {
Ok(true) => {}
Ok(false) => debug!(
"Unable to reinsert transmission {}.{} into the memory pool. Already exists.",
fmt_id(transmission_id),
fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
);
),
Err(err) => {
let err = err.context(format!(
"Unable to reinsert transmission {}.{} into the memory pool",
fmt_id(transmission_id),
fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
));
warn!("{}", flatten_error(err));
}
}
}
}

/// Reinserts the given transmission into the memory pool.
///
/// # Returns
/// - `Ok(true)` if the transmission was added to the memory pool.
/// - `Ok(false)` if the transmission was valid but already exists in the memory pool.
/// - `Err(anyhow::Error)` if the transmission was invalid.
async fn reinsert_transmission(
&self,
transmission_id: TransmissionID<N>,
transmission: Transmission<N>,
) -> Result<()> {
) -> Result<bool> {
// Initialize a callback sender and receiver.
let (callback, callback_receiver) = oneshot::channel();
// Send the transmission to the primary.
match (transmission_id, transmission) {
(TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
(TransmissionID::Ratification, Transmission::Ratification) => return Ok(true),
(TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
// Send the solution to the primary.
self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
Expand Down
9 changes: 7 additions & 2 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_network::NodeType;
use snarkos_node_rest::Rest;
use snarkos_node_router::{
Heartbeat, Inbound, Outbound, Router, Routing,
Heartbeat,
Inbound,
Outbound,
Router,
Routing,
messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
Expand Down Expand Up @@ -58,7 +62,8 @@ use std::{
sync::{
Arc,
atomic::{
AtomicBool, AtomicUsize,
AtomicBool,
AtomicUsize,
Ordering::{Acquire, Relaxed},
},
},
Expand Down
12 changes: 9 additions & 3 deletions node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ use snarkos_node_consensus::Consensus;
use snarkos_node_network::{NodeType, PeerPoolHandling};
use snarkos_node_rest::Rest;
use snarkos_node_router::{
Heartbeat, Inbound, Outbound, Router, Routing,
Heartbeat,
Inbound,
Outbound,
Router,
Routing,
messages::{PuzzleResponse, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BlockSync, Ping};
Expand All @@ -33,7 +37,8 @@ use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkvm::prelude::{
Ledger, Network,
Ledger,
Network,
block::{Block, Header},
puzzle::Solution,
store::ConsensusStorage,
Expand Down Expand Up @@ -483,7 +488,8 @@ impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
mod tests {
use super::*;
use snarkvm::prelude::{
MainnetV0, VM,
MainnetV0,
VM,
store::{ConsensusStore, helpers::memory::ConsensusMemory},
};

Expand Down