Skip to content

Commit 9eb270f

Browse files
apollo_protobuf: move executed_transaction_count into ProposalFin (#11492)
1 parent 7ed8d4a commit 9eb270f

File tree

12 files changed

+62
-215
lines changed

12 files changed

+62
-215
lines changed

crates/apollo_consensus_orchestrator/src/build_proposal.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -288,18 +288,10 @@ async fn get_proposal_content(
288288
}
289289
}
290290

291-
let final_n_executed_txs_u64 = final_n_executed_txs
291+
let executed_transaction_count: u64 = final_n_executed_txs
292292
.try_into()
293293
.expect("Number of executed transactions should fit in u64");
294-
args.stream_sender
295-
.send(ProposalPart::ExecutedTransactionCount(final_n_executed_txs_u64))
296-
.await
297-
.map_err(|e| {
298-
BuildProposalError::SendError(format!(
299-
"Failed to send executed transaction count: {e:?}"
300-
))
301-
})?;
302-
let fin = ProposalFin { proposal_commitment };
294+
let fin = ProposalFin { proposal_commitment, executed_transaction_count };
303295
info!("Sending fin={fin:?}");
304296
args.stream_sender.send(ProposalPart::Fin(fin)).await.map_err(|e| {
305297
BuildProposalError::SendError(format!("Failed to send proposal fin: {e:?}"))

crates/apollo_consensus_orchestrator/src/sequencer_consensus_context.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -972,12 +972,10 @@ async fn send_reproposal(
972972
stream_sender.send(ProposalPart::Transactions(TransactionBatch { transactions })).await?;
973973
n_executed_txs += batch.len();
974974
}
975-
stream_sender
976-
.send(ProposalPart::ExecutedTransactionCount(
977-
n_executed_txs.try_into().expect("Number of executed transactions should fit in u64"),
978-
))
979-
.await?;
980-
stream_sender.send(ProposalPart::Fin(ProposalFin { proposal_commitment: id })).await?;
975+
let executed_transaction_count: u64 =
976+
n_executed_txs.try_into().expect("Number of executed transactions should fit in u64");
977+
let fin = ProposalFin { proposal_commitment: id, executed_transaction_count };
978+
stream_sender.send(ProposalPart::Fin(fin)).await?;
981979

982980
Ok(())
983981
}

crates/apollo_consensus_orchestrator/src/sequencer_consensus_context_test.rs

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,9 @@ async fn validate_then_repropose(#[case] execute_all_txs: bool) {
139139
let transactions =
140140
ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() });
141141
content_sender.send(transactions.clone()).await.unwrap();
142-
content_sender
143-
.send(ProposalPart::ExecutedTransactionCount(n_executed_txs_count.try_into().unwrap()))
144-
.await
145-
.unwrap();
146142
let fin = ProposalPart::Fin(ProposalFin {
147143
proposal_commitment: ProposalCommitment(STATE_DIFF_COMMITMENT.0.0),
144+
executed_transaction_count: n_executed_txs_count.try_into().unwrap(),
148145
});
149146
content_sender.send(fin.clone()).await.unwrap();
150147
let fin_receiver =
@@ -161,10 +158,6 @@ async fn validate_then_repropose(#[case] execute_all_txs: bool) {
161158
receiver.next().await.unwrap(),
162159
ProposalPart::Transactions(TransactionBatch { transactions: executed_transactions })
163160
);
164-
assert_eq!(
165-
receiver.next().await.unwrap(),
166-
ProposalPart::ExecutedTransactionCount(n_executed_txs_count.try_into().unwrap())
167-
);
168161
assert_eq!(receiver.next().await.unwrap(), fin);
169162
assert!(receiver.next().await.is_none());
170163
}
@@ -181,18 +174,16 @@ async fn proposals_from_different_rounds() {
181174
// Proposal parts sent in the proposals.
182175
let prop_part_txs =
183176
ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() });
184-
let prop_part_executed_count =
185-
ProposalPart::ExecutedTransactionCount(INTERNAL_TX_BATCH.len().try_into().unwrap());
186177
let prop_part_fin = ProposalPart::Fin(ProposalFin {
187178
proposal_commitment: ProposalCommitment(STATE_DIFF_COMMITMENT.0.0),
179+
executed_transaction_count: INTERNAL_TX_BATCH.len().try_into().unwrap(),
188180
});
189181

