Skip to content

Commit 9ecf4a6

Browse files
Zekun Lizekun000
authored andcommitted
[zaptos] move commit vote broadcast into pipeline builder
1 parent 60b16f4 commit 9ecf4a6

File tree

9 files changed

+37
-38
lines changed

9 files changed

+37
-38
lines changed

consensus/consensus-types/src/pipelined_block.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -560,16 +560,6 @@ impl PipelinedBlock {
560560
let _ = fut.commit_ledger_fut.await;
561561
}
562562
}
563-
564-
pub async fn wait_for_commit_vote(&self) -> Option<CommitVote> {
565-
// may be aborted (e.g. by reset)
566-
if let Some(fut) = self.pipeline_futs() {
567-
// this may be cancelled
568-
fut.commit_vote_fut.await.ok()
569-
} else {
570-
None
571-
}
572-
}
573563
}
574564

575565
#[derive(Debug, Clone, Eq, PartialEq)]

consensus/src/pipeline/decoupled_execution_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::sync::{
3030
pub fn prepare_phases_and_buffer_manager(
3131
author: Author,
3232
safety_rules: Arc<dyn CommitSignerProvider>,
33-
commit_msg_tx: NetworkSender,
33+
commit_msg_tx: Arc<NetworkSender>,
3434
commit_msg_rx: Receiver<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
3535
block_rx: UnboundedReceiver<OrderedBlocks>,
3636
sync_rx: UnboundedReceiver<ResetRequest>,
@@ -95,7 +95,7 @@ pub fn prepare_phases_and_buffer_manager(
9595
let (persisting_phase_request_tx, persisting_phase_request_rx) =
9696
create_channel::<CountedRequest<PersistingRequest>>();
9797
let (persisting_phase_response_tx, persisting_phase_response_rx) = create_channel();
98-
let commit_msg_tx = Arc::new(commit_msg_tx);
98+
let commit_msg_tx = commit_msg_tx;
9999

100100
let persisting_phase_processor = PersistingPhase::new(commit_msg_tx.clone());
101101
let persisting_phase = PipelinePhase::new(

consensus/src/pipeline/execution_client.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,8 @@ impl ExecutionProxyClient {
208208
buffer_manager_back_pressure_enabled: bool,
209209
consensus_observer_config: ConsensusObserverConfig,
210210
consensus_publisher: Option<Arc<ConsensusPublisher>>,
211+
network_sender: Arc<NetworkSender>,
211212
) {
212-
let network_sender = NetworkSender::new(
213-
self.author,
214-
self.network_sender.clone(),
215-
self.self_sender.clone(),
216-
epoch_state.verifier.clone(),
217-
);
218-
219213
let (reset_buffer_manager_tx, reset_buffer_manager_rx) = unbounded::<ResetRequest>();
220214

221215
let (commit_msg_tx, commit_msg_rx) =
@@ -240,7 +234,7 @@ impl ExecutionProxyClient {
240234
rand_config,
241235
fast_rand_config,
242236
rand_ready_block_tx,
243-
Arc::new(network_sender.clone()),
237+
network_sender.clone(),
244238
self.rand_storage.clone(),
245239
self.bounded_executor.clone(),
246240
&self.consensus_config.rand_rb_config,
@@ -319,6 +313,12 @@ impl TExecutionClient for ExecutionProxyClient {
319313
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
320314
highest_committed_round: Round,
321315
) {
316+
let network_sender = Arc::new(NetworkSender::new(
317+
self.author,
318+
self.network_sender.clone(),
319+
self.self_sender.clone(),
320+
epoch_state.verifier.clone(),
321+
));
322322
let maybe_rand_msg_tx = self.spawn_decoupled_execution(
323323
maybe_consensus_key,
324324
commit_signer_provider,
@@ -331,6 +331,7 @@ impl TExecutionClient for ExecutionProxyClient {
331331
self.consensus_config.enable_pre_commit,
332332
self.consensus_observer_config,
333333
self.consensus_publisher.clone(),
334+
network_sender.clone(),
334335
);
335336

336337
let transaction_shuffler =
@@ -353,6 +354,7 @@ impl TExecutionClient for ExecutionProxyClient {
353354
randomness_enabled,
354355
onchain_consensus_config.clone(),
355356
aux_version,
357+
network_sender,
356358
);
357359

358360
maybe_rand_msg_tx

consensus/src/pipeline/pipeline_builder.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{
66
block_storage::tracing::{observe_block, BlockStage},
77
counters::{self, update_counters_for_block, update_counters_for_compute_result},
88
monitor,
9+
network::NetworkSender,
910
payload_manager::TPayloadManager,
1011
txn_notifier::TxnNotifier,
1112
IntGaugeGuard,
@@ -135,6 +136,7 @@ pub struct PipelineBuilder {
135136
persisted_auxiliary_info_version: u8,
136137
rand_check_enabled: bool,
137138
module_cache: Arc<Mutex<Option<ValidationState<CachedStateView>>>>,
139+
network_sender: Arc<NetworkSender>,
138140
}
139141

140142
fn spawn_shared_fut<
@@ -257,6 +259,7 @@ impl PipelineBuilder {
257259
enable_pre_commit: bool,
258260
consensus_onchain_config: &OnChainConsensusConfig,
259261
persisted_auxiliary_info_version: u8,
262+
network_sender: Arc<NetworkSender>,
260263
) -> Self {
261264
let module_cache = Arc::new(Mutex::new(None));
262265
Self {
@@ -274,6 +277,7 @@ impl PipelineBuilder {
274277
persisted_auxiliary_info_version,
275278
rand_check_enabled: consensus_onchain_config.rand_check_enabled(),
276279
module_cache,
280+
network_sender,
277281
}
278282
}
279283

@@ -434,14 +438,15 @@ impl PipelineBuilder {
434438
None,
435439
);
436440
let commit_vote_fut = spawn_shared_fut(
437-
Self::sign_commit_vote(
441+
Self::sign_and_broadcast_commit_vote(
438442
ledger_update_fut.clone(),
439443
order_vote_rx,
440444
order_proof_fut.clone(),
441445
commit_proof_fut.clone(),
442446
self.signer.clone(),
443447
block.clone(),
444448
self.order_vote_enabled,
449+
self.network_sender.clone(),
445450
),
446451
Some(&mut abort_handles),
447452
);
@@ -860,15 +865,16 @@ impl PipelineBuilder {
860865
}
861866

862867
/// Precondition: 1. ledger update finishes, 2. order vote or order proof or commit proof is received
863-
/// What it does: Sign the commit vote with execution result, it needs to update the timestamp for reconfig suffix blocks
864-
async fn sign_commit_vote(
868+
/// What it does: Sign the commit vote with execution result and broadcast, it needs to update the timestamp for reconfig suffix blocks
869+
async fn sign_and_broadcast_commit_vote(
865870
ledger_update_fut: TaskFuture<LedgerUpdateResult>,
866871
order_vote_rx: oneshot::Receiver<()>,
867872
order_proof_fut: TaskFuture<WrappedLedgerInfo>,
868873
commit_proof_fut: TaskFuture<LedgerInfoWithSignatures>,
869874
signer: Arc<ValidatorSigner>,
870875
block: Arc<Block>,
871876
order_vote_enabled: bool,
877+
network_sender: Arc<NetworkSender>,
872878
) -> TaskResult<CommitVoteResult> {
873879
let mut tracker = Tracker::start_waiting("sign_commit_vote", &block);
874880
let (compute_result, _, epoch_end_timestamp) = ledger_update_fut.await?;
@@ -907,11 +913,11 @@ impl PipelineBuilder {
907913
let ledger_info = LedgerInfo::new(block_info, consensus_data_hash);
908914
info!("[Pipeline] Signed ledger info {ledger_info}");
909915
let signature = signer.sign(&ledger_info).expect("Signing should succeed");
910-
Ok(CommitVote::new_with_signature(
911-
signer.author(),
912-
ledger_info,
913-
signature,
914-
))
916+
let commit_vote = CommitVote::new_with_signature(signer.author(), ledger_info, signature);
917+
network_sender
918+
.broadcast_commit_vote(commit_vote.clone())
919+
.await;
920+
Ok(commit_vote)
915921
}
916922

917923
/// Precondition: 1. ledger update finishes, 2. parent block's phase finishes 2. order proof is received

consensus/src/pipeline/tests/buffer_manager_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ pub fn prepare_buffer_manager(
114114

115115
let (self_loop_tx, self_loop_rx) = aptos_channels::new_unbounded_test();
116116
let validators = Arc::new(validators);
117-
let network = NetworkSender::new(
117+
let network = Arc::new(NetworkSender::new(
118118
author,
119119
consensus_network_client,
120120
self_loop_tx,
121121
validators.clone(),
122-
);
122+
));
123123

124124
let (msg_tx, msg_rx) = aptos_channel::new::<
125125
AccountAddress,

consensus/src/round_manager.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1644,12 +1644,6 @@ impl RoundManager {
16441644
if let Some(tx) = proposed_block.pipeline_tx().lock().as_mut() {
16451645
let _ = tx.order_vote_tx.take().map(|tx| tx.send(()));
16461646
}
1647-
let network = self.network.clone();
1648-
tokio::spawn(async move {
1649-
if let Some(commit_vote) = proposed_block.wait_for_commit_vote().await {
1650-
network.broadcast_commit_vote(commit_vote).await;
1651-
}
1652-
});
16531647
}
16541648
ORDER_VOTE_BROADCASTED.inc();
16551649
}

consensus/src/state_computer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// SPDX-License-Identifier: Apache-2.0
44

55
use crate::{
6-
block_preparer::BlockPreparer, error::StateSyncError, monitor,
6+
block_preparer::BlockPreparer, error::StateSyncError, monitor, network::NetworkSender,
77
payload_manager::TPayloadManager, pipeline::pipeline_builder::PipelineBuilder,
88
state_replication::StateComputer, transaction_deduper::TransactionDeduper,
99
transaction_shuffler::TransactionShuffler, txn_notifier::TxnNotifier,
@@ -46,6 +46,7 @@ struct MutableState {
4646
is_randomness_enabled: bool,
4747
consensus_onchain_config: OnChainConsensusConfig,
4848
persisted_auxiliary_info_version: u8,
49+
network_sender: Arc<NetworkSender>,
4950
}
5051

5152
/// Basic communication with the Execution module;
@@ -89,6 +90,7 @@ impl ExecutionProxy {
8990
is_randomness_enabled,
9091
consensus_onchain_config,
9192
persisted_auxiliary_info_version,
93+
network_sender,
9294
} = self
9395
.state
9496
.read()
@@ -115,6 +117,7 @@ impl ExecutionProxy {
115117
self.enable_pre_commit,
116118
&consensus_onchain_config,
117119
persisted_auxiliary_info_version,
120+
network_sender,
118121
)
119122
}
120123
}
@@ -235,6 +238,7 @@ impl StateComputer for ExecutionProxy {
235238
randomness_enabled: bool,
236239
consensus_onchain_config: OnChainConsensusConfig,
237240
persisted_auxiliary_info_version: u8,
241+
network_sender: Arc<NetworkSender>,
238242
) {
239243
*self.state.write() = Some(MutableState {
240244
validators: epoch_state
@@ -249,6 +253,7 @@ impl StateComputer for ExecutionProxy {
249253
is_randomness_enabled: randomness_enabled,
250254
consensus_onchain_config,
251255
persisted_auxiliary_info_version,
256+
network_sender,
252257
});
253258
}
254259

consensus/src/state_replication.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// SPDX-License-Identifier: Apache-2.0
44

55
use crate::{
6-
error::StateSyncError, payload_manager::TPayloadManager,
6+
error::StateSyncError, network::NetworkSender, payload_manager::TPayloadManager,
77
transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler,
88
};
99
use anyhow::Result;
@@ -48,6 +48,7 @@ pub trait StateComputer: Send + Sync {
4848
randomness_enabled: bool,
4949
consensus_onchain_config: OnChainConsensusConfig,
5050
persisted_auxiliary_info_version: u8,
51+
network_sender: Arc<NetworkSender>,
5152
);
5253

5354
// Reconfigure to clear epoch state at end of epoch.

consensus/src/test_utils/mock_state_computer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl StateComputer for RandomComputeResultStateComputer {
6161
_: bool,
6262
_: OnChainConsensusConfig,
6363
_: u8,
64+
_: Arc<crate::network::NetworkSender>,
6465
) {
6566
}
6667

0 commit comments

Comments
 (0)