Skip to content

Commit 9972ea0

Browse files
authored
Reduce latency of cloning network sender using Arc pointers (aptos-labs#12103)
* Avoid cloning network sender using Arc pointers * Removing a clone * 100 node sweep test * Removing a few clone operations * reset forge test * Removing some clones * Removing clones
1 parent bc50dae commit 9972ea0

File tree

8 files changed

+31
-27
lines changed

8 files changed

+31
-27
lines changed

consensus/src/block_storage/sync_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ impl BlockStore {
346346
async fn sync_to_highest_commit_cert(
347347
&self,
348348
ledger_info: &LedgerInfoWithSignatures,
349-
network: &NetworkSender,
349+
network: &Arc<NetworkSender>,
350350
) {
351351
// if the block exists between commit root and ordered root
352352
if self.commit_root().round() < ledger_info.commit_info().round()
@@ -404,15 +404,15 @@ impl BlockStore {
404404

405405
/// BlockRetriever is used internally to retrieve blocks
406406
pub struct BlockRetriever {
407-
network: NetworkSender,
407+
network: Arc<NetworkSender>,
408408
preferred_peer: Author,
409409
validator_addresses: Vec<AccountAddress>,
410410
max_blocks_to_request: u64,
411411
}
412412

413413
impl BlockRetriever {
414414
pub fn new(
415-
network: NetworkSender,
415+
network: Arc<NetworkSender>,
416416
preferred_peer: Author,
417417
validator_addresses: Vec<AccountAddress>,
418418
max_blocks_to_request: u64,

consensus/src/epoch_manager.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
681681
ledger_data: LedgerRecoveryData,
682682
onchain_consensus_config: OnChainConsensusConfig,
683683
epoch_state: Arc<EpochState>,
684-
network_sender: NetworkSender,
684+
network_sender: Arc<NetworkSender>,
685685
) {
686686
let (recovery_manager_tx, recovery_manager_rx) = aptos_channel::new(
687687
QueueStyle::LIFO,
@@ -731,7 +731,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
731731
self.quorum_store_to_mempool_sender.clone(),
732732
self.config.mempool_txn_pull_timeout_ms,
733733
self.storage.aptos_db().clone(),
734-
network_sender.clone(),
734+
network_sender,
735735
epoch_state.verifier.clone(),
736736
self.config.safety_rules.backend.clone(),
737737
self.quorum_store_storage.clone(),
@@ -827,7 +827,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
827827
recovery_data: RecoveryData,
828828
epoch_state: Arc<EpochState>,
829829
onchain_consensus_config: OnChainConsensusConfig,
830-
network_sender: NetworkSender,
830+
network_sender: Arc<NetworkSender>,
831831
payload_client: Arc<dyn PayloadClient>,
832832
payload_manager: Arc<PayloadManager>,
833833
features: Features,
@@ -1085,7 +1085,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10851085
initial_data,
10861086
epoch_state,
10871087
consensus_config,
1088-
network_sender,
1088+
Arc::new(network_sender),
10891089
payload_client,
10901090
payload_manager,
10911091
features,
@@ -1098,7 +1098,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10981098
ledger_data,
10991099
consensus_config,
11001100
epoch_state,
1101-
network_sender,
1101+
Arc::new(network_sender),
11021102
)
11031103
.await
11041104
},

consensus/src/network.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,12 @@ impl NetworkSender {
295295
/// The future is fulfilled as soon as the message is put into the mpsc channel to network
296296
/// internal (to provide back pressure), it does not indicate the message is delivered or sent
297297
/// out.
298-
async fn broadcast(&mut self, msg: ConsensusMsg) {
298+
async fn broadcast(&self, msg: ConsensusMsg) {
299299
fail_point!("consensus::send::any", |_| ());
300300
// Directly send the message to ourself without going through network.
301301
let self_msg = Event::Message(self.author, msg.clone());
302-
if let Err(err) = self.self_sender.send(self_msg).await {
302+
let mut self_sender = self.self_sender.clone();
303+
if let Err(err) = self_sender.send(self_msg).await {
303304
error!("Error broadcasting to self: {:?}", err);
304305
}
305306

@@ -369,25 +370,25 @@ impl NetworkSender {
369370
}
370371
}
371372

372-
pub async fn broadcast_proposal(&mut self, proposal_msg: ProposalMsg) {
373+
pub async fn broadcast_proposal(&self, proposal_msg: ProposalMsg) {
373374
fail_point!("consensus::send::broadcast_proposal", |_| ());
374375
let msg = ConsensusMsg::ProposalMsg(Box::new(proposal_msg));
375376
self.broadcast(msg).await
376377
}
377378

378-
pub async fn broadcast_sync_info(&mut self, sync_info_msg: SyncInfo) {
379+
pub async fn broadcast_sync_info(&self, sync_info_msg: SyncInfo) {
379380
fail_point!("consensus::send::broadcast_sync_info", |_| ());
380381
let msg = ConsensusMsg::SyncInfo(Box::new(sync_info_msg));
381382
self.broadcast(msg).await
382383
}
383384

384-
pub async fn broadcast_timeout_vote(&mut self, timeout_vote_msg: VoteMsg) {
385+
pub async fn broadcast_timeout_vote(&self, timeout_vote_msg: VoteMsg) {
385386
fail_point!("consensus::send::broadcast_timeout_vote", |_| ());
386387
let msg = ConsensusMsg::VoteMsg(Box::new(timeout_vote_msg));
387388
self.broadcast(msg).await
388389
}
389390

390-
pub async fn broadcast_epoch_change(&mut self, epoch_change_proof: EpochChangeProof) {
391+
pub async fn broadcast_epoch_change(&self, epoch_change_proof: EpochChangeProof) {
391392
fail_point!("consensus::send::broadcast_epoch_change", |_| ());
392393
let msg = ConsensusMsg::EpochChangeProof(Box::new(epoch_change_proof));
393394
self.broadcast(msg).await

consensus/src/quorum_store/batch_coordinator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub enum BatchCoordinatorCommand {
2323

2424
pub struct BatchCoordinator {
2525
my_peer_id: PeerId,
26-
network_sender: NetworkSender,
26+
network_sender: Arc<NetworkSender>,
2727
batch_store: Arc<BatchStore>,
2828
max_batch_txns: u64,
2929
max_batch_bytes: u64,
@@ -43,7 +43,7 @@ impl BatchCoordinator {
4343
) -> Self {
4444
Self {
4545
my_peer_id,
46-
network_sender,
46+
network_sender: Arc::new(network_sender),
4747
batch_store,
4848
max_batch_txns,
4949
max_batch_bytes,

consensus/src/recovery_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::{mem::Discriminant, process, sync::Arc};
2727
/// for processing the events carrying sync info and use the info to retrieve blocks from peers
2828
pub struct RecoveryManager {
2929
epoch_state: Arc<EpochState>,
30-
network: NetworkSender,
30+
network: Arc<NetworkSender>,
3131
storage: Arc<dyn PersistentLivenessStorage>,
3232
state_computer: Arc<dyn StateComputer>,
3333
last_committed_round: Round,
@@ -38,7 +38,7 @@ pub struct RecoveryManager {
3838
impl RecoveryManager {
3939
pub fn new(
4040
epoch_state: Arc<EpochState>,
41-
network: NetworkSender,
41+
network: Arc<NetworkSender>,
4242
storage: Arc<dyn PersistentLivenessStorage>,
4343
state_computer: Arc<dyn StateComputer>,
4444
last_committed_round: Round,

consensus/src/round_manager.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ pub struct RoundManager {
188188
proposer_election: UnequivocalProposerElection,
189189
proposal_generator: ProposalGenerator,
190190
safety_rules: Arc<Mutex<MetricsSafetyRules>>,
191-
network: NetworkSender,
191+
network: Arc<NetworkSender>,
192192
storage: Arc<dyn PersistentLivenessStorage>,
193193
onchain_config: OnChainConsensusConfig,
194194
vtxn_config: ValidatorTxnConfig,
@@ -205,7 +205,7 @@ impl RoundManager {
205205
proposer_election: Arc<dyn ProposerElection + Send + Sync>,
206206
proposal_generator: ProposalGenerator,
207207
safety_rules: Arc<Mutex<MetricsSafetyRules>>,
208-
network: NetworkSender,
208+
network: Arc<NetworkSender>,
209209
storage: Arc<dyn PersistentLivenessStorage>,
210210
onchain_config: OnChainConsensusConfig,
211211
buffered_proposal_tx: aptos_channel::Sender<Author, VerifiedEvent>,
@@ -295,15 +295,14 @@ impl RoundManager {
295295
self.log_collected_vote_stats(&new_round_event);
296296
self.round_state.setup_leader_timeout();
297297
let proposal_msg = self.generate_proposal(new_round_event).await?;
298-
let mut network = self.network.clone();
299298
#[cfg(feature = "failpoints")]
300299
{
301300
if self.check_whether_to_inject_reconfiguration_error() {
302301
self.attempt_to_inject_reconfiguration_error(&proposal_msg)
303302
.await?;
304303
}
305304
}
306-
network.broadcast_proposal(proposal_msg).await;
305+
self.network.broadcast_proposal(proposal_msg).await;
307306
counters::PROPOSALS_COUNT.inc();
308307
}
309308
Ok(())
@@ -384,7 +383,7 @@ impl RoundManager {
384383
) -> anyhow::Result<ProposalMsg> {
385384
// Proposal generator will ensure that at most one proposal is generated per round
386385
let sync_info = self.block_store.sync_info();
387-
let mut sender = self.network.clone();
386+
let sender = self.network.clone();
388387
let callback = async move {
389388
sender.broadcast_sync_info(sync_info).await;
390389
}
@@ -1168,7 +1167,6 @@ impl RoundManager {
11681167
.collect();
11691168
half_peers.truncate(half_peers.len() / 2);
11701169
self.network
1171-
.clone()
11721170
.send_proposal(proposal_msg.clone(), half_peers)
11731171
.await;
11741172
Err(anyhow::anyhow!("Injected error in reconfiguration suffix"))

consensus/src/round_manager_fuzzing.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,12 @@ fn create_node_for_fuzzing() -> RoundManager {
155155
epoch: 1,
156156
verifier: storage.get_validator_set().into(),
157157
});
158-
let network = NetworkSender::new(
158+
let network = Arc::new(NetworkSender::new(
159159
signer.author(),
160160
consensus_network_client,
161161
self_sender,
162162
epoch_state.verifier.clone(),
163-
);
163+
));
164164

165165
// TODO: mock
166166
let block_store = build_empty_store(storage.clone(), initial_data);

consensus/src/round_manager_test.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,12 @@ impl NodeSetup {
243243
playground.add_node(twin_id, consensus_tx, network_reqs_rx, conn_mgr_reqs_rx);
244244

245245
let (self_sender, self_receiver) = aptos_channels::new_test(1000);
246-
let network = NetworkSender::new(author, consensus_network_client, self_sender, validators);
246+
let network = Arc::new(NetworkSender::new(
247+
author,
248+
consensus_network_client,
249+
self_sender,
250+
validators,
251+
));
247252

248253
let all_network_events = Box::new(select(network_events, self_receiver));
249254

0 commit comments

Comments
 (0)