190182
// The proposal from the past round is ignored.
191183
let (mut content_sender, content_receiver) =
192184
mpsc::channel(context.config.static_config.proposal_buffer_size);
193185
content_sender.send(ProposalPart::BlockInfo(block_info(BlockNumber(0)))).await.unwrap();
194186
content_sender.send(prop_part_txs.clone()).await.unwrap();
195-
content_sender.send(prop_part_executed_count.clone()).await.unwrap();
196187

197188
let mut init = ProposalInit { round: 0, ..Default::default() };
198189
let fin_receiver_past_round = context.validate_proposal(init, TIMEOUT, content_receiver).await;
@@ -204,7 +195,6 @@ async fn proposals_from_different_rounds() {
204195
mpsc::channel(context.config.static_config.proposal_buffer_size);
205196
content_sender.send(ProposalPart::BlockInfo(block_info(BlockNumber(0)))).await.unwrap();
206197
content_sender.send(prop_part_txs.clone()).await.unwrap();
207-
content_sender.send(prop_part_executed_count.clone()).await.unwrap();
208198
content_sender.send(prop_part_fin.clone()).await.unwrap();
209199
init.round = 1;
210200
let fin_receiver_curr_round = context.validate_proposal(init, TIMEOUT, content_receiver).await;
@@ -215,7 +205,6 @@ async fn proposals_from_different_rounds() {
215205
mpsc::channel(context.config.static_config.proposal_buffer_size);
216206
content_sender.send(ProposalPart::BlockInfo(block_info(BlockNumber(0)))).await.unwrap();
217207
content_sender.send(prop_part_txs.clone()).await.unwrap();
218-
content_sender.send(prop_part_executed_count.clone()).await.unwrap();
219208
content_sender.send(prop_part_fin.clone()).await.unwrap();
220209
let fin_receiver_future_round = context
221210
.validate_proposal(
@@ -284,14 +273,11 @@ async fn build_proposal() {
284273
receiver.next().await.unwrap(),
285274
ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() })
286275
);
287-
assert_eq!(
288-
receiver.next().await.unwrap(),
289-
ProposalPart::ExecutedTransactionCount(INTERNAL_TX_BATCH.len().try_into().unwrap())
290-
);
291276
assert_eq!(
292277
receiver.next().await.unwrap(),
293278
ProposalPart::Fin(ProposalFin {
294279
proposal_commitment: ProposalCommitment(STATE_DIFF_COMMITMENT.0.0),
280+
executed_transaction_count: INTERNAL_TX_BATCH.len().try_into().unwrap(),
295281
})
296282
);
297283
assert!(receiver.next().await.is_none());
@@ -494,8 +480,6 @@ async fn propose_then_repropose(#[case] execute_all_txs: bool) {
494480
let _init = receiver.next().await.unwrap();
495481
let block_info = receiver.next().await.unwrap();
496482
let _txs = receiver.next().await.unwrap();
497-
let final_n_executed_txs = receiver.next().await.unwrap();
498-
assert!(matches!(final_n_executed_txs, ProposalPart::ExecutedTransactionCount(_)));
499483
let fin = receiver.next().await.unwrap();
500484
assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0);
501485

@@ -514,7 +498,6 @@ async fn propose_then_repropose(#[case] execute_all_txs: bool) {
514498
let reproposed_txs = ProposalPart::Transactions(TransactionBatch { transactions });
515499
assert_eq!(receiver.next().await.unwrap(), reproposed_txs);
516500

517-
assert_eq!(receiver.next().await.unwrap(), final_n_executed_txs);
518501
assert_eq!(receiver.next().await.unwrap(), fin);
519502
assert!(receiver.next().await.is_none());
520503
}
@@ -711,14 +694,11 @@ async fn oracle_fails_on_startup(#[case] l1_oracle_failure: bool) {
711694
receiver.next().await.unwrap(),
712695
ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() })
713696
);
714-
assert_eq!(
715-
receiver.next().await.unwrap(),
716-
ProposalPart::ExecutedTransactionCount(INTERNAL_TX_BATCH.len().try_into().unwrap())
717-
);
718697
assert_eq!(
719698
receiver.next().await.unwrap(),
720699
ProposalPart::Fin(ProposalFin {
721700
proposal_commitment: ProposalCommitment(STATE_DIFF_COMMITMENT.0.0),
701+
executed_transaction_count: INTERNAL_TX_BATCH.len().try_into().unwrap(),
722702
})
723703
);
724704
assert!(receiver.next().await.is_none());
@@ -837,14 +817,11 @@ async fn oracle_fails_on_second_block(#[case] l1_oracle_failure: bool) {
837817
receiver.next().await.unwrap(),
838818
ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() })
839819
);
840-
assert_eq!(
841-
receiver.next().await.unwrap(),
842-
ProposalPart::ExecutedTransactionCount(INTERNAL_TX_BATCH.len().try_into().unwrap())
843-
);
844820
assert_eq!(
845821
receiver.next().await.unwrap(),
846822
ProposalPart::Fin(ProposalFin {
847823
proposal_commitment: ProposalCommitment(STATE_DIFF_COMMITMENT.0.0),
824+
executed_transaction_count: INTERNAL_TX_BATCH.len().try_into().unwrap(),
848825
})
849826
);
850827
assert!(receiver.next().await.is_none());

