Skip to content

Commit 53d52e4

Browse files
committed
fix(node/bft): ensure gateways are shut down during prod tests
1 parent 8dad471 commit 53d52e4

File tree

2 files changed

+67
-27
lines changed

2 files changed

+67
-27
lines changed

node/bft/src/gateway.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,14 @@ impl<N: Network> Gateway<N> {
203203
(None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
204204
(Some(ip), _) => ip,
205205
};
206+
207+
// Allow at most as many connections as the maximum committe size.
208+
// and fail if the chosen port is not available.
209+
let mut tcp_config = Config::new(ip, Committee::<N>::max_committee_size()?);
210+
tcp_config.allow_random_port = false;
211+
206212
// Initialize the TCP stack.
207-
let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
213+
let tcp = Tcp::new(tcp_config);
208214

209215
// Return the gateway.
210216
Ok(Self {
@@ -1690,17 +1696,11 @@ mod prop_tests {
16901696

16911697
impl GatewayAddress {
16921698
fn ip(&self) -> Option<SocketAddr> {
1693-
if let GatewayAddress::Prod(ip) = self {
1694-
return *ip;
1695-
}
1696-
None
1699+
if let GatewayAddress::Prod(ip) = self { *ip } else { None }
16971700
}
16981701

16991702
fn port(&self) -> Option<u16> {
1700-
if let GatewayAddress::Dev(port) = self {
1701-
return Some(*port as u16);
1702-
}
1703-
None
1703+
if let GatewayAddress::Dev(port) = self { Some(*port as u16) } else { None }
17041704
}
17051705
}
17061706

@@ -1757,8 +1757,8 @@ mod prop_tests {
17571757
.boxed()
17581758
}
17591759

1760-
#[proptest]
1761-
fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1760+
#[proptest(async = "tokio")]
1761+
async fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
17621762
let (storage, _, private_key, dev) = input;
17631763
let account = Account::try_from(private_key).unwrap();
17641764

@@ -1772,10 +1772,13 @@ mod prop_tests {
17721772
let tcp_config = gateway.tcp().config();
17731773
assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
17741774
assert_eq!(gateway.account().address(), account.address());
1775+
1776+
// Ensure the gateway shuts down and unbinds the TCP port.
1777+
gateway.shut_down().await;
17751778
}
17761779

1777-
#[proptest]
1778-
fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1780+
#[proptest(async = "tokio")]
1781+
async fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
17791782
let (storage, _, private_key, dev) = input;
17801783
let account = Account::try_from(private_key).unwrap();
17811784

@@ -1794,6 +1797,9 @@ mod prop_tests {
17941797
let tcp_config = gateway.tcp().config();
17951798
assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
17961799
assert_eq!(gateway.account().address(), account.address());
1800+
1801+
// Ensure the gateway shuts down and unbinds the TCP port.
1802+
gateway.shut_down().await;
17971803
}
17981804

17991805
#[proptest(async = "tokio")]
@@ -1839,6 +1845,9 @@ mod prop_tests {
18391845
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
18401846
);
18411847
assert_eq!(gateway.num_workers(), workers.len() as u8);
1848+
1849+
// Ensure the gateway shuts down and unbinds the TCP port.
1850+
gateway.shut_down().await;
18421851
}
18431852

18441853
#[proptest]

node/bft/src/primary.rs

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,16 @@ use snarkos_node_bft_ledger_service::LedgerService;
4343
use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
4444
use snarkvm::{
4545
console::{
46+
network::ConsensusVersion,
4647
prelude::*,
4748
types::{Address, Field},
4849
},
4950
ledger::{
5051
block::Transaction,
52+
committee::Committee,
5153
narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
5254
puzzle::{Solution, SolutionID},
5355
},
54-
prelude::{ConsensusVersion, committee::Committee},
5556
};
5657

5758
use aleo_std::StorageMode;
@@ -127,7 +128,7 @@ impl<N: Network> Primary<N> {
127128
/// The maximum number of unconfirmed transmissions to send to the primary.
128129
pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
129130

130-
/// Initializes a new primary instance.
131+
/// Initializes a new primary instance and starts the gateway.
131132
#[allow(clippy::too_many_arguments)]
132133
pub async fn new(
133134
account: Account<N>,
@@ -676,15 +677,17 @@ impl<N: Network> Primary<N> {
676677
// Prepare the previous batch certificate IDs.
677678
let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
678679
// Sign the batch header and construct the proposal.
679-
let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
680-
&private_key,
681-
round,
682-
current_timestamp,
683-
committee_id,
684-
transmission_ids,
685-
previous_certificate_ids,
686-
&mut rand::thread_rng()
687-
))
680+
let (batch_header, proposal) = spawn_blocking!({
681+
BatchHeader::new(
682+
&private_key,
683+
round,
684+
current_timestamp,
685+
committee_id,
686+
transmission_ids,
687+
previous_certificate_ids,
688+
&mut rand::thread_rng(),
689+
)
690+
})
688691
.and_then(|batch_header| {
689692
Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
690693
.map(|proposal| (batch_header, proposal))
@@ -848,7 +851,7 @@ impl<N: Network> Primary<N> {
848851
// Ensure the batch header from the peer is valid.
849852
let (storage, header) = (self.storage.clone(), batch_header.clone());
850853
let missing_transmissions =
851-
spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
854+
spawn_blocking!({ storage.check_batch_header(&header, missing_transmissions, Default::default()) })?;
852855
// Inserts the missing transmissions into the workers.
853856
self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
854857

@@ -1546,7 +1549,7 @@ impl<N: Network> Primary<N> {
15461549
if !self.storage.contains_certificate(certificate.id()) {
15471550
// Store the batch certificate.
15481551
let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1549-
spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1552+
spawn_blocking!({ storage.insert_certificate(certificate_, missing_transmissions, Default::default()) })?;
15501553
debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
15511554
// If a BFT sender was provided, send the round and certificate to the BFT.
15521555
if let Some(cb) = self.primary_callback.get() {
@@ -1763,6 +1766,8 @@ impl<N: Network> Primary<N> {
17631766
info!("Shutting down the primary...");
17641767
// Remove the callback.
17651768
self.primary_callback.clear();
1769+
// Stop syncing.
1770+
self.sync.shut_down().await;
17661771
// Shut down the workers.
17671772
self.workers.iter().for_each(|worker| worker.shut_down());
17681773
// Abort the tasks.
@@ -1917,6 +1922,8 @@ impl<N: Network> Primary<N> {
19171922

19181923
#[cfg(test)]
19191924
mod tests {
1925+
use std::net::{Ipv4Addr, SocketAddrV4};
1926+
19201927
use super::*;
19211928
use snarkos_node_bft_ledger_service::MockLedgerService;
19221929
use snarkos_node_bft_storage_service::BFTMemoryService;
@@ -1962,11 +1969,16 @@ mod tests {
19621969
let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
19631970
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
19641971

1972+
// Pick a random port so we can run tests concurrently
1973+
let any_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
1974+
19651975
// Initialize the primary.
19661976
let account = accounts[account_index].1.clone();
19671977
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
19681978
let mut primary =
1969-
Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::Test(None), None).await.unwrap();
1979+
Primary::new(account, storage, ledger, block_sync, Some(any_addr), &[], StorageMode::Test(None), None)
1980+
.await
1981+
.unwrap();
19701982

19711983
// Construct a worker instance.
19721984
primary.workers = Arc::from([Worker::new(
@@ -2192,6 +2204,7 @@ mod tests {
21922204
}
21932205
}
21942206

2207+
#[tracing_test::traced_test]
21952208
#[tokio::test]
21962209
async fn test_propose_batch() {
21972210
let mut rng = TestRng::default();
@@ -2213,6 +2226,7 @@ mod tests {
22132226
assert!(primary.proposed_batch.read().is_some());
22142227
}
22152228

2229+
#[tracing_test::traced_test]
22162230
#[tokio::test]
22172231
async fn test_propose_batch_with_no_transmissions() {
22182232
let mut rng = TestRng::default();
@@ -2226,6 +2240,7 @@ mod tests {
22262240
assert!(primary.proposed_batch.read().is_some());
22272241
}
22282242

2243+
#[tracing_test::traced_test]
22292244
#[tokio::test]
22302245
async fn test_propose_batch_in_round() {
22312246
let round = 3;
@@ -2251,6 +2266,7 @@ mod tests {
22512266
assert!(primary.proposed_batch.read().is_some());
22522267
}
22532268

2269+
#[tracing_test::traced_test]
22542270
#[tokio::test]
22552271
async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
22562272
let round = 3;
@@ -2323,6 +2339,7 @@ mod tests {
23232339
);
23242340
}
23252341

2342+
#[tracing_test::traced_test]
23262343
#[tokio::test]
23272344
async fn test_propose_batch_over_spend_limit() {
23282345
let mut rng = TestRng::default();
@@ -2360,6 +2377,7 @@ mod tests {
23602377
assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
23612378
}
23622379

2380+
#[tracing_test::traced_test]
23632381
#[tokio::test]
23642382
async fn test_batch_propose_from_peer() {
23652383
let mut rng = TestRng::default();
@@ -2399,6 +2417,7 @@ mod tests {
23992417
);
24002418
}
24012419

2420+
#[tracing_test::traced_test]
24022421
#[tokio::test]
24032422
async fn test_batch_propose_from_peer_when_not_synced() {
24042423
let mut rng = TestRng::default();
@@ -2436,6 +2455,7 @@ mod tests {
24362455
);
24372456
}
24382457

2458+
#[tracing_test::traced_test]
24392459
#[tokio::test]
24402460
async fn test_batch_propose_from_peer_in_round() {
24412461
let round = 2;
@@ -2476,6 +2496,7 @@ mod tests {
24762496
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
24772497
}
24782498

2499+
#[tracing_test::traced_test]
24792500
#[tokio::test]
24802501
async fn test_batch_propose_from_peer_wrong_round() {
24812502
let mut rng = TestRng::default();
@@ -2518,6 +2539,7 @@ mod tests {
25182539
);
25192540
}
25202541

2542+
#[tracing_test::traced_test]
25212543
#[tokio::test]
25222544
async fn test_batch_propose_from_peer_in_round_wrong_round() {
25232545
let round = 4;
@@ -2564,6 +2586,7 @@ mod tests {
25642586
}
25652587

25662588
/// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2589+
#[tracing_test::traced_test]
25672590
#[tokio::test]
25682591
async fn test_batch_propose_from_peer_with_past_timestamp() {
25692592
let round = 2;
@@ -2614,6 +2637,7 @@ mod tests {
26142637
}
26152638

26162639
/// Check that proposals rejected that have timestamps older than the previous proposal.
2640+
#[tracing_test::traced_test]
26172641
#[tokio::test]
26182642
async fn test_batch_propose_from_peer_over_spend_limit() {
26192643
let mut rng = TestRng::default();
@@ -2679,6 +2703,7 @@ mod tests {
26792703
);
26802704
}
26812705

2706+
#[tracing_test::traced_test]
26822707
#[tokio::test]
26832708
async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
26842709
let round = 3;
@@ -2712,6 +2737,7 @@ mod tests {
27122737
assert!(primary.proposed_batch.read().is_some());
27132738
}
27142739

2740+
#[tracing_test::traced_test]
27152741
#[tokio::test]
27162742
async fn test_propose_batch_with_storage_round_behind_proposal() {
27172743
let round = 5;
@@ -2742,6 +2768,7 @@ mod tests {
27422768
assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
27432769
}
27442770

2771+
#[tracing_test::traced_test]
27452772
#[tokio::test(flavor = "multi_thread")]
27462773
async fn test_batch_signature_from_peer() {
27472774
let mut rng = TestRng::default();
@@ -2778,6 +2805,7 @@ mod tests {
27782805
assert_eq!(primary.current_round(), round + 1);
27792806
}
27802807

2808+
#[tracing_test::traced_test]
27812809
#[tokio::test(flavor = "multi_thread")]
27822810
async fn test_batch_signature_from_peer_in_round() {
27832811
let round = 5;
@@ -2817,6 +2845,7 @@ mod tests {
28172845
assert_eq!(primary.current_round(), round + 1);
28182846
}
28192847

2848+
#[tracing_test::traced_test]
28202849
#[tokio::test]
28212850
async fn test_batch_signature_from_peer_no_quorum() {
28222851
let mut rng = TestRng::default();
@@ -2852,6 +2881,7 @@ mod tests {
28522881
assert_eq!(primary.current_round(), round);
28532882
}
28542883

2884+
#[tracing_test::traced_test]
28552885
#[tokio::test]
28562886
async fn test_batch_signature_from_peer_in_round_no_quorum() {
28572887
let round = 7;
@@ -2890,6 +2920,7 @@ mod tests {
28902920
assert_eq!(primary.current_round(), round);
28912921
}
28922922

2923+
#[tracing_test::traced_test]
28932924
#[tokio::test]
28942925
async fn test_insert_certificate_with_aborted_transmissions() {
28952926
let round = 3;

0 commit comments

Comments
 (0)