Skip to content

Commit ee94a3a

Browse files
authored
Process epoch changes and events every time a block is executed (#4799)
## Motivation We would only process epoch changes and event streams when explicitly processing the inbox, which means it was possible eg. for users to unintentionally skip epoch updates while keeping proposing blocks on their chains. ## Proposal Process epoch changes (and any other relevant event streams) every time a block is proposed. ## Test Plan CI ## Release Plan - These changes should be backported to the latest `testnet` branch, then - be released in a new SDK, ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 8bd839a commit ee94a3a

File tree

2 files changed

+84
-107
lines changed
  • linera-chain/src/data_types
  • linera-core/src/client

2 files changed

+84
-107
lines changed

linera-chain/src/data_types/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,15 @@ pub enum Transaction {
159159

160160
impl BcsHashable<'_> for Transaction {}
161161

162+
impl Transaction {
163+
pub fn incoming_bundle(&self) -> Option<&IncomingBundle> {
164+
match self {
165+
Transaction::ReceiveMessages(bundle) => Some(bundle),
166+
_ => None,
167+
}
168+
}
169+
}
170+
162171
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, SimpleObject)]
163172
#[graphql(name = "Operation")]
164173
pub struct OperationMetadata {

linera-core/src/client/mod.rs

Lines changed: 75 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -2122,75 +2122,6 @@ impl<Env: Environment> ChainClient<Env> {
21222122
Ok(info)
21232123
}
21242124

2125-
/// Submits a fast block proposal to the validators.
2126-
///
2127-
/// This must only be used with valid epoch and super owner.
2128-
#[instrument(level = "trace", skip(committee, operations))]
2129-
pub async fn submit_fast_block_proposal(
2130-
&self,
2131-
committee: &Committee,
2132-
operations: &[Operation],
2133-
incoming_bundles: &[IncomingBundle],
2134-
super_owner: AccountOwner,
2135-
) -> Result<(u64, u64, u64, u64), ChainClientError> {
2136-
let creating_proposal_start = Instant::now();
2137-
let info = self.chain_info().await?;
2138-
let timestamp = self.next_timestamp(incoming_bundles, info.timestamp);
2139-
let transactions = incoming_bundles
2140-
.iter()
2141-
.map(|bundle| Transaction::ReceiveMessages(bundle.clone()))
2142-
.chain(
2143-
operations
2144-
.iter()
2145-
.map(|operation| Transaction::ExecuteOperation(operation.clone())),
2146-
)
2147-
.collect::<Vec<_>>();
2148-
let proposed_block = ProposedBlock {
2149-
epoch: info.epoch,
2150-
chain_id: self.chain_id,
2151-
transactions,
2152-
previous_block_hash: info.block_hash,
2153-
height: info.next_block_height,
2154-
authenticated_owner: Some(super_owner),
2155-
timestamp,
2156-
};
2157-
let proposal = Box::new(
2158-
BlockProposal::new_initial(
2159-
super_owner,
2160-
Round::Fast,
2161-
proposed_block.clone(),
2162-
self.signer(),
2163-
)
2164-
.await
2165-
.map_err(ChainClientError::signer_failure)?,
2166-
);
2167-
let creating_proposal_ms = creating_proposal_start.elapsed().as_millis() as u64;
2168-
let stage_block_execution_start = Instant::now();
2169-
let block = self
2170-
.client
2171-
.local_node
2172-
.stage_block_execution(proposed_block, None, Vec::new())
2173-
.await?
2174-
.0;
2175-
let stage_block_execution_ms = stage_block_execution_start.elapsed().as_millis() as u64;
2176-
let creating_confirmed_block_start = Instant::now();
2177-
let value = ConfirmedBlock::new(block);
2178-
let creating_confirmed_block_ms =
2179-
creating_confirmed_block_start.elapsed().as_millis() as u64;
2180-
let submitting_block_proposal_start = Instant::now();
2181-
self.client
2182-
.submit_block_proposal(committee, proposal, value)
2183-
.await?;
2184-
let submitting_block_proposal_ms =
2185-
submitting_block_proposal_start.elapsed().as_millis() as u64;
2186-
Ok((
2187-
creating_proposal_ms,
2188-
stage_block_execution_ms,
2189-
creating_confirmed_block_ms,
2190-
submitting_block_proposal_ms,
2191-
))
2192-
}
2193-
21942125
/// Attempts to update all validators about the local chain.
21952126
#[instrument(level = "trace", skip(old_committee))]
21962127
pub async fn update_validators(
@@ -2706,6 +2637,26 @@ impl<Env: Environment> ChainClient<Env> {
27062637
&self,
27072638
operations: Vec<Operation>,
27082639
blobs: Vec<Blob>,
2640+
) -> Result<ExecuteBlockOutcome, ChainClientError> {
2641+
let transactions = self.prepend_epochs_messages_and_events(operations).await?;
2642+
2643+
if transactions.is_empty() {
2644+
return Err(ChainClientError::LocalNodeError(
2645+
LocalNodeError::WorkerError(WorkerError::ChainError(Box::new(
2646+
ChainError::EmptyBlock,
2647+
))),
2648+
));
2649+
}
2650+
2651+
self.execute_prepared_transactions(transactions, blobs)
2652+
.await
2653+
}
2654+
2655+
#[instrument(level = "trace", skip(transactions, blobs))]
2656+
async fn execute_prepared_transactions(
2657+
&self,
2658+
transactions: Vec<Transaction>,
2659+
blobs: Vec<Blob>,
27092660
) -> Result<ExecuteBlockOutcome, ChainClientError> {
27102661
#[cfg(with_metrics)]
27112662
let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
@@ -2723,16 +2674,10 @@ impl<Env: Environment> ChainClient<Env> {
27232674
ClientOutcome::Committed(None) => {}
27242675
}
27252676

2726-
let incoming_bundles = self.pending_message_bundles().await?;
2727-
let identity = self.identity().await?;
2728-
let confirmed_value = self
2729-
.new_pending_block(incoming_bundles, operations, blobs, identity)
2730-
.await?;
2677+
let block = self.new_pending_block(transactions, blobs).await?;
27312678

27322679
match self.process_pending_block_without_prepare().await? {
2733-
ClientOutcome::Committed(Some(certificate))
2734-
if certificate.block() == confirmed_value.block() =>
2735-
{
2680+
ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
27362681
Ok(ExecuteBlockOutcome::Executed(certificate))
27372682
}
27382683
ClientOutcome::Committed(Some(certificate)) => {
@@ -2748,17 +2693,48 @@ impl<Env: Environment> ChainClient<Env> {
27482693
}
27492694
}
27502695

2696+
/// Creates a vector of transactions which, in addition to the provided operations,
2697+
/// also contains epoch changes, receiving message bundles and event stream updates
2698+
/// (if there are any to be processed).
2699+
/// This should be called when executing a block, in order to make sure that any pending
2700+
/// messages or events are included in it.
2701+
#[instrument(level = "trace", skip(operations))]
2702+
async fn prepend_epochs_messages_and_events(
2703+
&self,
2704+
operations: Vec<Operation>,
2705+
) -> Result<Vec<Transaction>, ChainClientError> {
2706+
let incoming_bundles = self.pending_message_bundles().await?;
2707+
let stream_updates = self.collect_stream_updates().await?;
2708+
Ok(self
2709+
.collect_epoch_changes()
2710+
.await?
2711+
.into_iter()
2712+
.map(Transaction::ExecuteOperation)
2713+
.chain(
2714+
incoming_bundles
2715+
.into_iter()
2716+
.map(Transaction::ReceiveMessages),
2717+
)
2718+
.chain(
2719+
stream_updates
2720+
.into_iter()
2721+
.map(Transaction::ExecuteOperation),
2722+
)
2723+
.chain(operations.into_iter().map(Transaction::ExecuteOperation))
2724+
.collect::<Vec<_>>())
2725+
}
2726+
27512727
/// Creates a new pending block and handles the proposal in the local node.
27522728
/// Next time `process_pending_block_without_prepare` is called, this block will be proposed
27532729
/// to the validators.
2754-
#[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
2730+
#[instrument(level = "trace", skip(transactions, blobs))]
27552731
async fn new_pending_block(
27562732
&self,
2757-
incoming_bundles: Vec<IncomingBundle>,
2758-
operations: Vec<Operation>,
2733+
transactions: Vec<Transaction>,
27592734
blobs: Vec<Blob>,
2760-
identity: AccountOwner,
2761-
) -> Result<ConfirmedBlock, ChainClientError> {
2735+
) -> Result<Block, ChainClientError> {
2736+
let identity = self.identity().await?;
2737+
27622738
ensure!(
27632739
self.pending_proposal().is_none(),
27642740
ChainClientError::BlockProposalError(
@@ -2767,12 +2743,7 @@ impl<Env: Environment> ChainClient<Env> {
27672743
)
27682744
);
27692745
let info = self.chain_info_with_committees().await?;
2770-
let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2771-
let transactions = incoming_bundles
2772-
.into_iter()
2773-
.map(Transaction::ReceiveMessages)
2774-
.chain(operations.into_iter().map(Transaction::ExecuteOperation))
2775-
.collect::<Vec<_>>();
2746+
let timestamp = self.next_timestamp(&transactions, info.timestamp);
27762747
let proposed_block = ProposedBlock {
27772748
epoch: info.epoch,
27782749
chain_id: self.chain_id,
@@ -2805,22 +2776,19 @@ impl<Env: Environment> ChainClient<Env> {
28052776
self.update_state(|state| {
28062777
state.set_pending_proposal(proposed_block.clone(), blobs.clone())
28072778
});
2808-
Ok(ConfirmedBlock::new(block))
2779+
Ok(block)
28092780
}
28102781

28112782
/// Returns a suitable timestamp for the next block.
28122783
///
28132784
/// This will usually be the current time according to the local clock, but may be slightly
28142785
/// ahead to make sure it's not earlier than the incoming messages or the previous block.
2815-
#[instrument(level = "trace", skip(incoming_bundles))]
2816-
fn next_timestamp(
2817-
&self,
2818-
incoming_bundles: &[IncomingBundle],
2819-
block_time: Timestamp,
2820-
) -> Timestamp {
2786+
#[instrument(level = "trace", skip(transactions))]
2787+
fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
28212788
let local_time = self.storage_client().clock().current_time();
2822-
incoming_bundles
2789+
transactions
28232790
.iter()
2791+
.filter_map(Transaction::incoming_bundle)
28242792
.map(|msg| msg.bundle.timestamp)
28252793
.max()
28262794
.map_or(local_time, |timestamp| timestamp.max(local_time))
@@ -2948,11 +2916,11 @@ impl<Env: Environment> ChainClient<Env> {
29482916
return Ok((chain_balance, Some(owner_balance)));
29492917
}
29502918
let info = self.chain_info().await?;
2951-
let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
29522919
let transactions = incoming_bundles
29532920
.into_iter()
29542921
.map(Transaction::ReceiveMessages)
29552922
.collect::<Vec<_>>();
2923+
let timestamp = self.next_timestamp(&transactions, info.timestamp);
29562924
let block = ProposedBlock {
29572925
epoch: info.epoch,
29582926
chain_id: self.chain_id,
@@ -3713,20 +3681,20 @@ impl<Env: Environment> ChainClient<Env> {
37133681
#[cfg(with_metrics)]
37143682
let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
37153683

3716-
let mut epoch_change_ops = self.collect_epoch_changes().await?.into_iter();
3717-
37183684
let mut certificates = Vec::new();
37193685
loop {
3720-
let incoming_bundles = self.pending_message_bundles().await?;
3721-
let stream_updates = self.collect_stream_updates().await?;
3722-
let block_operations = stream_updates
3723-
.into_iter()
3724-
.chain(epoch_change_ops.next())
3725-
.collect::<Vec<_>>();
3726-
if incoming_bundles.is_empty() && block_operations.is_empty() {
3686+
// We provide no operations - this means that the only operations executed
3687+
// will be epoch changes, receiving messages and processing event stream
3688+
// updates, if any are pending.
3689+
let transactions = self.prepend_epochs_messages_and_events(vec![]).await?;
3690+
// Nothing in the inbox and no stream updates to be processed.
3691+
if transactions.is_empty() {
37273692
return Ok((certificates, None));
37283693
}
3729-
match self.execute_block(block_operations, vec![]).await {
3694+
match self
3695+
.execute_prepared_transactions(transactions, vec![])
3696+
.await
3697+
{
37303698
Ok(ExecuteBlockOutcome::Executed(certificate))
37313699
| Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
37323700
Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {

0 commit comments

Comments
 (0)