crates/apollo_consensus_orchestrator/src/test_utils.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -382,13 +382,10 @@ pub(crate) async fn send_proposal_to_validator_context(
382382
.send(ProposalPart::Transactions(TransactionBatch { transactions: TX_BATCH.to_vec() }))
383383
.await
384384
.unwrap();
385-
content_sender
386-
.send(ProposalPart::ExecutedTransactionCount(INTERNAL_TX_BATCH.len().try_into().unwrap()))
387-
.await
388-
.unwrap();
389385
content_sender
390386
.send(ProposalPart::Fin(ProposalFin {
391387
proposal_commitment: ProtoProposalCommitment(STATE_DIFF_COMMITMENT.0.0),
388+
executed_transaction_count: INTERNAL_TX_BATCH.len().try_into().unwrap(),
392389
}))
393390
.await
394391
.unwrap();

crates/apollo_consensus_orchestrator/src/validate_proposal.rs

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ pub(crate) async fn validate_proposal(
133133
mut args: ProposalValidateArguments,
134134
) -> ValidateProposalResult<ProposalCommitment> {
135135
let mut content = Vec::new();
136-
let mut final_n_executed_txs: Option<usize> = None;
137136
let now = args.deps.clock.now();
138137

139138
let Some(deadline) = now.checked_add_signed(chrono::TimeDelta::from_std(args.timeout).unwrap())
@@ -150,7 +149,10 @@ pub(crate) async fn validate_proposal(
150149
.await?
151150
{
152151
SecondProposalPart::BlockInfo(block_info) => block_info,
153-
SecondProposalPart::Fin(ProposalFin { proposal_commitment }) => {
152+
SecondProposalPart::Fin(ProposalFin {
153+
proposal_commitment,
154+
executed_transaction_count: _,
155+
}) => {
154156
return Ok(proposal_commitment);
155157
}
156158
};
@@ -196,7 +198,6 @@ pub(crate) async fn validate_proposal(
196198
args.deps.batcher.as_ref(),
197199
proposal_part.clone(),
198200
&mut content,
199-
&mut final_n_executed_txs,
200201
args.deps.transaction_converter.clone(),
201202
).await {
202203
HandledProposalPart::Finished(built_block, received_fin) => {
@@ -373,9 +374,9 @@ async fn await_second_proposal_part(
373374
Some(ProposalPart::BlockInfo(block_info)) => {
374375
Ok(SecondProposalPart::BlockInfo(block_info))
375376
}
376-
Some(ProposalPart::Fin(ProposalFin { proposal_commitment })) => {
377+
Some(ProposalPart::Fin(ProposalFin { proposal_commitment, executed_transaction_count })) => {
377378
warn!("Received an empty proposal.");
378-
Ok(SecondProposalPart::Fin(ProposalFin { proposal_commitment }))
379+
Ok(SecondProposalPart::Fin(ProposalFin { proposal_commitment, executed_transaction_count }))
379380
}
380381
x => {
381382
Err(ValidateProposalError::InvalidSecondProposalPart(x
@@ -428,7 +429,6 @@ async fn handle_proposal_part(
428429
batcher: &dyn BatcherClient,
429430
proposal_part: Option<ProposalPart>,
430431
content: &mut Vec<Vec<InternalConsensusTransaction>>,
431-
final_n_executed_txs: &mut Option<usize>,
432432
transaction_converter: Arc<dyn TransactionConverterTrait>,
433433
) -> HandledProposalPart {
434434
match proposal_part {
@@ -444,15 +444,18 @@ async fn handle_proposal_part(
444444
}
445445
Some(ProposalPart::Fin(fin)) => {
446446
info!("Received fin={fin:?}");
447-
let Some(final_n_executed_txs_nonopt) = *final_n_executed_txs else {
447+
let Ok(executed_txs_count) = fin.executed_transaction_count.try_into() else {
448448
return HandledProposalPart::Failed(
449-
"Received Fin without executed transaction count".to_string(),
449+
"Number of executed transactions should fit in usize".to_string(),
450450
);
451451
};
452+
453+
*content = truncate_to_executed_txs(content, executed_txs_count);
454+
452455
// Output this along with the ID from batcher, to compare them.
453456
let input = SendProposalContentInput {
454457
proposal_id,
455-
content: SendProposalContent::Finish(final_n_executed_txs_nonopt),
458+
content: SendProposalContent::Finish(executed_txs_count),
456459
};
457460
let response = match batcher.send_proposal_content(input).await {
458461
Ok(response) => response,
@@ -474,21 +477,16 @@ async fn handle_proposal_part(
474477
info!(
475478
network_block_id = ?fin.proposal_commitment,
476479
?batcher_block_id,
477-
final_n_executed_txs_nonopt,
480+
executed_txs_count,
478481
"Finished validating proposal."
479482
);
480-
if final_n_executed_txs_nonopt == 0 {
483+
if executed_txs_count == 0 {
481484
warn!("Validated an empty proposal.");
482485
}
483486
HandledProposalPart::Finished(batcher_block_id, fin)
484487
}
485488
Some(ProposalPart::Transactions(TransactionBatch { transactions: txs })) => {
486489
debug!("Received transaction batch with {} txs", txs.len());
487-
if final_n_executed_txs.is_some() {
488-
return HandledProposalPart::Failed(
489-
"Received transactions after executed transaction count".to_string(),
490-
);
491-
}
492490
let txs =
493491
futures::future::join_all(txs.into_iter().map(|tx| {
494492
transaction_converter.convert_consensus_tx_to_internal_consensus_tx(tx)
@@ -529,24 +527,6 @@ async fn handle_proposal_part(
529527
}
530528
}
531529
}
532-
Some(ProposalPart::ExecutedTransactionCount(executed_txs_count)) => {
533-
debug!("Received executed transaction count: {executed_txs_count}");
534-
if final_n_executed_txs.is_some() {
535-
return HandledProposalPart::Failed(
536-
"Received executed transaction count more than once".to_string(),
537-
);
538-
}
539-
let executed_txs_count_usize_res: Result<usize, _> = executed_txs_count.try_into();
540-
let Ok(executed_txs_count_usize) = executed_txs_count_usize_res else {
541-
return HandledProposalPart::Failed(
542-
"Number of executed transactions should fit in usize".to_string(),
543-
);
544-
};
545-
*final_n_executed_txs = Some(executed_txs_count_usize);
546-
*content = truncate_to_executed_txs(content, executed_txs_count_usize);
547-
548-
HandledProposalPart::Continue
549-
}
550530
_ => HandledProposalPart::Failed("Invalid proposal part".to_string()),
551531
}
552532
}

0 commit comments

Comments
 (0)