Skip to content

Commit fe0f68d

Browse files
authored
Move transaction execution to BlockExecutionTracker (#4121)
## Motivation As part of the bigger work we want to make sure that the execution of incoming messages can be paid for by the grant they carry. This is difficult to test now. ## Proposal Extract transaction execution to `BlockExecutionTracker` to make testing simpler. ## Test Plan CI. This doesn't introduce new features - just a refactor. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent c7962c7 commit fe0f68d

File tree

2 files changed

+177
-150
lines changed

2 files changed

+177
-150
lines changed

linera-chain/src/block_tracker.rs

Lines changed: 170 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,38 @@
44
use std::collections::{BTreeMap, BTreeSet};
55

66
use custom_debug_derive::Debug;
7+
#[cfg(with_metrics)]
8+
use linera_base::prometheus_util::MeasureLatency;
79
use linera_base::{
8-
data_types::{Blob, Event, OracleResponse, Timestamp},
9-
identifiers::{AccountOwner, BlobId, ChainId},
10+
data_types::{Amount, Blob, BlockHeight, Event, OracleResponse, Timestamp},
11+
ensure,
12+
identifiers::{AccountOwner, BlobId, ChainId, MessageId},
1013
};
1114
use linera_execution::{
12-
OutgoingMessage, ResourceController, ResourceTracker, SystemExecutionStateView,
13-
TransactionOutcome, TransactionTracker,
15+
ExecutionRuntimeContext, ExecutionStateView, MessageContext, OperationContext, OutgoingMessage,
16+
ResourceController, ResourceTracker, SystemExecutionStateView, TransactionOutcome,
17+
TransactionTracker,
1418
};
1519
use linera_views::context::Context;
1620

21+
#[cfg(with_metrics)]
22+
use crate::chain::metrics;
1723
use crate::{
1824
chain::EMPTY_BLOCK_SIZE,
19-
data_types::{OperationResult, ProposedBlock, Transaction},
25+
data_types::{
26+
IncomingBundle, MessageAction, OperationResult, PostedMessage, ProposedBlock, Transaction,
27+
},
2028
ChainError, ChainExecutionContext, ExecutionResultExt,
2129
};
2230

2331
/// Tracks execution of transactions within a block.
2432
/// Captures the resource policy, produced messages, oracle responses and events.
2533
#[derive(Debug)]
2634
pub struct BlockExecutionTracker<'resources, 'blobs> {
35+
chain_id: ChainId,
36+
block_height: BlockHeight,
37+
timestmap: Timestamp,
38+
authenticated_signer: Option<AccountOwner>,
2739
resource_controller: &'resources mut ResourceController<Option<AccountOwner>, ResourceTracker>,
2840
local_time: Timestamp,
2941
#[debug(skip_if = Option::is_none)]
@@ -68,6 +80,10 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
6880
.with_execution_context(ChainExecutionContext::Block)?;
6981

7082
Ok(Self {
83+
chain_id: proposal.chain_id,
84+
block_height: proposal.height,
85+
timestmap: proposal.timestamp,
86+
authenticated_signer: proposal.authenticated_signer,
7187
resource_controller,
7288
local_time,
7389
replaying_oracle_responses,
@@ -85,8 +101,79 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
85101
})
86102
}
87103

104+
/// Executes a transaction in the context of the block.
105+
pub async fn execute_transaction<C>(
106+
&mut self,
107+
transaction: Transaction<'_>,
108+
round: Option<u32>,
109+
chain: &mut ExecutionStateView<C>,
110+
) -> Result<(), ChainError>
111+
where
112+
C: Context + Clone + Send + Sync + 'static,
113+
C::Extra: ExecutionRuntimeContext,
114+
{
115+
let chain_execution_context = self.chain_execution_context(&transaction);
116+
let mut txn_tracker = self.new_transaction_tracker()?;
117+
118+
match transaction {
119+
Transaction::ReceiveMessages(incoming_bundle) => {
120+
self.resource_controller_mut()
121+
.track_block_size_of(&incoming_bundle)
122+
.with_execution_context(chain_execution_context)?;
123+
for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
124+
Box::pin(self.execute_message_in_block(
125+
chain,
126+
message_id,
127+
posted_message,
128+
incoming_bundle,
129+
round,
130+
&mut txn_tracker,
131+
))
132+
.await?;
133+
}
134+
}
135+
Transaction::ExecuteOperation(operation) => {
136+
self.resource_controller_mut()
137+
.with_state(&mut chain.system)
138+
.await?
139+
.track_block_size_of(&operation)
140+
.with_execution_context(chain_execution_context)?;
141+
#[cfg(with_metrics)]
142+
let _operation_latency = metrics::OPERATION_EXECUTION_LATENCY.measure_latency();
143+
let context = OperationContext {
144+
chain_id: self.chain_id,
145+
height: self.block_height,
146+
round,
147+
authenticated_signer: self.authenticated_signer,
148+
authenticated_caller_id: None,
149+
timestamp: self.timestmap,
150+
};
151+
Box::pin(chain.execute_operation(
152+
context,
153+
operation.clone(),
154+
&mut txn_tracker,
155+
self.resource_controller_mut(),
156+
))
157+
.await
158+
.with_execution_context(chain_execution_context)?;
159+
self.resource_controller_mut()
160+
.with_state(&mut chain.system)
161+
.await?
162+
.track_operation(operation)
163+
.with_execution_context(chain_execution_context)?;
164+
}
165+
}
166+
167+
let txn_outcome = txn_tracker
168+
.into_outcome()
169+
.with_execution_context(chain_execution_context)?;
170+
self.process_txn_outcome(&txn_outcome, &mut chain.system, chain_execution_context)
171+
.await?;
172+
Ok(())
173+
}
174+
88175
/// Returns a new TransactionTracker for the current transaction.
89-
pub fn new_transaction_tracker(&mut self) -> Result<TransactionTracker, ChainError> {
176+
fn new_transaction_tracker(&mut self) -> Result<TransactionTracker, ChainError> {
90177
Ok(TransactionTracker::new(
91178
self.local_time,
92179
self.transaction_index,
@@ -97,6 +184,83 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
97184
))
98185
}
99186

187+
/// Executes a message as part of an incoming bundle in a block.
188+
async fn execute_message_in_block<C>(
189+
&mut self,
190+
chain: &mut ExecutionStateView<C>,
191+
message_id: MessageId,
192+
posted_message: &PostedMessage,
193+
incoming_bundle: &IncomingBundle,
194+
round: Option<u32>,
195+
txn_tracker: &mut TransactionTracker,
196+
) -> Result<(), ChainError>
197+
where
198+
C: Context + Clone + Send + Sync + 'static,
199+
C::Extra: ExecutionRuntimeContext,
200+
{
201+
#[cfg(with_metrics)]
202+
let _message_latency = metrics::MESSAGE_EXECUTION_LATENCY.measure_latency();
203+
let context = MessageContext {
204+
chain_id: self.chain_id,
205+
is_bouncing: posted_message.is_bouncing(),
206+
height: self.block_height,
207+
round,
208+
message_id,
209+
authenticated_signer: posted_message.authenticated_signer,
210+
refund_grant_to: posted_message.refund_grant_to,
211+
timestamp: self.timestmap,
212+
};
213+
let mut grant = posted_message.grant;
214+
match incoming_bundle.action {
215+
MessageAction::Accept => {
216+
let chain_execution_context =
217+
ChainExecutionContext::IncomingBundle(txn_tracker.transaction_index());
218+
// Once a chain is closed, accepting incoming messages is not allowed.
219+
ensure!(!chain.system.closed.get(), ChainError::ClosedChain);
220+
221+
Box::pin(chain.execute_message(
222+
context,
223+
posted_message.message.clone(),
224+
(grant > Amount::ZERO).then_some(&mut grant),
225+
txn_tracker,
226+
self.resource_controller_mut(),
227+
))
228+
.await
229+
.with_execution_context(chain_execution_context)?;
230+
chain
231+
.send_refund(context, grant, txn_tracker)
232+
.await
233+
.with_execution_context(chain_execution_context)?;
234+
}
235+
MessageAction::Reject => {
236+
// If rejecting a message fails, the entire block proposal should be
237+
// scrapped.
238+
ensure!(
239+
!posted_message.is_protected() || *chain.system.closed.get(),
240+
ChainError::CannotRejectMessage {
241+
chain_id: self.chain_id,
242+
origin: incoming_bundle.origin,
243+
posted_message: Box::new(posted_message.clone()),
244+
}
245+
);
246+
if posted_message.is_tracked() {
247+
// Bounce the message.
248+
chain
249+
.bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
250+
.await
251+
.with_execution_context(ChainExecutionContext::Block)?;
252+
} else {
253+
// Nothing to do except maybe refund the grant.
254+
chain
255+
.send_refund(context, grant, txn_tracker)
256+
.await
257+
.with_execution_context(ChainExecutionContext::Block)?;
258+
}
259+
}
260+
}
261+
Ok(())
262+
}
263+
100264
/// Returns oracle responses for the current transaction.
101265
fn oracle_responses(&self) -> Result<Option<Vec<OracleResponse>>, ChainError> {
102266
if let Some(responses) = self.replaying_oracle_responses.as_ref() {

0 commit comments

Comments
 (0)