-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathbundled.rs
More file actions
347 lines (317 loc) · 13.8 KB
/
bundled.rs
File metadata and controls
347 lines (317 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
use std::collections::HashMap;
use std::path::PathBuf;
use anyhow::Context;
use miden_node_block_producer::BlockProducer;
use miden_node_rpc::Rpc;
use miden_node_store::Store;
use miden_node_utils::clap::GrpcOptionsExternal;
use miden_node_utils::grpc::UrlExt;
use miden_node_validator::{Validator, ValidatorSigner};
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::utils::serde::Deserializable;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use url::Url;
use super::{ENV_DATA_DIRECTORY, ENV_RPC_URL};
use crate::commands::{
BlockProducerConfig,
BundledValidatorConfig,
ENV_BLOCK_PROVER_URL,
ENV_ENABLE_OTEL,
ENV_GENESIS_CONFIG_FILE,
NtxBuilderConfig,
ValidatorKey,
};
#[derive(clap::Subcommand)]
#[expect(clippy::large_enum_variant, reason = "This is a single use enum")]
pub enum BundledCommand {
    /// Bootstraps the blockchain database with the genesis block.
    ///
    /// The genesis block contains a single public faucet account. The private key for this
    /// account is written to the `accounts-directory` which can be used to control the account.
    ///
    /// This key is not required by the node and can be moved.
    Bootstrap {
        /// Directory in which to store the database and raw block data.
        #[arg(long, env = ENV_DATA_DIRECTORY, value_name = "DIR")]
        data_directory: PathBuf,
        /// Directory to write the account data to.
        // NOTE: this was previously a `//` comment, which clap ignores — with `///` the
        // description now shows up in `--help` like the sibling arguments.
        #[arg(long, value_name = "DIR")]
        accounts_directory: PathBuf,
        /// Constructs the genesis block from the given toml file.
        #[arg(long, env = ENV_GENESIS_CONFIG_FILE, value_name = "FILE")]
        genesis_config_file: Option<PathBuf>,
        /// Configuration for the Validator key used to sign genesis block.
        #[command(flatten)]
        validator_key: ValidatorKey,
    },
    /// Runs all three node components in the same process.
    ///
    /// The internal gRPC endpoints for the store and block-producer will each be assigned a random
    /// open port on localhost (127.0.0.1:0).
    Start {
        /// Url at which to serve the RPC component's gRPC API.
        #[arg(long = "rpc.url", env = ENV_RPC_URL, value_name = "URL")]
        rpc_url: Url,
        /// The remote block prover's gRPC url. If not provided, a local block prover will be used.
        #[arg(long = "block-prover.url", env = ENV_BLOCK_PROVER_URL, value_name = "URL")]
        block_prover_url: Option<Url>,
        /// Directory in which the Store component should store the database and raw block data.
        #[arg(long = "data-directory", env = ENV_DATA_DIRECTORY, value_name = "DIR")]
        data_directory: PathBuf,
        #[command(flatten)]
        block_producer: BlockProducerConfig,
        #[command(flatten)]
        ntx_builder: NtxBuilderConfig,
        #[command(flatten)]
        validator: BundledValidatorConfig,
        /// Enables the exporting of traces for OpenTelemetry.
        ///
        /// This can be further configured using environment variables as defined in the official
        /// OpenTelemetry documentation. See our operator manual for further details.
        #[arg(long = "enable-otel", default_value_t = false, env = ENV_ENABLE_OTEL, value_name = "BOOL")]
        enable_otel: bool,
        #[command(flatten)]
        grpc_options: GrpcOptionsExternal,
    },
}
impl BundledCommand {
    /// Dispatches the selected subcommand.
    ///
    /// `Bootstrap` creates the genesis block and account files and then seeds the store's
    /// database with them; `Start` runs all node components in this process.
    ///
    /// # Errors
    ///
    /// Returns an error if bootstrapping fails, or — for `Start` — when any component stops
    /// (components are expected to run forever, so any return from `start` is an error).
    pub async fn handle(self) -> anyhow::Result<()> {
        match self {
            BundledCommand::Bootstrap {
                data_directory,
                accounts_directory,
                genesis_config_file,
                validator_key,
            } => {
                // Run validator bootstrap to create genesis block + account files.
                crate::commands::validator::ValidatorCommand::bootstrap_genesis(
                    &data_directory,
                    &accounts_directory,
                    genesis_config_file.as_ref(),
                    validator_key,
                )
                .await
                .context("failed to bootstrap genesis block")?;
                // Feed the genesis block file into the store bootstrap.
                let genesis_block_path =
                    data_directory.join(crate::commands::validator::GENESIS_BLOCK_FILENAME);
                crate::commands::store::bootstrap_store(&data_directory, &genesis_block_path)
                    .context("failed to bootstrap the store component")
            },
            BundledCommand::Start {
                rpc_url,
                block_prover_url,
                data_directory,
                block_producer,
                ntx_builder,
                validator,
                // OTEL is configured by the caller before `handle` runs (see
                // `is_open_telemetry_enabled`), so the flag is intentionally unused here.
                enable_otel: _,
                grpc_options,
            } => {
                Self::start(
                    rpc_url,
                    block_prover_url,
                    data_directory,
                    block_producer,
                    ntx_builder,
                    validator,
                    grpc_options,
                )
                .await
            },
        }
    }

    /// Runs the store, block-producer and RPC components — plus, when enabled, the
    /// network-transaction builder and a local validator — inside a single [`JoinSet`].
    ///
    /// All sockets are bound up-front so that inter-component gRPC connections can be created
    /// before each component finishes starting; internal endpoints use random localhost ports.
    ///
    /// # Errors
    ///
    /// Components are expected to run indefinitely, so this returns an error as soon as the
    /// first component task completes, errors, or panics, with the failing component named in
    /// the error context.
    #[expect(clippy::too_many_lines)]
    async fn start(
        rpc_url: Url,
        block_prover_url: Option<Url>,
        data_directory: PathBuf,
        block_producer: BlockProducerConfig,
        ntx_builder: NtxBuilderConfig,
        validator: BundledValidatorConfig,
        grpc_options: GrpcOptionsExternal,
    ) -> anyhow::Result<()> {
        // Start listening on all gRPC urls so that inter-component connections can be created
        // before each component is fully started up.
        //
        // This is required because `tonic` does not handle retries nor reconnections and our
        // services expect to be able to connect on startup.
        let grpc_rpc = rpc_url
            .to_socket()
            .context("Failed to convert RPC URL to a socket address")?;
        let grpc_rpc = TcpListener::bind(grpc_rpc)
            .await
            .context("Failed to bind to RPC gRPC endpoint")?;
        let (block_producer_url, block_producer_address) = {
            let socket_addr = TcpListener::bind("127.0.0.1:0")
                .await
                .context("Failed to bind to block-producer gRPC endpoint")?
                .local_addr()
                .context("Failed to retrieve the block-producer's gRPC address")?;
            let url = Url::parse(&format!("http://{socket_addr}"))
                .context("Failed to parse Block Producer URL")?;
            (url, socket_addr)
        };
        // Validator URL is either specified remote, or generated local.
        let (validator_url, validator_socket_address) = validator.to_addresses().await?;
        // Store addresses for each exposed API
        let store_rpc_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .context("Failed to bind to store RPC gRPC endpoint")?;
        let store_ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .context("Failed to bind to store ntx-builder gRPC endpoint")?;
        let store_block_producer_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .context("Failed to bind to store block-producer gRPC endpoint")?;
        let store_rpc_address = store_rpc_listener
            .local_addr()
            .context("Failed to retrieve the store's RPC gRPC address")?;
        let store_block_producer_address = store_block_producer_listener
            .local_addr()
            .context("Failed to retrieve the store's block-producer gRPC address")?;
        let store_ntx_builder_address = store_ntx_builder_listener
            .local_addr()
            .context("Failed to retrieve the store's ntx-builder gRPC address")?;
        let mut join_set = JoinSet::new();
        // Start store. The store endpoint is available after loading completes.
        let data_directory_clone = data_directory.clone();
        let store_id = join_set
            .spawn(async move {
                Store {
                    rpc_listener: store_rpc_listener,
                    block_producer_listener: store_block_producer_listener,
                    ntx_builder_listener: store_ntx_builder_listener,
                    data_directory: data_directory_clone,
                    block_prover_url,
                    grpc_options: grpc_options.into(),
                }
                .serve()
                .await
                .context("failed while serving store component")
            })
            .id();
        let should_start_ntx_builder = !ntx_builder.disabled;
        // Start block-producer. The block-producer's endpoint is available after loading completes.
        let block_producer_id = {
            let validator_url = validator_url.clone();
            join_set
                .spawn({
                    let store_url = Url::parse(&format!("http://{store_block_producer_address}"))
                        .context("Failed to parse URL")?;
                    async move {
                        BlockProducer {
                            block_producer_address,
                            store_url,
                            validator_url,
                            batch_prover_url: block_producer.batch_prover_url,
                            batch_interval: block_producer.batch_interval,
                            block_interval: block_producer.block_interval,
                            max_batches_per_block: block_producer.max_batches_per_block,
                            max_txs_per_batch: block_producer.max_txs_per_batch,
                            grpc_options: grpc_options.into(),
                            mempool_tx_capacity: block_producer.mempool_tx_capacity,
                        }
                        .serve()
                        .await
                        .context("failed while serving block-producer component")
                    }
                })
                .id()
        };
        // Start RPC component.
        let rpc_id = {
            let block_producer_url = block_producer_url.clone();
            let validator_url = validator_url.clone();
            join_set
                .spawn(async move {
                    let store_url = Url::parse(&format!("http://{store_rpc_address}"))
                        .context("Failed to parse URL")?;
                    Rpc {
                        listener: grpc_rpc,
                        store_url,
                        block_producer_url: Some(block_producer_url),
                        validator_url,
                        grpc_options,
                    }
                    .serve()
                    .await
                    .context("failed while serving RPC component")
                })
                .id()
        };
        // Lookup table so we can identify the failed component.
        let mut component_ids = HashMap::from([
            (store_id, "store"),
            (block_producer_id, "block-producer"),
            (rpc_id, "rpc"),
        ]);
        // Start network transaction builder. The endpoint is available after loading completes.
        if should_start_ntx_builder {
            let store_ntx_builder_url = Url::parse(&format!("http://{store_ntx_builder_address}"))
                .context("Failed to parse URL")?;
            let block_producer_url = block_producer_url.clone();
            let validator_url = validator_url.clone();
            let builder_config = ntx_builder.into_builder_config(
                store_ntx_builder_url,
                block_producer_url,
                validator_url,
                &data_directory,
            );
            let id = join_set
                .spawn(async move {
                    builder_config
                        .build()
                        .await
                        .context("failed to initialize ntx builder")?
                        .run()
                        .await
                        .context("failed while serving ntx builder component")
                })
                .id();
            component_ids.insert(id, "ntx-builder");
        }
        // Start the Validator if we have bound a socket.
        if let Some(address) = validator_socket_address {
            let secret_key_bytes = hex::decode(validator.validator_key)?;
            let signer = SecretKey::read_from_bytes(&secret_key_bytes)?;
            let signer = ValidatorSigner::new_local(signer);
            let id = join_set
                .spawn({
                    async move {
                        Validator {
                            address,
                            grpc_options: grpc_options.into(),
                            signer,
                            data_directory,
                        }
                        .serve()
                        .await
                        .context("failed while serving validator component")
                    }
                })
                .id();
            component_ids.insert(id, "validator");
        }
        // SAFETY: The joinset is definitely not empty.
        let component_result = join_set.join_next_with_id().await.unwrap();
        // We expect components to run indefinitely, so we treat any return as fatal.
        //
        // Map all outcomes to an error, and provide component context.
        let (id, err) = match component_result {
            Ok((id, Ok(_))) => (id, Err(anyhow::anyhow!("Component completed unexpectedly"))),
            Ok((id, Err(err))) => (id, Err(err)),
            Err(join_err) => (join_err.id(), Err(join_err).context("Joining component task")),
        };
        let component = component_ids.get(&id).unwrap_or(&"unknown");
        // We could abort and gracefully shutdown the other components, but since we're crashing the
        // node there is no point.
        err.context(format!("Component {component} failed"))
    }

    /// Returns `true` when the `Start` subcommand was invoked with `--enable-otel`;
    /// `Bootstrap` never enables OpenTelemetry.
    pub fn is_open_telemetry_enabled(&self) -> bool {
        if let Self::Start { enable_otel, .. } = self {
            *enable_otel
        } else {
            false
        }
    }
}