-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathbuilder.rs
More file actions
315 lines (287 loc) · 12.6 KB
/
builder.rs
File metadata and controls
315 lines (287 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
use std::pin::Pin;
use std::sync::Arc;
use anyhow::Context;
use futures::Stream;
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::block::BlockHeader;
use tokio::net::TcpListener;
use tokio::sync::{RwLock, mpsc};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tonic::Status;
use crate::NtxBuilderConfig;
use crate::actor::{AccountActorContext, AccountOrigin, ActorRequest};
use crate::chain_state::ChainState;
use crate::clients::StoreClient;
use crate::coordinator::Coordinator;
use crate::db::Db;
use crate::server::NtxBuilderRpcServer;
// NETWORK TRANSACTION BUILDER
// ================================================================================================
/// A boxed, pinned stream of mempool events with a `'static` lifetime.
///
/// Boxing gives the stream a `'static` lifetime by ensuring it owns all its data, avoiding
/// complex lifetime annotations that would otherwise be required when storing `impl TryStream`.
///
/// Each item is either a [`MempoolEvent`] or the gRPC [`Status`] error that terminated the
/// underlying block-producer subscription.
pub(crate) type MempoolEventStream =
    Pin<Box<dyn Stream<Item = Result<MempoolEvent, Status>> + Send>>;
/// Network transaction builder component.
///
/// The network transaction builder is in charge of building transactions that consume notes
/// against network accounts. These notes are identified and communicated by the block producer.
/// The service maintains a list of unconsumed notes and periodically executes and proves
/// transactions that consume them (reaching out to the store to retrieve state as necessary).
///
/// The builder manages the tasks for every network account on the chain through the coordinator.
///
/// Create an instance using [`NtxBuilderConfig::build()`].
pub struct NetworkTransactionBuilder {
    /// Configuration for the builder (channel capacities, MMR retention, etc.).
    config: NtxBuilderConfig,
    /// Coordinator for managing per-account actor tasks (spawning, event fan-out, DB writes).
    coordinator: Coordinator,
    /// Client for the store gRPC API.
    store: StoreClient,
    /// Database for persistent state (accounts, notes, note scripts).
    db: Db,
    /// Shared chain state updated by the event loop and read by actors.
    ///
    /// Holds the chain tip header and the (pruned) chain MMR; see `update_chain_tip`.
    chain_state: Arc<RwLock<ChainState>>,
    /// Context shared with all account actors.
    actor_context: AccountActorContext,
    /// Stream of mempool events from the block producer.
    mempool_events: MempoolEventStream,
    /// Database update requests from account actors.
    ///
    /// We keep database writes centralized so this is how actors communicate
    /// items to write.
    actor_request_rx: mpsc::Receiver<ActorRequest>,
}
impl NetworkTransactionBuilder {
    /// Assembles a builder from its pre-constructed parts.
    ///
    /// Intended to be called from [`NtxBuilderConfig::build()`] (see the struct docs) rather
    /// than directly; all wiring (channels, clients, shared state) happens before this point.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        config: NtxBuilderConfig,
        coordinator: Coordinator,
        store: StoreClient,
        db: Db,
        chain_state: Arc<RwLock<ChainState>>,
        actor_context: AccountActorContext,
        mempool_events: MempoolEventStream,
        actor_request_rx: mpsc::Receiver<ActorRequest>,
    ) -> Self {
        Self {
            config,
            coordinator,
            store,
            db,
            chain_state,
            actor_context,
            mempool_events,
            actor_request_rx,
        }
    }

    /// Runs the network transaction builder event loop until a fatal error occurs.
    ///
    /// If a `TcpListener` is provided, a gRPC server is also spawned to expose the
    /// `GetNoteError` endpoint.
    ///
    /// This method:
    /// 1. Optionally starts a gRPC server for note error queries
    /// 2. Spawns a background task to load existing network accounts from the store
    /// 3. Runs the main event loop, processing mempool events and managing actors
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The mempool event stream ends unexpectedly
    /// - An actor encounters a fatal error
    /// - The account loader task fails
    /// - The gRPC server fails
    pub async fn run(self, listener: Option<TcpListener>) -> anyhow::Result<()> {
        let mut join_set = JoinSet::new();

        // Start the gRPC server if a listener is provided. The server only needs DB access
        // (cheap clone), so it runs independently of the event loop.
        if let Some(listener) = listener {
            let server = NtxBuilderRpcServer::new(self.db.clone());
            join_set.spawn(async move {
                server.serve(listener).await.context("ntx-builder gRPC server failed")
            });
        }

        join_set.spawn(self.run_event_loop());

        // Wait for either the event loop or the gRPC server to complete.
        // Any completion is treated as fatal: neither task is expected to finish in normal
        // operation. The double `?` surfaces both a panic (JoinError) and a task error.
        if let Some(result) = join_set.join_next().await {
            result.context("ntx-builder task panicked")??;
        }
        Ok(())
    }

    /// Runs the main event loop.
    ///
    /// Multiplexes four event sources via `select!`: actor completions, mempool events,
    /// accounts streamed from the store at startup, and write requests from actors.
    /// Only exits by returning an error; there is no graceful-shutdown path here.
    async fn run_event_loop(mut self) -> anyhow::Result<()> {
        // Spawn a background task to load network accounts from the store.
        // Accounts are sent through a channel and processed in the main event loop.
        let (account_tx, mut account_rx) =
            mpsc::channel::<NetworkAccountId>(self.config.account_channel_capacity);
        let account_loader_store = self.store.clone();
        let mut account_loader_handle = tokio::spawn(async move {
            account_loader_store
                .stream_network_account_ids(account_tx)
                .await
                .context("failed to load network accounts from store")
        });

        // Main event loop.
        loop {
            tokio::select! {
                // Handle actor result. If a timed-out actor needs respawning, do so.
                // `Err` from the coordinator is fatal and propagates out of the loop.
                result = self.coordinator.next() => {
                    if let Some(account_id) = result? {
                        self.coordinator
                            .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
                    }
                },
                // Handle mempool events.
                // Both an ended stream (`None`) and a stream error are fatal.
                event = self.mempool_events.next() => {
                    let event = event
                        .context("mempool event stream ended")?
                        .context("mempool event stream failed")?;
                    self.handle_mempool_event(event).await?;
                },
                // Handle account batches loaded from the store.
                // Once all accounts are loaded, the channel closes and this branch
                // becomes inactive (recv returns None and we stop matching).
                Some(account_id) = account_rx.recv() => {
                    self.handle_loaded_account(account_id).await?;
                },
                // Handle requests from actors.
                Some(request) = self.actor_request_rx.recv() => {
                    self.handle_actor_request(request).await?;
                },
                // Handle account loader task completion/failure.
                // If the task fails, we abort since the builder would be in a degraded state
                // where existing notes against network accounts won't be processed.
                result = &mut account_loader_handle => {
                    result
                        .context("account loader task panicked")
                        .flatten()?;
                    tracing::info!("account loading from store completed");
                    // Swap in a never-completing task so this branch can no longer fire;
                    // without this, polling the finished handle again would panic/err.
                    account_loader_handle = tokio::spawn(std::future::pending());
                },
            }
        }
    }

    /// Handles account IDs loaded from the store by syncing state to DB and spawning actors.
    ///
    /// Fetches the account and its unconsumed notes (as of the current chain tip), persists
    /// both, then spawns an actor for the account.
    ///
    /// # Errors
    ///
    /// Errors if the account is missing from the store or any store/DB call fails.
    // NOTE(review): `account_id` is skipped from the span, so the instrumentation records no
    // identifying fields — presumably intentional (it may not impl `Debug`/`Display`); confirm.
    #[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))]
    async fn handle_loaded_account(
        &mut self,
        account_id: NetworkAccountId,
    ) -> Result<(), anyhow::Error> {
        // Fetch account from store and write to DB.
        let account = self
            .store
            .get_network_account(account_id)
            .await
            .context("failed to load account from store")?
            .context("account should exist in store")?;

        // Notes are fetched as of the current tip so the snapshot is consistent with it.
        let block_num = self.chain_state.read().await.chain_tip_header.block_num();
        let notes = self
            .store
            .get_unconsumed_network_notes(account_id, block_num.as_u32())
            .await
            .context("failed to load notes from store")?;

        // Write account and notes to DB.
        self.db
            .sync_account_from_store(account_id, account.clone(), notes.clone())
            .await
            .context("failed to sync account to DB")?;

        // Only spawn the actor once its state is durably in the DB.
        self.coordinator
            .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
        Ok(())
    }

    /// Handles mempool events by writing to DB first, then notifying actors.
    ///
    /// The write-then-notify order is deliberate for every variant: actors must never observe
    /// a notification whose effects are not yet persisted.
    #[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))]
    async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> {
        match &event {
            MempoolEvent::TransactionAdded { account_delta, .. } => {
                // Write event effects to DB first.
                self.coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write TransactionAdded to DB")?;
                // Handle account deltas in case an account is being created.
                if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
                    // Handle account deltas for network accounts only.
                    if let Some(network_account) = AccountOrigin::transaction(delta) {
                        // Spawn new actors if a transaction creates a new network account
                        // (a full-state delta signals creation rather than an update).
                        let is_creating_account = delta.is_full_state();
                        if is_creating_account {
                            self.coordinator.spawn_actor(network_account, &self.actor_context);
                        }
                    }
                }
                // Fan the event out to running actors; any target without a live actor is
                // returned so we can (re)spawn it from store state.
                let inactive_targets = self.coordinator.send_targeted(&event);
                for account_id in inactive_targets {
                    self.coordinator
                        .spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
                }
                Ok(())
            },
            // Update chain state and notify affected actors.
            MempoolEvent::BlockCommitted { header, .. } => {
                // Write event effects to DB first.
                let result = self
                    .coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write BlockCommitted to DB")?;
                self.update_chain_tip(header.as_ref().clone()).await;
                self.coordinator.notify_accounts(&result.accounts_to_notify);
                Ok(())
            },
            // Notify affected actors (reverted account actors will self-cancel when they
            // detect their account has been removed from the DB).
            MempoolEvent::TransactionsReverted(_) => {
                // Write event effects to DB first.
                let result = self
                    .coordinator
                    .write_event(&event)
                    .await
                    .context("failed to write TransactionsReverted to DB")?;
                self.coordinator.notify_accounts(&result.accounts_to_notify);
                Ok(())
            },
        }
    }

    /// Processes a request from an account actor.
    ///
    /// All actor-initiated DB writes funnel through here so the event loop remains the sole
    /// writer (see the `actor_request_rx` field docs).
    async fn handle_actor_request(&mut self, request: ActorRequest) -> Result<(), anyhow::Error> {
        match request {
            ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
                self.db
                    .notes_failed(failed_notes, block_num)
                    .await
                    .context("failed to mark notes as failed")?;
                // Best-effort ack: the actor may have gone away, which is fine to ignore.
                let _ = ack_tx.send(());
            },
            ActorRequest::CacheNoteScript { script_root, script } => {
                self.db
                    .insert_note_script(script_root, &script)
                    .await
                    .context("failed to cache note script")?;
            },
        }
        Ok(())
    }

    /// Updates the chain tip and prunes old blocks from the MMR.
    ///
    /// The MMR intentionally lags the tip by one block: the *previous* tip is appended to the
    /// MMR before the new tip header is stored.
    async fn update_chain_tip(&mut self, tip: BlockHeader) {
        let mut chain_state = self.chain_state.write().await;
        // Update MMR which lags by one block.
        let mmr_tip = chain_state.chain_tip_header.clone();
        // `make_mut` clones the Arc'd MMR only if actors currently hold a reference to it
        // (copy-on-write), so readers keep a consistent snapshot.
        Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true);
        // Set the new tip.
        chain_state.chain_tip_header = tip;
        // Keep MMR pruned, retaining at most `max_block_count` recent blocks.
        // NOTE(review): the `as u32` cast truncates silently if the chain length ever
        // exceeds u32::MAX — presumably impossible given block numbering; confirm.
        let pruned_block_height = (chain_state
            .chain_mmr
            .chain_length()
            .as_usize()
            .saturating_sub(self.config.max_block_count)) as u32;
        Arc::make_mut(&mut chain_state.chain_mmr).prune_to(..pruned_block_height.into());
    }
}