Skip to content

Commit 74c00c7

Browse files
committed
misc(bft): replace PrimarySender with a callback
1 parent 0de4fd8 commit 74c00c7

File tree

19 files changed

+468
-584
lines changed

19 files changed

+468
-584
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/bft/Cargo.toml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,15 @@ cuda = [
3838
"snarkos-node-bft-ledger-service/cuda",
3939
"snarkos-node-sync/cuda"
4040
]
41+
persistent-storage = [ "snarkos-node-bft-storage-service/persistent" ]
4142
test = [
4243
# "snarkvm/test" this breaks some of the tests
43-
"snarkvm/test-helpers",
4444
"snarkos-node-bft-ledger-service/test",
45-
"snarkos-node-bft-storage-service/test"
45+
"snarkos-node-bft-storage-service/test",
46+
"test-helpers"
47+
]
48+
test-helpers = [
49+
"snarkvm/test-helpers",
4650
]
4751
serial = [ "snarkos-node-bft-ledger-service/serial" ]
4852

@@ -206,3 +210,8 @@ workspace = true
206210

207211
[dev-dependencies.mockall]
208212
version = "0.13"
213+
214+
[[test]]
215+
name = "gateway-e2e"
216+
path = "./tests/gateway_e2e.rs"
217+
required-features = [ "test-helpers" ]

node/bft/examples/simple_node.rs

Lines changed: 23 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,7 @@ extern crate tracing;
2020
extern crate snarkos_node_metrics as metrics;
2121

