Skip to content

Commit 9ca3e8b

Browse files
authored
Backport: process epoch changes and events every time a block is executed (#4799) (#4810)
## Motivation We would only process epoch changes and event streams when explicitly processing the inbox, which means it was possible eg. for users to unintentionally skip epoch updates while keeping proposing blocks on their chains. ## Proposal Process epoch changes (and any other relevant event streams) every time a block is proposed. (Backport of #4799) ## Test Plan CI ## Release Plan - Nothing to do ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 3109595 commit 9ca3e8b

File tree

2 files changed

+84
-107
lines changed
  • linera-chain/src/data_types
  • linera-core/src/client

2 files changed

+84
-107
lines changed

linera-chain/src/data_types/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,15 @@ pub enum Transaction {
159159

160160
impl BcsHashable<'_> for Transaction {}
161161

162+
impl Transaction {
163+
pub fn incoming_bundle(&self) -> Option<&IncomingBundle> {
164+
match self {
165+
Transaction::ReceiveMessages(bundle) => Some(bundle),
166+
_ => None,
167+
}
168+
}
169+
}
170+
162171
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, SimpleObject)]
163172
#[graphql(name = "Operation")]
164173
pub struct OperationMetadata {

linera-core/src/client/mod.rs

Lines changed: 75 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,75 +1998,6 @@ impl<Env: Environment> ChainClient<Env> {
19981998
Ok(info)
19991999
}
20002000

2001-
/// Submits a fast block proposal to the validators.
2002-
///
2003-
/// This must only be used with valid epoch and super owner.
2004-
#[instrument(level = "trace", skip(committee, operations))]
2005-
pub async fn submit_fast_block_proposal(
2006-
&self,
2007-
committee: &Committee,
2008-
operations: &[Operation],
2009-
incoming_bundles: &[IncomingBundle],
2010-
super_owner: AccountOwner,
2011-
) -> Result<(u64, u64, u64, u64), ChainClientError> {
2012-
let creating_proposal_start = Instant::now();
2013-
let info = self.chain_info().await?;
2014-
let timestamp = self.next_timestamp(incoming_bundles, info.timestamp);
2015-
let transactions = incoming_bundles
2016-
.iter()
2017-
.map(|bundle| Transaction::ReceiveMessages(bundle.clone()))
2018-
.chain(
2019-
operations
2020-
.iter()
2021-
.map(|operation| Transaction::ExecuteOperation(operation.clone())),
2022-
)
2023-
.collect::<Vec<_>>();
2024-
let proposed_block = ProposedBlock {
2025-
epoch: info.epoch,
2026-
chain_id: self.chain_id,
2027-
transactions,
2028-
previous_block_hash: info.block_hash,
2029-
height: info.next_block_height,
2030-
authenticated_signer: Some(super_owner),
2031-
timestamp,
2032-
};
2033-
let proposal = Box::new(
2034-
BlockProposal::new_initial(
2035-
super_owner,
2036-
Round::Fast,
2037-
proposed_block.clone(),
2038-
self.signer(),
2039-
)
2040-
.await
2041-
.map_err(ChainClientError::signer_failure)?,
2042-
);
2043-
let creating_proposal_ms = creating_proposal_start.elapsed().as_millis() as u64;
2044-
let stage_block_execution_start = Instant::now();
2045-
let block = self
2046-
.client
2047-
.local_node
2048-
.stage_block_execution(proposed_block, None, Vec::new())
2049-
.await?
2050-
.0;
2051-
let stage_block_execution_ms = stage_block_execution_start.elapsed().as_millis() as u64;
2052-
let creating_confirmed_block_start = Instant::now();
2053-
let value = ConfirmedBlock::new(block);
2054-
let creating_confirmed_block_ms =
2055-
creating_confirmed_block_start.elapsed().as_millis() as u64;
2056-
let submitting_block_proposal_start = Instant::now();
2057-
self.client
2058-
.submit_block_proposal(committee, proposal, value)
2059-
.await?;
2060-
let submitting_block_proposal_ms =
2061-
submitting_block_proposal_start.elapsed().as_millis() as u64;
2062-
Ok((
2063-
creating_proposal_ms,
2064-
stage_block_execution_ms,
2065-
creating_confirmed_block_ms,
2066-
submitting_block_proposal_ms,
2067-
))
2068-
}
2069-
20702001
/// Attempts to update all validators about the local chain.
20712002
#[instrument(level = "trace", skip(old_committee))]
20722003
pub async fn update_validators(
@@ -2582,6 +2513,26 @@ impl<Env: Environment> ChainClient<Env> {
25822513
&self,
25832514
operations: Vec<Operation>,
25842515
blobs: Vec<Blob>,
2516+
) -> Result<ExecuteBlockOutcome, ChainClientError> {
2517+
let transactions = self.prepend_epochs_messages_and_events(operations).await?;
2518+
2519+
if transactions.is_empty() {
2520+
return Err(ChainClientError::LocalNodeError(
2521+
LocalNodeError::WorkerError(WorkerError::ChainError(Box::new(
2522+
ChainError::EmptyBlock,
2523+
))),
2524+
));
2525+
}
2526+
2527+
self.execute_prepared_transactions(transactions, blobs)
2528+
.await
2529+
}
2530+
2531+
#[instrument(level = "trace", skip(transactions, blobs))]
2532+
async fn execute_prepared_transactions(
2533+
&self,
2534+
transactions: Vec<Transaction>,
2535+
blobs: Vec<Blob>,
25852536
) -> Result<ExecuteBlockOutcome, ChainClientError> {
25862537
#[cfg(with_metrics)]
25872538
let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
@@ -2599,16 +2550,10 @@ impl<Env: Environment> ChainClient<Env> {
25992550
ClientOutcome::Committed(None) => {}
26002551
}
26012552

2602-
let incoming_bundles = self.pending_message_bundles().await?;
2603-
let identity = self.identity().await?;
2604-
let confirmed_value = self
2605-
.new_pending_block(incoming_bundles, operations, blobs, identity)
2606-
.await?;
2553+
let block = self.new_pending_block(transactions, blobs).await?;
26072554

26082555
match self.process_pending_block_without_prepare().await? {
2609-
ClientOutcome::Committed(Some(certificate))
2610-
if certificate.block() == confirmed_value.block() =>
2611-
{
2556+
ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
26122557
Ok(ExecuteBlockOutcome::Executed(certificate))
26132558
}
26142559
ClientOutcome::Committed(Some(certificate)) => {
@@ -2624,17 +2569,48 @@ impl<Env: Environment> ChainClient<Env> {
26242569
}
26252570
}
26262571

2572+
/// Creates a vector of transactions which, in addition to the provided operations,
2573+
/// also contains epoch changes, receiving message bundles and event stream updates
2574+
/// (if there are any to be processed).
2575+
/// This should be called when executing a block, in order to make sure that any pending
2576+
/// messages or events are included in it.
2577+
#[instrument(level = "trace", skip(operations))]
2578+
async fn prepend_epochs_messages_and_events(
2579+
&self,
2580+
operations: Vec<Operation>,
2581+
) -> Result<Vec<Transaction>, ChainClientError> {
2582+
let incoming_bundles = self.pending_message_bundles().await?;
2583+
let stream_updates = self.collect_stream_updates().await?;
2584+
Ok(self
2585+
.collect_epoch_changes()
2586+
.await?
2587+
.into_iter()
2588+
.map(Transaction::ExecuteOperation)
2589+
.chain(
2590+
incoming_bundles
2591+
.into_iter()
2592+
.map(Transaction::ReceiveMessages),
2593+
)
2594+
.chain(
2595+
stream_updates
2596+
.into_iter()
2597+
.map(Transaction::ExecuteOperation),
2598+
)
2599+
.chain(operations.into_iter().map(Transaction::ExecuteOperation))
2600+
.collect::<Vec<_>>())
2601+
}
2602+
26272603
/// Creates a new pending block and handles the proposal in the local node.
26282604
/// Next time `process_pending_block_without_prepare` is called, this block will be proposed
26292605
/// to the validators.
2630-
#[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
2606+
#[instrument(level = "trace", skip(transactions, blobs))]
26312607
async fn new_pending_block(
26322608
&self,
2633-
incoming_bundles: Vec<IncomingBundle>,
2634-
operations: Vec<Operation>,
2609+
transactions: Vec<Transaction>,
26352610
blobs: Vec<Blob>,
2636-
identity: AccountOwner,
2637-
) -> Result<ConfirmedBlock, ChainClientError> {
2611+
) -> Result<Block, ChainClientError> {
2612+
let identity = self.identity().await?;
2613+
26382614
ensure!(
26392615
self.pending_proposal().is_none(),
26402616
ChainClientError::BlockProposalError(
@@ -2643,12 +2619,7 @@ impl<Env: Environment> ChainClient<Env> {
26432619
)
26442620
);
26452621
let info = self.chain_info_with_committees().await?;
2646-
let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2647-
let transactions = incoming_bundles
2648-
.into_iter()
2649-
.map(Transaction::ReceiveMessages)
2650-
.chain(operations.into_iter().map(Transaction::ExecuteOperation))
2651-
.collect::<Vec<_>>();
2622+
let timestamp = self.next_timestamp(&transactions, info.timestamp);
26522623
let proposed_block = ProposedBlock {
26532624
epoch: info.epoch,
26542625
chain_id: self.chain_id,
@@ -2681,22 +2652,19 @@ impl<Env: Environment> ChainClient<Env> {
26812652
self.update_state(|state| {
26822653
state.set_pending_proposal(proposed_block.clone(), blobs.clone())
26832654
});
2684-
Ok(ConfirmedBlock::new(block))
2655+
Ok(block)
26852656
}
26862657

26872658
/// Returns a suitable timestamp for the next block.
26882659
///
26892660
/// This will usually be the current time according to the local clock, but may be slightly
26902661
/// ahead to make sure it's not earlier than the incoming messages or the previous block.
2691-
#[instrument(level = "trace", skip(incoming_bundles))]
2692-
fn next_timestamp(
2693-
&self,
2694-
incoming_bundles: &[IncomingBundle],
2695-
block_time: Timestamp,
2696-
) -> Timestamp {
2662+
#[instrument(level = "trace", skip(transactions))]
2663+
fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
26972664
let local_time = self.storage_client().clock().current_time();
2698-
incoming_bundles
2665+
transactions
26992666
.iter()
2667+
.filter_map(Transaction::incoming_bundle)
27002668
.map(|msg| msg.bundle.timestamp)
27012669
.max()
27022670
.map_or(local_time, |timestamp| timestamp.max(local_time))
@@ -2824,11 +2792,11 @@ impl<Env: Environment> ChainClient<Env> {
28242792
return Ok((chain_balance, Some(owner_balance)));
28252793
}
28262794
let info = self.chain_info().await?;
2827-
let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
28282795
let transactions = incoming_bundles
28292796
.into_iter()
28302797
.map(Transaction::ReceiveMessages)
28312798
.collect::<Vec<_>>();
2799+
let timestamp = self.next_timestamp(&transactions, info.timestamp);
28322800
let block = ProposedBlock {
28332801
epoch: info.epoch,
28342802
chain_id: self.chain_id,
@@ -3589,20 +3557,20 @@ impl<Env: Environment> ChainClient<Env> {
35893557
#[cfg(with_metrics)]
35903558
let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
35913559

3592-
let mut epoch_change_ops = self.collect_epoch_changes().await?.into_iter();
3593-
35943560
let mut certificates = Vec::new();
35953561
loop {
3596-
let incoming_bundles = self.pending_message_bundles().await?;
3597-
let stream_updates = self.collect_stream_updates().await?;
3598-
let block_operations = stream_updates
3599-
.into_iter()
3600-
.chain(epoch_change_ops.next())
3601-
.collect::<Vec<_>>();
3602-
if incoming_bundles.is_empty() && block_operations.is_empty() {
3562+
// We provide no operations - this means that the only operations executed
3563+
// will be epoch changes, receiving messages and processing event stream
3564+
// updates, if any are pending.
3565+
let transactions = self.prepend_epochs_messages_and_events(vec![]).await?;
3566+
// Nothing in the inbox and no stream updates to be processed.
3567+
if transactions.is_empty() {
36033568
return Ok((certificates, None));
36043569
}
3605-
match self.execute_block(block_operations, vec![]).await {
3570+
match self
3571+
.execute_prepared_transactions(transactions, vec![])
3572+
.await
3573+
{
36063574
Ok(ExecuteBlockOutcome::Executed(certificate))
36073575
| Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
36083576
Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {

0 commit comments

Comments
 (0)