Skip to content

Commit 7be4ef1

Browse files
authored
Indexer destination (#3949)
## Motivation Generic indexer destination for the block exporter. To be rebased on top of #3896 . ## Proposal - Integration with the local net. - Global indexing for the blobs for each destination kind. - Also Closes #3669 ## Test Plan Unit + integration tests ## Release Plan - Nothing to do / These changes follow the usual release cycle.
1 parent 57acab7 commit 7be4ef1

File tree

23 files changed

+1690
-159
lines changed

23 files changed

+1690
-159
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

linera-client/src/config.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -246,17 +246,19 @@ pub struct Destination {
246246
pub endpoint: String,
247247
/// The port number of the target destination.
248248
pub port: u16,
249+
/// The description for the gRPC based destination.
250+
/// Discriminates the export mode and the client to use.
251+
pub kind: DestinationKind,
249252
}
250253

251-
impl Destination {
252-
pub fn address(&self) -> String {
253-
let tls = match self.tls {
254-
TlsConfig::ClearText => "http",
255-
TlsConfig::Tls => "https",
256-
};
257-
258-
format!("{}://{}:{}", tls, self.endpoint, self.port)
259-
}
254+
/// The description for the gRPC based destination.
255+
/// Discriminates the export mode and the client to use.
256+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy)]
257+
pub enum DestinationKind {
258+
/// The indexer description.
259+
Indexer,
260+
/// The validator description.
261+
Validator,
260262
}
261263

262264
/// The configuration file to impose various limits
@@ -295,3 +297,22 @@ impl Default for LimitsConfig {
295297
}
296298
}
297299
}
300+
301+
impl Destination {
302+
pub fn address(&self) -> String {
303+
match self.kind {
304+
DestinationKind::Indexer => {
305+
let tls = match self.tls {
306+
TlsConfig::ClearText => "http",
307+
TlsConfig::Tls => "https",
308+
};
309+
310+
format!("{}://{}:{}", tls, self.endpoint, self.port)
311+
}
312+
313+
DestinationKind::Validator => {
314+
format!("{}:{}:{}", "grpc", self.endpoint, self.port)
315+
}
316+
}
317+
}
318+
}

linera-service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ tracing.workspace = true
126126

127127
[build-dependencies]
128128
cfg_aliases.workspace = true
129+
tonic-build = { workspace = true, features = ["prost", "transport"] }
129130

130131
[dev-dependencies]
131132
amm.workspace = true
@@ -159,6 +160,7 @@ test-case.workspace = true
159160
test-log = { workspace = true, features = ["trace"] }
160161
test-strategy.workspace = true
161162
tokio = { workspace = true, features = ["full", "test-util"] }
163+
tracing-subscriber = { workspace = true, features = ["fmt"] }
162164

163165
[[bin]]
164166
name = "linera"

