Skip to content

Commit cce10df

Browse files
authored
Add BlockExecutionTracker (#3982)
## Motivation Logic that tracks the resource consumption during block execution is scattered. The execution of block's transactions was imperative - we tracked every index (application, message, blob) in the main function logic which made reading and understanding harder. This meant an error of incompleteness was easy to miss. ## Proposal Add a new struct `BlockExecutionTracker` (similar to `TransactionTracker`) that wraps the block-wide context of the underlying transaction execution. It's responsible for updating indices, updating resources used after transaction execution. ## Test Plan CI ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 4e6c45c commit cce10df

File tree

13 files changed

+453
-254
lines changed

13 files changed

+453
-254
lines changed

linera-chain/src/block_tracker.rs

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// Copyright (c) Zefchain Labs, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::collections::BTreeSet;
5+
6+
use custom_debug_derive::Debug;
7+
use linera_base::{
8+
data_types::{Blob, Event, OracleResponse, Timestamp},
9+
identifiers::{AccountOwner, BlobType, ChainId},
10+
};
11+
use linera_execution::{
12+
OutgoingMessage, ResourceController, ResourceTracker, SystemExecutionStateView,
13+
TransactionOutcome, TransactionTracker,
14+
};
15+
use linera_views::context::Context;
16+
17+
use crate::{
18+
chain::EMPTY_BLOCK_SIZE,
19+
data_types::{OperationResult, ProposedBlock, Transaction},
20+
ChainError, ChainExecutionContext, ExecutionResultExt,
21+
};
22+
23+
/// Tracks execution of transactions within a block.
24+
/// Captures the resource policy, produced messages, oracle responses and events.
25+
#[derive(Debug)]
26+
pub struct BlockExecutionTracker<'resources> {
27+
resource_controller: &'resources mut ResourceController<Option<AccountOwner>, ResourceTracker>,
28+
local_time: Timestamp,
29+
#[debug(skip_if = Option::is_none)]
30+
pub replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
31+
pub next_message_index: u32,
32+
pub next_application_index: u32,
33+
pub next_chain_index: u32,
34+
#[debug(skip_if = Vec::is_empty)]
35+
pub oracle_responses: Vec<Vec<OracleResponse>>,
36+
#[debug(skip_if = Vec::is_empty)]
37+
pub events: Vec<Vec<Event>>,
38+
#[debug(skip_if = Vec::is_empty)]
39+
pub blobs: Vec<Vec<Blob>>,
40+
#[debug(skip_if = Vec::is_empty)]
41+
pub messages: Vec<Vec<OutgoingMessage>>,
42+
#[debug(skip_if = Vec::is_empty)]
43+
pub operation_results: Vec<OperationResult>,
44+
// Index of the currently executed transaction in a block.
45+
transaction_index: u32,
46+
47+
// We expect the number of outcomes to be equal to the number of transactions in the block.
48+
expected_outcomes_count: usize,
49+
}
50+
51+
impl<'resources> BlockExecutionTracker<'resources> {
52+
/// Creates a new BlockExecutionTracker.
53+
pub fn new(
54+
resource_controller: &'resources mut ResourceController<
55+
Option<AccountOwner>,
56+
ResourceTracker,
57+
>,
58+
local_time: Timestamp,
59+
replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
60+
proposal: &ProposedBlock,
61+
) -> Result<Self, ChainError> {
62+
resource_controller
63+
.track_block_size(EMPTY_BLOCK_SIZE)
64+
.with_execution_context(ChainExecutionContext::Block)?;
65+
66+
Ok(Self {
67+
resource_controller,
68+
local_time,
69+
replaying_oracle_responses,
70+
next_message_index: 0,
71+
next_application_index: 0,
72+
next_chain_index: 0,
73+
oracle_responses: Vec::new(),
74+
events: Vec::new(),
75+
blobs: Vec::new(),
76+
messages: Vec::new(),
77+
operation_results: Vec::new(),
78+
transaction_index: 0,
79+
expected_outcomes_count: proposal.incoming_bundles.len() + proposal.operations.len(),
80+
})
81+
}
82+
83+
/// Returns a new TransactionTracker for the current transaction.
84+
pub fn new_transaction_tracker(&mut self) -> Result<TransactionTracker, ChainError> {
85+
Ok(TransactionTracker::new(
86+
self.local_time,
87+
self.transaction_index,
88+
self.next_message_index,
89+
self.next_application_index,
90+
self.next_chain_index,
91+
self.oracle_responses()?,
92+
))
93+
}
94+
95+
/// Returns oracle responses for the current transaction.
96+
fn oracle_responses(&self) -> Result<Option<Vec<OracleResponse>>, ChainError> {
97+
if let Some(responses) = self.replaying_oracle_responses.as_ref() {
98+
match responses.get(self.transaction_index as usize) {
99+
Some(responses) => Ok(Some(responses.clone())),
100+
None => Err(ChainError::MissingOracleResponseList),
101+
}
102+
} else {
103+
Ok(None)
104+
}
105+
}
106+
107+
/// Processes the transaction outcome.
108+
///
109+
/// Updates block tracker with indexes for the next messages, applications, etc.
110+
/// so that the execution of the next transaction doesn't overwrite the previous ones.
111+
///
112+
/// Tracks the resources used by the transaction - size of the incoming and outgoing messages, blobs, etc.
113+
pub async fn process_txn_outcome<C>(
114+
&mut self,
115+
txn_outcome: &TransactionOutcome,
116+
view: &mut SystemExecutionStateView<C>,
117+
context: ChainExecutionContext,
118+
) -> Result<(), ChainError>
119+
where
120+
C: Context + Clone + Send + Sync + 'static,
121+
{
122+
self.next_message_index = txn_outcome.next_message_index;
123+
self.next_application_index = txn_outcome.next_application_index;
124+
self.next_chain_index = txn_outcome.next_chain_index;
125+
self.oracle_responses
126+
.push(txn_outcome.oracle_responses.clone());
127+
self.events.push(txn_outcome.events.clone());
128+
self.blobs.push(txn_outcome.blobs.clone());
129+
self.messages.push(txn_outcome.outgoing_messages.clone());
130+
if matches!(context, ChainExecutionContext::Operation(_)) {
131+
self.operation_results
132+
.push(OperationResult(txn_outcome.operation_result.clone()));
133+
}
134+
135+
let mut resource_controller = self.resource_controller.with_state(view).await?;
136+
137+
for message_out in &txn_outcome.outgoing_messages {
138+
resource_controller
139+
.track_message(&message_out.message)
140+
.with_execution_context(context)?;
141+
}
142+
143+
resource_controller
144+
.track_block_size_of(&(
145+
&txn_outcome.oracle_responses,
146+
&txn_outcome.outgoing_messages,
147+
&txn_outcome.events,
148+
&txn_outcome.blobs,
149+
))
150+
.with_execution_context(context)?;
151+
152+
for blob in &txn_outcome.blobs {
153+
if blob.content().blob_type() == BlobType::Data {
154+
resource_controller
155+
.track_blob_published(blob.content())
156+
.with_execution_context(context)?;
157+
}
158+
}
159+
160+
self.resource_controller
161+
.track_block_size_of(&(&txn_outcome.operation_result))
162+
.with_execution_context(context)?;
163+
164+
self.transaction_index += 1;
165+
Ok(())
166+
}
167+
168+
/// Returns recipient chain ids for outgoing messages in the block.
169+
pub fn recipients(&self) -> BTreeSet<ChainId> {
170+
self.messages
171+
.iter()
172+
.flatten()
173+
.map(|msg| msg.destination)
174+
.collect()
175+
}
176+
177+
/// Returns the execution context for the current transaction.
178+
pub fn chain_execution_context(&self, transaction: &Transaction<'_>) -> ChainExecutionContext {
179+
match transaction {
180+
Transaction::ReceiveMessages(_) => {
181+
ChainExecutionContext::IncomingBundle(self.transaction_index)
182+
}
183+
Transaction::ExecuteOperation(_) => {
184+
ChainExecutionContext::Operation(self.transaction_index)
185+
}
186+
}
187+
}
188+
189+
/// Returns a mutable reference to the resource controller.
190+
pub fn resource_controller_mut(
191+
&mut self,
192+
) -> &mut ResourceController<Option<AccountOwner>, ResourceTracker> {
193+
self.resource_controller
194+
}
195+
196+
/// Finalizes the execution and returns the collected results.
197+
///
198+
/// This method should be called after all transactions have been processed.
199+
/// Panics if the number of outcomes does match the expected count.
200+
pub fn finalize(self) -> FinalizeExecutionResult {
201+
// Asserts that the number of outcomes matches the expected count.
202+
assert_eq!(self.oracle_responses.len(), self.expected_outcomes_count);
203+
assert_eq!(self.messages.len(), self.expected_outcomes_count);
204+
assert_eq!(self.events.len(), self.expected_outcomes_count);
205+
assert_eq!(self.blobs.len(), self.expected_outcomes_count);
206+
207+
#[cfg(with_metrics)]
208+
crate::chain::metrics::track_block_metrics(&self.resource_controller.tracker);
209+
210+
(
211+
self.messages,
212+
self.oracle_responses,
213+
self.events,
214+
self.blobs,
215+
self.operation_results,
216+
)
217+
}
218+
}
219+
220+
pub(crate) type FinalizeExecutionResult = (
221+
Vec<Vec<OutgoingMessage>>,
222+
Vec<Vec<OracleResponse>>,
223+
Vec<Vec<Event>>,
224+
Vec<Vec<Blob>>,
225+
Vec<OperationResult>,
226+
);

0 commit comments

Comments
 (0)