2222
use snarkos_account::Account;
23-
use snarkos_node_bft::{
24-
BFT,
25-
BftCallback,
26-
MEMORY_POOL_PORT,
27-
Primary,
28-
helpers::{PrimarySender, Storage, init_primary_channels},
29-
};
23+
use snarkos_node_bft::{BFT, BftCallback, MEMORY_POOL_PORT, Primary, helpers::Storage};
3024
use snarkos_node_bft_ledger_service::TranslucentLedgerService;
3125
use snarkos_node_bft_storage_service::BFTMemoryService;
3226
use snarkos_node_sync::BlockSync;
@@ -69,7 +63,7 @@ use std::{
6963
str::FromStr,
7064
sync::{Arc, Mutex, OnceLock},
7165
};
72-
use tokio::{net::TcpListener, sync::oneshot};
66+
use tokio::net::TcpListener;
7367
use tracing_subscriber::{
7468
layer::{Layer, SubscriberExt},
7569
util::SubscriberInitExt,
@@ -119,13 +113,7 @@ pub fn initialize_logger(verbosity: u8) {
119113
/**************************************************************************************************/
120114

121115
/// Starts the BFT instance.
122-
pub async fn start_bft(
123-
node_id: u16,
124-
num_nodes: u16,
125-
peers: HashMap<u16, SocketAddr>,
126-
) -> Result<(BFT<CurrentNetwork>, PrimarySender<CurrentNetwork>)> {
127-
// Initialize the primary channels.
128-
let (sender, receiver) = init_primary_channels();
116+
pub async fn start_bft(node_id: u16, num_nodes: u16, peers: HashMap<u16, SocketAddr>) -> Result<BFT<CurrentNetwork>> {
129117
// Initialize the components.
130118
let (committee, account) = initialize_components(node_id, num_nodes)?;
131119
// Initialize the translucent ledger service.
@@ -148,26 +136,25 @@ pub async fn start_bft(
148136
let consensus_handler = Arc::new(ConsensusHandler {});
149137
// Initialize the BFT instance.
150138
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
151-
let mut bft =
152-
BFT::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode, None)?;
139+
let bft =
140+
BFT::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode, None)
141+
.await?;
153142
// Run the BFT instance.
154-
bft.run(None, Some(consensus_handler), sender.clone(), receiver).await?;
143+
bft.run(None, Some(consensus_handler)).await?;
155144
// Retrieve the BFT's primary.
156145
let primary = bft.primary();
157146
// Handle OS signals.
158147
handle_signals(primary);
159148
// Return the BFT instance.
160-
Ok((bft, sender))
149+
Ok(bft)
161150
}
162151

163152
/// Starts the primary instance.
164153
pub async fn start_primary(
165154
node_id: u16,
166155
num_nodes: u16,
167156
peers: HashMap<u16, SocketAddr>,
168-
) -> Result<(Primary<CurrentNetwork>, PrimarySender<CurrentNetwork>)> {
169-
// Initialize the primary channels.
170-
let (sender, receiver) = init_primary_channels();
157+
) -> Result<Primary<CurrentNetwork>> {
171158
// Initialize the components.
172159
let (committee, account) = initialize_components(node_id, num_nodes)?;
173160
// Initialize the translucent ledger service.
@@ -188,7 +175,7 @@ pub async fn start_primary(
188175
let trusted_validators = trusted_validators(node_id, num_nodes, peers);
189176
// Initialize the primary instance.
190177
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
191-
let mut primary = Primary::<CurrentNetwork>::new(
178+
let primary = Primary::<CurrentNetwork>::new(
192179
account,
193180
storage,
194181
ledger,
@@ -197,13 +184,14 @@ pub async fn start_primary(
197184
&trusted_validators,
198185
storage_mode,
199186
None,
200-
)?;
187+
)
188+
.await?;
201189
// Run the primary instance.
202-
primary.run(None, None, None, sender.clone(), receiver).await?;
190+
primary.run(None, None, None).await?;
203191
// Handle OS signals.
204192
handle_signals(&primary);
205193
// Return the primary instance.
206-
Ok((primary, sender))
194+
Ok(primary)
207195
}
208196

209197
/// Initialize the translucent ledger service.
@@ -371,8 +359,7 @@ fn handle_signals(primary: &Primary<CurrentNetwork>) {
371359
/**************************************************************************************************/
372360

373361
/// Fires *fake* unconfirmed solutions at the node.
374-
fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) {
375-
let tx_unconfirmed_solution = sender.tx_unconfirmed_solution.clone();
362+
fn fire_unconfirmed_solutions(primary: Primary<CurrentNetwork>, node_id: u16, interval_ms: u64) {
376363
tokio::task::spawn(async move {
377364
// This RNG samples the *same* fake solutions for all nodes.
378365
let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789);
@@ -396,13 +383,8 @@ fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_id: u
396383
// Sample a random fake solution ID and solution.
397384
let (solution_id, solution) =
398385
if counter % 2 == 0 { sample(&mut shared_rng) } else { sample(&mut unique_rng) };
399-
// Initialize a callback sender and receiver.
400-
let (callback, callback_receiver) = oneshot::channel();
401386
// Send the fake solution.
402-
if let Err(e) = tx_unconfirmed_solution.send((solution_id, solution, callback)).await {
403-
error!("Failed to send unconfirmed solution: {e}");
404-
}
405-
let _ = callback_receiver.await;
387+
let _ = primary.process_unconfirmed_solution(solution_id, solution).await;
406388
// Increment the counter.
407389
counter += 1;
408390
// Sleep briefly.
@@ -412,8 +394,7 @@ fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_id: u
412394
}
413395

414396
/// Fires *fake* unconfirmed transactions at the node.
415-
fn fire_unconfirmed_transactions(sender: &PrimarySender<CurrentNetwork>, node_id: u16, interval_ms: u64) {
416-
let tx_unconfirmed_transaction = sender.tx_unconfirmed_transaction.clone();
397+
fn fire_unconfirmed_transactions(primary: Primary<CurrentNetwork>, node_id: u16, interval_ms: u64) {
417398
tokio::task::spawn(async move {
418399
// This RNG samples the *same* fake transactions for all nodes.
419400
let mut shared_rng = rand_chacha::ChaChaRng::seed_from_u64(123456789);
@@ -438,13 +419,8 @@ fn fire_unconfirmed_transactions(sender: &PrimarySender<CurrentNetwork>, node_id
438419
loop {
439420
// Sample a random fake transaction ID and transaction.
440421
let (id, transaction) = if counter % 2 == 0 { sample(&mut shared_rng) } else { sample(&mut unique_rng) };
441-
// Initialize a callback sender and receiver.
442-
let (callback, callback_receiver) = oneshot::channel();
443422
// Send the fake transaction.
444-
if let Err(e) = tx_unconfirmed_transaction.send((id, transaction, callback)).await {
445-
error!("Failed to send unconfirmed transaction: {e}");
446-
}
447-
let _ = callback_receiver.await;
423+
let _ = primary.process_unconfirmed_transaction(id, transaction).await;
448424
// Increment the counter.
449425
counter += 1;
450426
// Sleep briefly.
@@ -583,14 +559,14 @@ async fn main() -> Result<()> {
583559
let mut bft_holder = None;
584560

585561
// Start the node.
586-
let (primary, sender) = match args.mode {
562+
let primary = match args.mode {
587563
Mode::Bft => {
588564
// Start the BFT.
589-
let (bft, sender) = start_bft(args.id, args.num_nodes, peers).await?;
565+
let bft = start_bft(args.id, args.num_nodes, peers).await?;
590566
// Set the BFT holder.
591567
bft_holder = Some(bft.clone());
592568
// Return the primary and sender.
593-
(bft.primary().clone(), sender)
569+
bft.primary().clone()
594570
}
595571
Mode::Narwhal => start_primary(args.id, args.num_nodes, peers).await?,
596572
};
@@ -602,7 +578,7 @@ async fn main() -> Result<()> {
602578
match (args.fire_transmissions, args.fire_solutions) {
603579
// Note: We allow the user to overload the solutions rate, even when the 'fire-transmissions' flag is enabled.
604580
(Some(rate), _) | (_, Some(rate)) => {
605-
fire_unconfirmed_solutions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
581+
fire_unconfirmed_solutions(primary.clone(), args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
606582
}
607583
_ => (),
608584
};
@@ -611,7 +587,7 @@ async fn main() -> Result<()> {
611587
match (args.fire_transmissions, args.fire_transactions) {
612588
// Note: We allow the user to overload the transactions rate, even when the 'fire-transmissions' flag is enabled.
613589
(Some(rate), _) | (_, Some(rate)) => {
614-
fire_unconfirmed_transactions(&sender, args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
590+
fire_unconfirmed_transactions(primary.clone(), args.id, rate.unwrap_or(DEFAULT_INTERVAL_MS));
615591
}
616592
_ => (),
617593
};

node/bft/ledger-service/src/traits.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@ use snarkvm::{
1717
ledger::{
1818
block::{Block, Transaction},
1919
committee::Committee,
20-
narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
20+
narwhal::{BatchCertificate, Data, Transmission, TransmissionID},
2121
puzzle::{Solution, SolutionID},
2222
},
2323
prelude::{Address, ConsensusVersion, Field, Network, Result},
2424
};
2525

26+
#[cfg(feature = "ledger-write")]
2627
use indexmap::IndexMap;
28+
#[cfg(feature = "ledger-write")]
29+
use snarkvm::ledger::narwhal::Subdag;
30+
2731
use std::{fmt::Debug, ops::Range};
2832

2933
#[async_trait]

0 commit comments

Comments
 (0)