linera-service/build.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
fn main() {
4+
fn main() -> Result<(), Box<dyn std::error::Error>> {
55
cfg_aliases::cfg_aliases! {
66
with_revm: { feature = "revm" },
77
with_testing: { any(test, feature = "test") },
88
with_metrics: { all(not(target_arch = "wasm32"), feature = "metrics") },
99
};
10+
11+
tonic_build::compile_protos("src/exporter/proto/indexer.proto")?;
12+
13+
Ok(())
1014
}

linera-service/src/cli/main.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1936,7 +1936,6 @@ async fn run(options: &ClientOptions) -> Result<i32, Error> {
19361936
faucet_chain,
19371937
faucet_port,
19381938
faucet_amount,
1939-
block_exporters,
19401939
..
19411940
} => {
19421941
net_up_utils::handle_net_up_service(
@@ -1955,7 +1954,6 @@ async fn run(options: &ClientOptions) -> Result<i32, Error> {
19551954
*faucet_chain,
19561955
*faucet_port,
19571956
*faucet_amount,
1958-
*block_exporters,
19591957
)
19601958
.boxed()
19611959
.await?;

linera-service/src/cli/net_up_utils.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@ pub async fn handle_net_up_service(
183183
faucet_chain: Option<u32>,
184184
faucet_port: NonZeroU16,
185185
faucet_amount: Amount,
186-
num_block_exporters: u32,
187186
) -> anyhow::Result<()> {
188187
if num_initial_validators < 1 {
189188
panic!("The local test network must have at least one validator.");
@@ -223,7 +222,7 @@ pub async fn handle_net_up_service(
223222
cross_chain_config,
224223
storage_config_builder,
225224
path_provider,
226-
num_block_exporters,
225+
block_exporters: vec![],
227226
};
228227
let (mut net, client) = config.instantiate().await?;
229228
let faucet_service = print_messages_and_create_faucet(

linera-service/src/cli_wrappers/local_net.rs

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ use linera_base::{
1919
command::{resolve_binary, CommandExt},
2020
data_types::Amount,
2121
};
22-
use linera_client::client_options::ResourceControlPolicyConfig;
22+
use linera_client::{
23+
client_options::ResourceControlPolicyConfig,
24+
config::{BlockExporterConfig, DestinationKind},
25+
};
2326
use linera_core::node::ValidatorNodeProvider;
24-
use linera_rpc::config::CrossChainConfig;
27+
use linera_rpc::config::{CrossChainConfig, TlsConfig};
2528
#[cfg(all(feature = "storage-service", with_testing))]
2629
use linera_storage_service::common::storage_service_test_endpoint;
2730
#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
@@ -184,7 +187,7 @@ pub struct LocalNetConfig {
184187
pub cross_chain_config: CrossChainConfig,
185188
pub storage_config_builder: InnerStorageConfigBuilder,
186189
pub path_provider: PathProvider,
187-
pub num_block_exporters: u32,
190+
pub block_exporters: Vec<BlockExporterConfig>,
188191
}
189192

190193
/// A set of Linera validators running locally as native processes.
@@ -202,7 +205,7 @@ pub struct LocalNet {
202205
common_storage_config: InnerStorageConfig,
203206
cross_chain_config: CrossChainConfig,
204207
path_provider: PathProvider,
205-
num_block_exporters: u32,
208+
block_exporters: Vec<BlockExporterConfig>,
206209
}
207210

208211
/// The name of the environment variable that allows specifying additional arguments to be passed
@@ -305,7 +308,7 @@ impl LocalNetConfig {
305308
num_proxies,
306309
storage_config_builder,
307310
path_provider,
308-
num_block_exporters: 0,
311+
block_exporters: vec![],
309312
}
310313
}
311314
}
@@ -326,7 +329,7 @@ impl LineraNetConfig for LocalNetConfig {
326329
storage_config,
327330
self.cross_chain_config,
328331
self.path_provider,
329-
self.num_block_exporters,
332+
self.block_exporters,
330333
)?;
331334
let client = net.make_client().await;
332335
ensure!(
@@ -391,7 +394,7 @@ impl LocalNet {
391394
common_storage_config: InnerStorageConfig,
392395
cross_chain_config: CrossChainConfig,
393396
path_provider: PathProvider,
394-
num_block_exporters: u32,
397+
block_exporters: Vec<BlockExporterConfig>,
395398
) -> Result<Self> {
396399
Ok(Self {
397400
network,
@@ -407,7 +410,7 @@ impl LocalNet {
407410
common_storage_config,
408411
cross_chain_config,
409412
path_provider,
410-
num_block_exporters,
413+
block_exporters,
411414
})
412415
}
413416

@@ -501,9 +504,9 @@ impl LocalNet {
501504
));
502505
}
503506

504-
for j in 0..self.num_block_exporters {
507+
for j in 0..self.block_exporters.len() {
505508
let host = Network::Grpc.localhost();
506-
let port = Self::block_exporter_port(n, j as usize);
509+
let port = Self::block_exporter_port(n, j);
507510
let config_content = format!(
508511
r#"
509512
@@ -514,7 +517,7 @@ impl LocalNet {
514517
);
515518

516519
content.push_str(&config_content);
517-
let exporter_config = self.generate_block_exporter_config(n, j);
520+
let exporter_config = self.generate_block_exporter_config(n, j as u32);
518521
let config_path = self
519522
.path_provider
520523
.path()
@@ -536,7 +539,7 @@ impl LocalNet {
536539
let n = validator;
537540
let host = Network::Grpc.localhost();
538541
let port = Self::block_exporter_port(n, exporter_id as usize);
539-
let config = format!(
542+
let mut config = format!(
540543
r#"
541544
id = {exporter_id}
542545
@@ -546,6 +549,36 @@ impl LocalNet {
546549
"#
547550
);
548551

552+
for destination in &self.block_exporters[exporter_id as usize]
553+
.destination_config
554+
.destinations
555+
{
556+
let tls = match destination.tls {
557+
TlsConfig::ClearText => "ClearText",
558+
TlsConfig::Tls => "Tls",
559+
};
560+
561+
let endpoint = &destination.endpoint;
562+
let port = destination.port;
563+
let kind = match destination.kind {
564+
DestinationKind::Indexer => "Indexer",
565+
DestinationKind::Validator => "Validator",
566+
};
567+
568+
let destination_string_to_push = format!(
569+
r#"
570+
571+
[[destination_config.destinations]]
572+
tls = "{tls}"
573+
endpoint = "{endpoint}"
574+
port = {port}
575+
kind = "{kind}"
576+
"#
577+
);
578+
579+
config.push_str(&destination_string_to_push);
580+
}
581+
549582
config
550583
}
551584

@@ -799,8 +832,8 @@ impl LocalNet {
799832
let server = self.run_server(index, shard).await?;
800833
validator.add_server(server);
801834
}
802-
for block_exporter in 0..self.num_block_exporters {
803-
let exporter = self.run_exporter(index, block_exporter).await?;
835+
for block_exporter in 0..self.block_exporters.len() {
836+
let exporter = self.run_exporter(index, block_exporter as u32).await?;
804837
validator.add_block_exporter(exporter);
805838
}
806839
self.running_validators.insert(index, validator);

linera-service/src/exporter/common.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::future::IntoFuture;
4+
use std::{future::IntoFuture, net::SocketAddr};
55

66
use bincode::ErrorKind;
77
use custom_debug_derive::Debug;
8-
use linera_base::{crypto::CryptoHash, data_types::BlockHeight, identifiers::ChainId};
9-
use linera_chain::data_types::IncomingBundle;
10-
use linera_rpc::grpc::GrpcProtoConversionError;
8+
use linera_base::{
9+
crypto::CryptoHash,
10+
data_types::BlockHeight,
11+
identifiers::{BlobId, ChainId},
12+
};
13+
use linera_chain::{data_types::IncomingBundle, types::ConfirmedBlock};
14+
use linera_rpc::grpc::{GrpcError, GrpcProtoConversionError};
1115
use linera_sdk::views::ViewError;
1216
use serde::{Deserialize, Serialize};
1317
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
18+
use tonic::Status;
1419

1520
#[derive(thiserror::Error, Debug)]
1621
pub(crate) enum ExporterError {
@@ -38,6 +43,15 @@ pub(crate) enum ExporterError {
3843
#[error("trying to re-initialize the chain: {0}")]
3944
ChainAlreadyExists(ChainId),
4045

46+
#[error("unable to establish an underlying stream")]
47+
SynchronizationFailed(Box<Status>),
48+
49+
#[error(transparent)]
50+
GrpcError(#[from] GrpcError),
51+
52+
#[error("wrong destination")]
53+
DestinationError,
54+
4155
#[error("generic error: {0}")]
4256
GenericError(Box<dyn std::error::Error + Send + Sync + 'static>),
4357
}
@@ -61,6 +75,21 @@ impl From<BadNotificationKind> for ExporterError {
6175
}
6276
}
6377

78+
#[derive(Debug, Clone, Serialize, Deserialize)]
79+
pub(crate) struct CanonicalBlock {
80+
pub blobs: Box<[BlobId]>,
81+
pub block_hash: CryptoHash,
82+
}
83+
84+
impl CanonicalBlock {
85+
pub(crate) fn new(hash: CryptoHash, blobs: &[BlobId]) -> CanonicalBlock {
86+
CanonicalBlock {
87+
block_hash: hash,
88+
blobs: blobs.to_vec().into_boxed_slice(),
89+
}
90+
}
91+
}
92+
6493
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
6594
pub(crate) struct BlockId {
6695
pub hash: CryptoHash,
@@ -90,6 +119,10 @@ impl BlockId {
90119
height: incoming_bundle.bundle.height,
91120
}
92121
}
122+
123+
pub(crate) fn from_confirmed_block(block: &ConfirmedBlock) -> BlockId {
124+
BlockId::new(block.chain_id(), block.inner().hash(), block.height())
125+
}
93126
}
94127

95128
impl LiteBlockId {
@@ -104,14 +137,6 @@ impl From<BlockId> for LiteBlockId {
104137
}
105138
}
106139

107-
#[macro_export]
108-
macro_rules! dispatch {
109-
($func:expr, log = $log_value:expr $(, $args:expr)* ) => {{
110-
tracing::debug!("dispatching batch: {:?} from linera exporter", $log_value);
111-
$func($($args),*).await
112-
}};
113-
}
114-
115140
#[derive(Clone)]
116141
pub(crate) struct ExporterCancellationSignal {
117142
token: CancellationToken,
@@ -131,3 +156,7 @@ impl IntoFuture for ExporterCancellationSignal {
131156
self.token.cancelled_owned()
132157
}
133158
}
159+
160+
pub(crate) fn get_address(port: u16) -> SocketAddr {
161+
SocketAddr::from(([0, 0, 0, 0], port))
162+
}

0 commit comments

Comments
 (0)