diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5fcb1f14210..c6efd987de8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -843,7 +843,7 @@ jobs: # the same name (we only want to document those anyway) cargo doc --no-deps --lib -p mithril-stm -p mithril-common \ -p mithril-cardano-node-chain -p mithril-cardano-node-internal-database \ - -p mithril-aggregator-client -p mithril-build-script -p mithril-cli-helper \ + -p mithril-aggregator-client -p mithril-aggregator-discovery -p mithril-build-script -p mithril-cli-helper \ -p mithril-dmq -p mithril-doc -p mithril-doc-derive \ -p mithril-era -p mithril-metric -p mithril-persistence -p mithril-resource-pool \ -p mithril-ticker -p mithril-signed-entity-lock -p mithril-signed-entity-preloader \ diff --git a/Cargo.lock b/Cargo.lock index 696f28ce1c1..236de9175fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3674,6 +3674,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "mithril-aggregator-discovery" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "httpmock", + "mithril-aggregator-client", + "mithril-common", + "mockall", + "rand 0.9.2", + "reqwest", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-scope", + "slog-term", + "thiserror 2.0.17", + "tokio", +] + [[package]] name = "mithril-aggregator-fake" version = "0.4.15" @@ -3795,6 +3817,7 @@ dependencies = [ "hex", "http", "httpmock", + "mithril-aggregator-discovery", "mithril-cardano-node-internal-database", "mithril-common", "mockall", diff --git a/Cargo.toml b/Cargo.toml index 5f699538b64..6130d00239b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "internal/cardano-node/mithril-cardano-node-chain", "internal/cardano-node/mithril-cardano-node-internal-database", "internal/mithril-aggregator-client", + "internal/mithril-aggregator-discovery", "internal/mithril-build-script", "internal/mithril-cli-helper", "internal/mithril-dmq", diff --git a/Makefile b/Makefile index a379c8e96a7..8c12ce5464f 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,33 @@ -COMPONENTS = mithril-aggregator mithril-client mithril-client-cli mithril-client-wasm \ - mithril-common mithril-relay mithril-signer mithril-stm \ - internal/mithril-build-script internal/mithril-cli-helper internal/mithril-doc \ +COMPONENTS = demo/protocol-demo \ + internal/cardano-node/mithril-cardano-node-chain \ + internal/cardano-node/mithril-cardano-node-internal-database \ + internal/mithril-aggregator-client \ + internal/mithril-aggregator-discovery \ + internal/mithril-build-script \ + internal/mithril-cli-helper \ internal/mithril-dmq \ - internal/mithril-doc-derive internal/mithril-era internal/mithril-metric internal/mithril-persistence \ + internal/mithril-doc \ + internal/mithril-doc-derive \ + internal/mithril-era \ + internal/mithril-metric \ + internal/mithril-persistence \ internal/mithril-protocol-config \ - internal/mithril-resource-pool internal/mithril-ticker \ - internal/cardano-node/mithril-cardano-node-chain internal/cardano-node/mithril-cardano-node-internal-database \ - internal/signed-entity/mithril-signed-entity-lock internal/signed-entity/mithril-signed-entity-preloader \ - internal/tests/mithril-api-spec internal/tests/mithril-test-http-server \ - demo/protocol-demo \ - mithril-test-lab/mithril-aggregator-fake mithril-test-lab/mithril-end-to-end + internal/mithril-resource-pool \ + internal/mithril-ticker \ + internal/signed-entity/mithril-signed-entity-lock \ + internal/signed-entity/mithril-signed-entity-preloader \ + internal/tests/mithril-api-spec \ + internal/tests/mithril-test-http-server \ + mithril-aggregator \ + mithril-client \ + mithril-client-cli \ + mithril-client-wasm \ + mithril-common \ + mithril-relay \ + mithril-signer \ + mithril-stm \ + mithril-test-lab/mithril-aggregator-fake \ + mithril-test-lab/mithril-end-to-end GOALS := $(or $(MAKECMDGOALS),all) NON_COMPONENT_GOALS := check-format format diff --git a/README.md b/README.md index 283cc97477f..5d0e9b5f950 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,10 @@ This repository consists of the following parts: - [**Mithril signer**](./mithril-signer): the node of the **Mithril network** responsible for producing individual signatures that are collected and aggregated by the **Mithril aggregator**. - [**Internal**](./internal): the shared tools and API used by **Mithril** crates. - - [**Mithril aggregator client**](./internal/mithril-aggregator-client): a client to request data from a Mithril Aggregator, used by **Mithril network** nodes and client library. + + - [**Mithril aggregator client**](./internal/mithril-aggregator-client): a client to request data from a Mithril aggregator, used by **Mithril network** nodes and client library. + + - [**Mithril aggregator discovery**](./internal/mithril-aggregator-discovery): mechanisms to discover available Mithril aggregator, used by **Mithril network** nodes and client library. - [**Mithril build script**](./internal/mithril-build-script): a toolbox for Mithril crates that uses a build script phase. @@ -113,11 +116,13 @@ This repository consists of the following parts: - [**Mithril signed entity prealoader**](./internal/signed-entity/mithril-signed-entity-preloader): a **preload** mechanism for the Cardano transaction signed entity, used by **Mithril network** nodes. - [**tests**](./internal/tests): shared testing tools used by **Mithril** crates. + - [**Mithril api spec**](./internal/tests/mithril-api-spec): toolset to verify conformity of http routes against an Open Api specification, used by **Mithril network** nodes. - [**Mithril test http server**](internal/tests/mithril-test-http-server): provides a test http server, used by **Mithril network** nodes. - [**Mithril test lab**](./mithril-test-lab): the suite of tools that allow us to test and stress the **Mithril** protocol implementations. + - [**Mithril devnet**](./mithril-test-lab/mithril-devnet): the private **Mithril/Cardano network** used to scaffold a **Mithril network** on top of a **Cardano network**. - [**Mithril end to end**](./mithril-test-lab/mithril-end-to-end): the tool used to run test scenarios against a **Mithril devnet**. diff --git a/internal/mithril-aggregator-discovery/Cargo.toml b/internal/mithril-aggregator-discovery/Cargo.toml new file mode 100644 index 00000000000..f6106fc4f47 --- /dev/null +++ b/internal/mithril-aggregator-discovery/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "mithril-aggregator-discovery" +description = "Mechanisms to discover aggregator available in a Mithril network." +version = "0.1.0" +authors.workspace = true +documentation.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true +include = ["**/*.rs", "Cargo.toml", "README.md", ".gitignore"] + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +mithril-common = { path = "../../mithril-common" } +mithril-aggregator-client = { path = "../mithril-aggregator-client" } +rand = { version = "0.9.2"} +reqwest = { workspace = true, features = [ + "default", + "gzip", + "zstd", + "deflate", + "brotli" +] } +serde = { workspace = true } +serde_json = { workspace = true } +slog = { workspace = true } +slog-scope = "4.4.0" +thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync", "rt-multi-thread"] } + +[dev-dependencies] +mockall = { workspace = true } +httpmock = "0.8.1" +slog-async = { workspace = true } +slog-term = { workspace = true } +tokio = { workspace = true, features = ["macros"] } diff --git a/internal/mithril-aggregator-discovery/Makefile b/internal/mithril-aggregator-discovery/Makefile new file mode 100644 index 00000000000..e503264d97d --- /dev/null +++ b/internal/mithril-aggregator-discovery/Makefile @@ -0,0 +1,19 @@ +.PHONY: all build test check doc + +CARGO = cargo + +all: test build + +build: + ${CARGO} build --release + +test: + ${CARGO} test + +check: + ${CARGO} check --release --all-features --all-targets + ${CARGO} clippy --release --all-features --all-targets + ${CARGO} fmt --check + +doc: + ${CARGO} doc --no-deps --open \ No newline at end of file diff --git a/internal/mithril-aggregator-discovery/README.md b/internal/mithril-aggregator-discovery/README.md new file mode 100644 index 00000000000..56a780b6020 --- /dev/null +++ b/internal/mithril-aggregator-discovery/README.md @@ -0,0 +1,3 @@ +# Mithril-aggregator-discovery + +This crate provides mechanisms to discover aggregators in a Mithril network. diff --git a/internal/mithril-aggregator-discovery/src/capabilities_discoverer.rs b/internal/mithril-aggregator-discovery/src/capabilities_discoverer.rs new file mode 100644 index 00000000000..0f7b6c91619 --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/capabilities_discoverer.rs @@ -0,0 +1,451 @@ +use std::sync::Arc; + +use mithril_common::{ + AggregateSignatureType, StdResult, entities::SignedEntityTypeDiscriminants, + messages::AggregatorCapabilities, +}; + +use crate::{AggregatorDiscoverer, AggregatorEndpoint, MithrilNetwork}; + +/// Required capabilities for an aggregator. +#[derive(Clone, PartialEq, Eq, Debug)] +pub enum RequiredAggregatorCapabilities { + /// All + All, + /// Signed entity type. + SignedEntityType(SignedEntityTypeDiscriminants), + /// Aggregate signature type. + AggregateSignatureType(AggregateSignatureType), + /// Logical OR of required capabilities. + Or(Vec), + /// Logical AND of required capabilities. + And(Vec), +} + +impl RequiredAggregatorCapabilities { + /// Check if the available capabilities match the required capabilities. + fn matches(&self, available: &AggregatorCapabilities) -> bool { + match self { + RequiredAggregatorCapabilities::All => true, + RequiredAggregatorCapabilities::SignedEntityType(required_signed_entity_type) => { + available + .signed_entity_types + .iter() + .any(|req| req == required_signed_entity_type) + } + RequiredAggregatorCapabilities::AggregateSignatureType( + required_aggregate_signature_types, + ) => *required_aggregate_signature_types == available.aggregate_signature_type, + RequiredAggregatorCapabilities::Or(requirements) => { + requirements.iter().any(|req| req.matches(available)) + } + RequiredAggregatorCapabilities::And(requirements) => { + requirements.iter().all(|req| req.matches(available)) + } + } + } +} + +/// An aggregator discoverer for specific capabilities. +pub struct CapableAggregatorDiscoverer { + required_capabilities: RequiredAggregatorCapabilities, + inner_discoverer: Arc, +} + +impl CapableAggregatorDiscoverer { + /// Creates a new `CapableAggregatorDiscoverer` instance with the provided capabilities. + pub fn new( + capabilities: RequiredAggregatorCapabilities, + inner_discoverer: Arc, + ) -> Self { + Self { + required_capabilities: capabilities, + inner_discoverer, + } + } +} + +#[async_trait::async_trait] +impl AggregatorDiscoverer for CapableAggregatorDiscoverer { + async fn get_available_aggregators( + &self, + network: MithrilNetwork, + ) -> StdResult>> { + let aggregator_endpoints = self.inner_discoverer.get_available_aggregators(network).await?; + + Ok(Box::new(CapableAggregatorDiscovererIterator { + required_capabilities: self.required_capabilities.clone(), + inner_iterator: aggregator_endpoints, + })) + } +} + +/// An iterator over aggregator endpoints filtered by capabilities. +struct CapableAggregatorDiscovererIterator { + required_capabilities: RequiredAggregatorCapabilities, + inner_iterator: Box>, +} + +impl Iterator for CapableAggregatorDiscovererIterator { + type Item = AggregatorEndpoint; + + fn next(&mut self) -> Option { + for aggregator_endpoint in self.inner_iterator.by_ref() { + let aggregator_endpoint_clone = aggregator_endpoint.clone(); + let aggregator_capabilities = tokio::task::block_in_place(move || { + tokio::runtime::Handle::current().block_on(async move { + aggregator_endpoint_clone.retrieve_capabilities().await + }) + }); + if let Ok(aggregator_capabilities) = aggregator_capabilities + && self.required_capabilities.matches(&aggregator_capabilities) + { + return Some(aggregator_endpoint); + } + } + + None + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use httpmock::MockServer; + use serde_json::json; + + use mithril_common::{ + AggregateSignatureType::Concatenation, + entities::SignedEntityTypeDiscriminants::{ + CardanoDatabase, CardanoStakeDistribution, CardanoTransactions, + MithrilStakeDistribution, + }, + messages::AggregatorFeaturesMessage, + }; + + use super::*; + + mod required_capabilities { + use super::*; + + #[test] + fn required_capabilities_match_all_success() { + let required = RequiredAggregatorCapabilities::All; + let available = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([]), + cardano_transactions_prover: None, + }; + + assert!(required.matches(&available)); + } + + #[test] + fn required_capabilities_match_signed_entity_types_success() { + let required = + RequiredAggregatorCapabilities::SignedEntityType(CardanoStakeDistribution); + let available = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([ + CardanoTransactions, + CardanoStakeDistribution, + CardanoDatabase, + ]), + cardano_transactions_prover: None, + }; + + assert!(required.matches(&available)); + } + + #[test] + fn required_capabilities_match_signed_entity_types_failure() { + let required = + RequiredAggregatorCapabilities::SignedEntityType(MithrilStakeDistribution); + let available = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([ + CardanoTransactions, + CardanoStakeDistribution, + CardanoDatabase, + ]), + cardano_transactions_prover: None, + }; + + assert!(!required.matches(&available)); + } + + #[test] + fn required_capabilities_match_signed_aggregate_signature_type_success() { + let required = RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation); + let available = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([ + CardanoTransactions, + CardanoStakeDistribution, + CardanoDatabase, + ]), + cardano_transactions_prover: None, + }; + + assert!(required.matches(&available)); + } + + #[test] + fn required_capabilities_match_or_success() { + let required = RequiredAggregatorCapabilities::Or(vec![ + RequiredAggregatorCapabilities::SignedEntityType(MithrilStakeDistribution), + RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation), + ]); + let available = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([ + CardanoTransactions, + CardanoStakeDistribution, + CardanoDatabase, + ]), + cardano_transactions_prover: None, + }; + + assert!(required.matches(&available)); + } + + #[test] + fn required_capabilities_match_and_success() { + let required = RequiredAggregatorCapabilities::And(vec![ + RequiredAggregatorCapabilities::SignedEntityType(CardanoTransactions), + RequiredAggregatorCapabilities::SignedEntityType(CardanoStakeDistribution), + RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation), + ]); + let available = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([ + CardanoTransactions, + CardanoStakeDistribution, + CardanoDatabase, + ]), + cardano_transactions_prover: None, + }; + + assert!(required.matches(&available)); + } + + #[test] + fn required_capabilities_match_and_failure() { + let required = RequiredAggregatorCapabilities::And(vec![ + RequiredAggregatorCapabilities::SignedEntityType(CardanoTransactions), + RequiredAggregatorCapabilities::SignedEntityType(CardanoStakeDistribution), + RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation), + ]); + let available = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([CardanoTransactions]), + cardano_transactions_prover: None, + }; + + assert!(!required.matches(&available)); + } + } + + mod capable_discoverer { + use super::*; + + fn create_aggregator_features_message( + capabilities: AggregatorCapabilities, + ) -> AggregatorFeaturesMessage { + AggregatorFeaturesMessage { + open_api_version: "1.0.0".to_string(), + documentation_url: "https://docs".to_string(), + capabilities, + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn get_available_aggregators_success() { + let capabilities = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([ + CardanoStakeDistribution, + CardanoTransactions, + ]), + cardano_transactions_prover: None, + }; + let aggregator_server = MockServer::start(); + let aggregator_server_mock = aggregator_server.mock(|when, then| { + when.path("/"); + then.status(200) + .body(json!(create_aggregator_features_message(capabilities)).to_string()); + }); + let discoverer = CapableAggregatorDiscoverer::new( + RequiredAggregatorCapabilities::And(vec![ + RequiredAggregatorCapabilities::SignedEntityType(CardanoTransactions), + RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation), + ]), + Arc::new(crate::test::double::AggregatorDiscovererFake::new(vec![ + Ok(vec![AggregatorEndpoint::new(aggregator_server.url("/"))]), + ])), + ); + + let mut aggregators = discoverer + .get_available_aggregators(MithrilNetwork::new("release-devnet".into())) + .await + .unwrap(); + + let next_aggregator = aggregators.next(); + aggregator_server_mock.assert(); + assert_eq!( + Some(AggregatorEndpoint::new(aggregator_server.url("/"))), + next_aggregator + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn get_available_aggregators_succeeds_when_aggregator_capabilities_do_not_match() { + let capabilities = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([CardanoTransactions]), + cardano_transactions_prover: None, + }; + let aggregator_server = MockServer::start(); + let aggregator_server_mock = aggregator_server.mock(|when, then| { + when.path("/"); + then.status(200) + .body(json!(create_aggregator_features_message(capabilities)).to_string()); + }); + let discoverer = CapableAggregatorDiscoverer::new( + RequiredAggregatorCapabilities::And(vec![ + RequiredAggregatorCapabilities::SignedEntityType(CardanoDatabase), + RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation), + ]), + Arc::new(crate::test::double::AggregatorDiscovererFake::new(vec![ + Ok(vec![AggregatorEndpoint::new(aggregator_server.url("/"))]), + ])), + ); + + let mut aggregators = discoverer + .get_available_aggregators(MithrilNetwork::new("release-devnet".into())) + .await + .unwrap(); + + let next_aggregator = aggregators.next(); + aggregator_server_mock.assert(); + assert!(next_aggregator.is_none()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn get_available_aggregators_succeeds_when_one_aggregator_returns_an_error() { + let aggregator_server_1 = MockServer::start(); + let aggregator_server_mock_1 = aggregator_server_1.mock(|when, then| { + when.path("/"); + then.status(500); + }); + let capabilities_2 = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([CardanoStakeDistribution, CardanoDatabase]), + cardano_transactions_prover: None, + }; + let aggregator_server_2 = MockServer::start(); + let aggregator_server_mock_2 = aggregator_server_2.mock(|when, then| { + when.path("/"); + then.status(200) + .body(json!(create_aggregator_features_message(capabilities_2)).to_string()); + }); + let discoverer = CapableAggregatorDiscoverer::new( + RequiredAggregatorCapabilities::And(vec![ + RequiredAggregatorCapabilities::SignedEntityType(CardanoDatabase), + RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation), + ]), + Arc::new(crate::test::double::AggregatorDiscovererFake::new(vec![ + Ok(vec![ + AggregatorEndpoint::new(aggregator_server_1.url("/")), + AggregatorEndpoint::new(aggregator_server_2.url("/")), + ]), + ])), + ); + + let mut aggregators = discoverer + .get_available_aggregators(MithrilNetwork::new("release-devnet".into())) + .await + .unwrap(); + + let next_aggregator = aggregators.next(); + aggregator_server_mock_1.assert(); + aggregator_server_mock_2.assert(); + assert_eq!( + Some(AggregatorEndpoint::new(aggregator_server_2.url("/"))), + next_aggregator + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn get_available_aggregators_succeeds_and_makes_minimum_calls_to_aggregators() { + let aggregator_server_1 = MockServer::start(); + let aggregator_server_mock_1 = aggregator_server_1.mock(|when, then| { + when.path("/"); + then.status(500); + }); + let capabilities_2 = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([CardanoStakeDistribution]), + cardano_transactions_prover: None, + }; + let aggregator_server_2 = MockServer::start(); + let aggregator_server_mock_2 = aggregator_server_2.mock(|when, then| { + when.path("/"); + then.status(200) + .body(json!(create_aggregator_features_message(capabilities_2)).to_string()); + }); + let capabilities_3 = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([CardanoDatabase]), + cardano_transactions_prover: None, + }; + let aggregator_server_3 = MockServer::start(); + let aggregator_server_mock_3 = aggregator_server_3.mock(|when, then| { + when.path("/"); + then.status(200) + .body(json!(create_aggregator_features_message(capabilities_3)).to_string()); + }); + let capabilities_4 = AggregatorCapabilities { + aggregate_signature_type: Concatenation, + signed_entity_types: BTreeSet::from([CardanoDatabase]), + cardano_transactions_prover: None, + }; + let aggregator_server_4 = MockServer::start(); + let aggregator_server_mock_4 = aggregator_server_4.mock(|when, then| { + when.path("/"); + then.status(200) + .body(json!(create_aggregator_features_message(capabilities_4)).to_string()); + }); + let discoverer = CapableAggregatorDiscoverer::new( + RequiredAggregatorCapabilities::And(vec![ + RequiredAggregatorCapabilities::SignedEntityType(CardanoDatabase), + RequiredAggregatorCapabilities::AggregateSignatureType(Concatenation), + ]), + Arc::new(crate::test::double::AggregatorDiscovererFake::new(vec![ + Ok(vec![ + AggregatorEndpoint::new(aggregator_server_1.url("/")), + AggregatorEndpoint::new(aggregator_server_2.url("/")), + AggregatorEndpoint::new(aggregator_server_3.url("/")), + AggregatorEndpoint::new(aggregator_server_4.url("/")), + ]), + ])), + ); + + let mut aggregators = discoverer + .get_available_aggregators(MithrilNetwork::new("release-devnet".into())) + .await + .unwrap(); + + let next_aggregator = aggregators.next(); + aggregator_server_mock_1.assert(); + aggregator_server_mock_2.assert(); + aggregator_server_mock_3.assert(); + assert_eq!(0, aggregator_server_mock_4.calls()); + assert_eq!( + Some(AggregatorEndpoint::new(aggregator_server_3.url("/"))), + next_aggregator + ); + } + } +} diff --git a/internal/mithril-aggregator-discovery/src/http_config_discoverer.rs b/internal/mithril-aggregator-discovery/src/http_config_discoverer.rs new file mode 100644 index 00000000000..34ba0622329 --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/http_config_discoverer.rs @@ -0,0 +1,202 @@ +use std::{collections::HashMap, time::Duration}; + +use anyhow::Context; +use reqwest::Client; +use serde::{Deserialize, Serialize}; + +use mithril_common::StdResult; + +use crate::{AggregatorDiscoverer, AggregatorEndpoint, MithrilNetwork}; + +const DEFAULT_REMOTE_NETWORKS_CONFIG_URL: &str = + "https://raw.githubusercontent.com/input-output-hk/mithril/main/networks.json"; + +/// Representation of the networks configuration file. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct NetworksConfigMessage { + #[serde(flatten)] + pub networks: HashMap, +} + +/// Representation of a network environment in the networks configuration file. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct NetworkEnvironmentMessage { + #[serde(rename = "mithril-networks")] + pub mithril_networks: Vec>, +} + +/// Representation of a Mithril network in the networks configuration file. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MithrilNetworkMessage { + pub aggregators: Vec, +} + +/// Representation of an aggregator in the networks configuration file. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct AggregatorMessage { + pub url: String, +} + +/// An implementation of the [AggregatorDiscoverer] trait which discovers aggregators from remote networks configuration. +/// +/// The reference file is the `networks.json` file hosted in the Mithril GitHub repository. +pub struct HttpConfigAggregatorDiscoverer { + configuration_file_url: String, +} + +impl HttpConfigAggregatorDiscoverer { + const HTTP_TIMEOUT: Duration = Duration::from_secs(10); + + /// Creates a new `HttpConfigAggregatorDiscoverer` instance with the provided results. + pub fn new(configuration_file_url: &str) -> Self { + Self { + configuration_file_url: configuration_file_url.to_string(), + } + } + + /// Builds a reqwest HTTP client. + fn build_client(&self) -> StdResult { + let client_builder = Client::builder().timeout(Self::HTTP_TIMEOUT); + let client = client_builder.build()?; + + Ok(client) + } +} + +impl Default for HttpConfigAggregatorDiscoverer { + fn default() -> Self { + Self::new(DEFAULT_REMOTE_NETWORKS_CONFIG_URL) + } +} + +#[async_trait::async_trait] +impl AggregatorDiscoverer for HttpConfigAggregatorDiscoverer { + async fn get_available_aggregators( + &self, + network: MithrilNetwork, + ) -> StdResult>> { + let client = self.build_client()?; + let networks_configuration_response = client + .get(&self.configuration_file_url) + .send() + .await + .with_context(|| { + format!( + "AggregatorDiscovererHttpConfig failed retrieving configuration file from {}", + &self.configuration_file_url + ) + })? + .json::() + .await + .with_context(|| { + format!( + "AggregatorDiscovererHttpConfig failed parsing configuration file from {}", + &self.configuration_file_url + ) + })?; + let aggregator_endpoints = networks_configuration_response + .networks + .values() + .flat_map(|env| &env.mithril_networks) + .flat_map(|network_map| network_map.iter()) + .filter(|(name, _)| *name == network.name()) + .flat_map(|(_, network)| &network.aggregators) + .map(|aggregator_msg| AggregatorEndpoint::new(aggregator_msg.url.clone())) + .collect::>(); + + Ok(Box::new(aggregator_endpoints.into_iter())) + } +} + +#[cfg(test)] +mod tests { + use httpmock::MockServer; + + use super::*; + + const TEST_NETWORKS_CONFIG_JSON_SUCCESS: &str = r#" + { + "devnet": { + "mithril-networks": [ + { + "release-devnet": { + "aggregators": [ + { "url": "https://release-devnet-aggregator1" }, + { "url": "https://release-devnet-aggregator2" } + ] + } + } + ] + }, + "testnet": { + "mithril-networks": [ + { + "preview-testnet": { + "aggregators": [ + { "url": "https://preview-testnet-aggregator1" }, + { "url": "https://preview-testnet-aggregator2" } + ] + } + } + ] + } + }"#; + + const TEST_NETWORKS_CONFIG_JSON_FAILURE: &str = r#" + { + {"} + }"#; + + fn create_server_and_discoverer(content: &str) -> (MockServer, HttpConfigAggregatorDiscoverer) { + let size = content.len() as u64; + let server = MockServer::start(); + server.mock(|when, then| { + when.method(httpmock::Method::GET).path("/networks.json"); + then.status(200) + .body(content) + .header(reqwest::header::CONTENT_LENGTH.as_str(), size.to_string()); + }); + let configuration_file_url = format!("{}{}", server.url("/"), "networks.json"); + let discoverer = HttpConfigAggregatorDiscoverer::new(&configuration_file_url); + + (server, discoverer) + } + + #[tokio::test] + async fn get_available_aggregators_success() { + let content = TEST_NETWORKS_CONFIG_JSON_SUCCESS; + let (_server, discoverer) = create_server_and_discoverer(content); + let aggregators = discoverer + .get_available_aggregators(MithrilNetwork::new("release-devnet".into())) + .await + .unwrap(); + + assert_eq!( + vec![ + AggregatorEndpoint::new("https://release-devnet-aggregator1".into()), + AggregatorEndpoint::new("https://release-devnet-aggregator2".into()), + ], + aggregators.collect::>() + ); + + let mut aggregators = discoverer + .get_available_aggregators(MithrilNetwork::new("unknown".into())) + .await + .unwrap(); + + assert!(aggregators.next().is_none()); + } + + #[tokio::test] + async fn get_available_aggregators_failure() { + let content = TEST_NETWORKS_CONFIG_JSON_FAILURE; + let (_server, discoverer) = create_server_and_discoverer(content); + let result = discoverer + .get_available_aggregators(MithrilNetwork::new("release-devnet".into())) + .await; + assert!( + result.is_err(), + "The retrieval of the aggregators should fail" + ); + } +} diff --git a/internal/mithril-aggregator-discovery/src/interface.rs b/internal/mithril-aggregator-discovery/src/interface.rs new file mode 100644 index 00000000000..61b725a12f1 --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/interface.rs @@ -0,0 +1,18 @@ +//! Interface definition for Mithril Protocol Configuration provider. + +use mithril_common::StdResult; + +use crate::model::{AggregatorEndpoint, MithrilNetwork}; + +/// An aggregator discoverer. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait AggregatorDiscoverer: Sync + Send { + /// Get an iterator over a list of available aggregators in a Mithril network. + /// + /// Note: there is no guarantee that the returned aggregators is sorted, complete or up-to-date. + async fn get_available_aggregators( + &self, + network: MithrilNetwork, + ) -> StdResult>>; +} diff --git a/internal/mithril-aggregator-discovery/src/lib.rs b/internal/mithril-aggregator-discovery/src/lib.rs new file mode 100644 index 00000000000..077c0fbf01a --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/lib.rs @@ -0,0 +1,15 @@ +#![warn(missing_docs)] +//! This crate provides mechanisms to discover aggregators in a Mithril network. + +mod capabilities_discoverer; +mod http_config_discoverer; +mod interface; +mod model; +mod rand_discoverer; +pub mod test; + +pub use capabilities_discoverer::{CapableAggregatorDiscoverer, RequiredAggregatorCapabilities}; +pub use http_config_discoverer::HttpConfigAggregatorDiscoverer; +pub use interface::AggregatorDiscoverer; +pub use model::{AggregatorEndpoint, MithrilNetwork}; +pub use rand_discoverer::ShuffleAggregatorDiscoverer; diff --git a/internal/mithril-aggregator-discovery/src/model.rs b/internal/mithril-aggregator-discovery/src/model.rs new file mode 100644 index 00000000000..6f6f27666d4 --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/model.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use mithril_aggregator_client::{AggregatorHttpClient, query::GetAggregatorFeaturesQuery}; +use mithril_common::{StdResult, messages::AggregatorCapabilities}; + +/// Representation of a Mithril network +// TODO: to move to mithril common +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MithrilNetwork(String); + +impl MithrilNetwork { + /// Create a new MithrilNetwork instance + pub fn new(name: String) -> Self { + Self(name) + } + + /// Create a dummy MithrilNetwork instance for testing purposes + pub fn dummy() -> Self { + Self("dummy".to_string()) + } + + /// Retrieve the name of the Mithril network + pub fn name(&self) -> &str { + &self.0 + } +} + +impl From for MithrilNetwork { + fn from(name: String) -> Self { + MithrilNetwork::new(name) + } +} + +/// Representation of an aggregator endpoint +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AggregatorEndpoint { + url: String, +} + +impl AggregatorEndpoint { + const HTTP_TIMEOUT: Duration = Duration::from_secs(5); + + /// Create a new AggregatorEndpoint instance + pub fn new(url: String) -> Self { + Self { url } + } + + /// Retrieve the capabilities of the aggregator + pub async fn retrieve_capabilities(&self) -> StdResult { + let aggregator_client = AggregatorHttpClient::builder(self.url.clone()) + .with_timeout(Self::HTTP_TIMEOUT) + .build()?; + + Ok(aggregator_client + .send(GetAggregatorFeaturesQuery::current()) + .await? + .capabilities) + } +} + +impl From for String { + fn from(endpoint: AggregatorEndpoint) -> Self { + endpoint.url + } +} + +impl std::fmt::Display for AggregatorEndpoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.url) + } +} diff --git a/internal/mithril-aggregator-discovery/src/rand_discoverer.rs b/internal/mithril-aggregator-discovery/src/rand_discoverer.rs new file mode 100644 index 00000000000..a967ae8b127 --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/rand_discoverer.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; + +use rand::{Rng, seq::SliceRandom}; +use tokio::sync::Mutex; + +use mithril_common::StdResult; + +use crate::{AggregatorDiscoverer, AggregatorEndpoint, MithrilNetwork}; + +/// A discoverer that returns a random set of aggregators +pub struct ShuffleAggregatorDiscoverer { + random_generator: Arc>, + inner_discoverer: Arc, +} + +impl ShuffleAggregatorDiscoverer { + /// Creates a new `ShuffleAggregatorDiscoverer` instance with the provided inner discoverer. + pub fn new(inner_discoverer: Arc, random_generator: R) -> Self { + Self { + inner_discoverer, + random_generator: Arc::new(Mutex::new(random_generator)), + } + } +} + +#[async_trait::async_trait] +impl AggregatorDiscoverer for ShuffleAggregatorDiscoverer { + async fn get_available_aggregators( + &self, + network: MithrilNetwork, + ) -> StdResult>> { + let mut aggregators: Vec = self + .inner_discoverer + .get_available_aggregators(network) + .await? + .collect(); + let mut rng = self.random_generator.lock().await; + aggregators.shuffle(&mut *rng); + + Ok(Box::new(aggregators.into_iter())) + } +} + +#[cfg(test)] +mod tests { + use rand::{SeedableRng, rngs::StdRng}; + + use crate::test::double::AggregatorDiscovererFake; + + use super::*; + + #[tokio::test] + async fn shuffle_aggregator_discoverer() { + let inner_discoverer = AggregatorDiscovererFake::new(vec![Ok(vec![ + AggregatorEndpoint::new("https://release-devnet-aggregator1".to_string()), + AggregatorEndpoint::new("https://release-devnet-aggregator2".to_string()), + AggregatorEndpoint::new("https://release-devnet-aggregator3".to_string()), + ])]); + let seed = [0u8; 32]; + let rng = StdRng::from_seed(seed); + let discoverer = ShuffleAggregatorDiscoverer::new(Arc::new(inner_discoverer), rng); + + let aggregators = discoverer + .get_available_aggregators(MithrilNetwork::new("release-devnet".into())) + .await + .unwrap(); + + assert_eq!( + vec![ + AggregatorEndpoint::new("https://release-devnet-aggregator3".into()), + AggregatorEndpoint::new("https://release-devnet-aggregator2".into()), + AggregatorEndpoint::new("https://release-devnet-aggregator1".into()), + ], + aggregators.collect::>() + ); + } +} diff --git a/internal/mithril-aggregator-discovery/src/test/double/discoverer.rs b/internal/mithril-aggregator-discovery/src/test/double/discoverer.rs new file mode 100644 index 00000000000..808fb441174 --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/test/double/discoverer.rs @@ -0,0 +1,78 @@ +use std::collections::VecDeque; + +use tokio::sync::Mutex; + +use mithril_common::StdResult; + +use crate::{AggregatorDiscoverer, AggregatorEndpoint, MithrilNetwork}; + +type AggregatorListReturn = StdResult>; + +/// A fake implementation of the [AggregatorDiscoverer] trait for testing purposes. +pub struct AggregatorDiscovererFake { + results: Mutex>, +} + +impl AggregatorDiscovererFake { + /// Creates a new `AggregatorDiscovererFake` instance with the provided results. + pub fn new(results: Vec) -> Self { + Self { + results: Mutex::new(VecDeque::from(results)), + } + } +} + +#[async_trait::async_trait] +impl AggregatorDiscoverer for AggregatorDiscovererFake { + async fn get_available_aggregators( + &self, + _network: MithrilNetwork, + ) -> StdResult>> { + let mut results = self.results.lock().await; + + let endpoints = results.pop_front().ok_or_else(|| { + anyhow::anyhow!("No more results available in AggregatorDiscovererFake") + })??; + + Ok(Box::new(endpoints.into_iter())) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[tokio::test] + async fn get_available_aggregators_success() { + let consumer = AggregatorDiscovererFake::new(vec![ + Ok(vec![AggregatorEndpoint::new("test-1".to_string())]), + Ok(vec![AggregatorEndpoint::new("test-2".to_string())]), + ]); + + let messages = consumer + .get_available_aggregators(MithrilNetwork::dummy()) + .await + .unwrap(); + + assert_eq!( + vec![AggregatorEndpoint::new("test-1".to_string())], + messages.collect::>() + ); + } + + #[tokio::test] + async fn consume_messages_failure() { + let consumer = AggregatorDiscovererFake::new(vec![ + Err(anyhow::anyhow!("Test error")), + Ok(vec![AggregatorEndpoint::new("test-2".to_string())]), + ]); + + let result = consumer.get_available_aggregators(MithrilNetwork::dummy()).await; + + assert!( + result.is_err(), + "AggregatorDiscovererFake should return an error" + ); + } +} diff --git a/internal/mithril-aggregator-discovery/src/test/double/mod.rs b/internal/mithril-aggregator-discovery/src/test/double/mod.rs new file mode 100644 index 00000000000..31b02b52c2d --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/test/double/mod.rs @@ -0,0 +1,7 @@ +//! Test doubles +//! +//! Enable unit testing with controlled inputs and predictable behavior. + +mod discoverer; + +pub use discoverer::*; diff --git a/internal/mithril-aggregator-discovery/src/test/mod.rs b/internal/mithril-aggregator-discovery/src/test/mod.rs new file mode 100644 index 00000000000..4be2cc86b8b --- /dev/null +++ b/internal/mithril-aggregator-discovery/src/test/mod.rs @@ -0,0 +1,7 @@ +//! Test utilities. +//! +//! ⚠ Do not use in production code ⚠ +//! +//! This module provides in particular test doubles for the traits defined in this crate. + +pub mod double; diff --git a/mithril-client-cli/src/commands/tools/aggregator_discovery.rs b/mithril-client-cli/src/commands/tools/aggregator_discovery.rs new file mode 100644 index 00000000000..b3d4f82895a --- /dev/null +++ b/mithril-client-cli/src/commands/tools/aggregator_discovery.rs @@ -0,0 +1,192 @@ +use clap::Parser; + +use mithril_client::{ + AggregatorDiscoveryType, ClientBuilder, MithrilNetwork, MithrilResult, + RequiredAggregatorCapabilities, + common::{AggregateSignatureType, SignedEntityTypeDiscriminants}, +}; + +/// Clap command to select an aggregator from the available ones with automatic discovery. +#[derive(Parser, Debug, Clone)] +pub struct AggregatorSelectCommand { + /// Path to the Cardano node database directory. + #[clap(long)] + network: MithrilNetwork, + + /// Maximum number of entries to retrieve + #[clap(long, default_value_t = 1)] + max_entries: usize, + + /// Signed entity types to consider for the discovery + /// + /// If not provided, all signed entity types are considered. + #[clap(long, default_value = "[]")] + signed_entity_types: Vec, + + /// Aggregate signature types to consider for the discovery + /// + /// If not provided, all aggregate signature types are considered. + #[clap(long, default_value = "[]")] + aggregate_signature_types: Vec, +} + +impl AggregatorSelectCommand { + /// Main command execution + pub async fn execute(&self) -> MithrilResult<()> { + let required_capabilities = self.build_required_capabilities(); + let client_builder = + ClientBuilder::new(AggregatorDiscoveryType::Automatic(self.network.clone())) + .with_capabilities(required_capabilities) + .with_default_aggregator_discoverer(); + let aggregator_endpoints = client_builder + .discover_aggregator(&self.network)? + .take(self.max_entries); + + println!( + "Discovering at most {} aggregator endpoints:", + self.max_entries, + ); + for endpoint in aggregator_endpoints { + println!("- Found: {endpoint}"); + } + + Ok(()) + } + + fn build_required_capabilities(&self) -> RequiredAggregatorCapabilities { + if self.signed_entity_types.is_empty() && self.aggregate_signature_types.is_empty() { + return RequiredAggregatorCapabilities::All; + } + + let mut required_capabilities = vec![]; + if !self.signed_entity_types.is_empty() { + let mut required_capabilities_signed_entity_types = vec![]; + for signed_entity_type in &self.signed_entity_types { + required_capabilities_signed_entity_types.push( + RequiredAggregatorCapabilities::SignedEntityType(*signed_entity_type), + ); + } + required_capabilities.push(RequiredAggregatorCapabilities::Or( + required_capabilities_signed_entity_types, + )); + } + + if !self.aggregate_signature_types.is_empty() { + let mut required_capabilities_aggregate_signature_types = vec![]; + for aggregate_signature_type in &self.aggregate_signature_types { + required_capabilities_aggregate_signature_types.push( + RequiredAggregatorCapabilities::AggregateSignatureType( + *aggregate_signature_type, + ), + ); + } + required_capabilities.push(RequiredAggregatorCapabilities::Or( + required_capabilities_aggregate_signature_types, + )); + } + if required_capabilities.len() == 1 { + return required_capabilities.into_iter().next().unwrap(); + } else { + return RequiredAggregatorCapabilities::And(required_capabilities); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mithril_client::common::SignedEntityTypeDiscriminants; + + #[test] + fn test_build_required_capabilities_all() { + let command = AggregatorSelectCommand { + network: MithrilNetwork::dummy(), + max_entries: 1, + signed_entity_types: vec![], + aggregate_signature_types: vec![], + }; + + let required_capabilities = command.build_required_capabilities(); + assert_eq!(required_capabilities, RequiredAggregatorCapabilities::All); + } + + #[test] + fn test_build_required_capabilities_signed_entity_types() { + let command = AggregatorSelectCommand { + network: MithrilNetwork::dummy(), + max_entries: 1, + signed_entity_types: vec![ + SignedEntityTypeDiscriminants::CardanoTransactions, + SignedEntityTypeDiscriminants::CardanoStakeDistribution, + ], + aggregate_signature_types: vec![], + }; + + let required_capabilities = command.build_required_capabilities(); + + assert_eq!( + required_capabilities, + RequiredAggregatorCapabilities::Or(vec![ + RequiredAggregatorCapabilities::SignedEntityType( + SignedEntityTypeDiscriminants::CardanoTransactions + ), + RequiredAggregatorCapabilities::SignedEntityType( + SignedEntityTypeDiscriminants::CardanoStakeDistribution + ), + ]) + ); + } + + #[test] + fn test_build_required_capabilities_aggregate_signature_types() { + let command = AggregatorSelectCommand { + network: MithrilNetwork::dummy(), + max_entries: 1, + signed_entity_types: vec![], + aggregate_signature_types: vec![AggregateSignatureType::Concatenation], + }; + let required_capabilities = command.build_required_capabilities(); + + assert_eq!( + required_capabilities, + RequiredAggregatorCapabilities::Or(vec![ + RequiredAggregatorCapabilities::AggregateSignatureType( + AggregateSignatureType::Concatenation + ), + ]) + ); + } + + #[test] + fn test_build_required_capabilities_both() { + let command = AggregatorSelectCommand { + network: MithrilNetwork::dummy(), + max_entries: 1, + signed_entity_types: vec![ + SignedEntityTypeDiscriminants::CardanoTransactions, + SignedEntityTypeDiscriminants::CardanoStakeDistribution, + ], + aggregate_signature_types: vec![AggregateSignatureType::Concatenation], + }; + let required_capabilities = command.build_required_capabilities(); + + assert_eq!( + required_capabilities, + RequiredAggregatorCapabilities::And(vec![ + RequiredAggregatorCapabilities::Or(vec![ + RequiredAggregatorCapabilities::SignedEntityType( + SignedEntityTypeDiscriminants::CardanoTransactions + ), + RequiredAggregatorCapabilities::SignedEntityType( + SignedEntityTypeDiscriminants::CardanoStakeDistribution + ), + ]), + RequiredAggregatorCapabilities::Or(vec![ + RequiredAggregatorCapabilities::AggregateSignatureType( + AggregateSignatureType::Concatenation + ), + ]), + ]) + ); + } +} diff --git a/mithril-client-cli/src/commands/tools/mod.rs b/mithril-client-cli/src/commands/tools/mod.rs index 65fefa1d54e..181b362d81b 100644 --- a/mithril-client-cli/src/commands/tools/mod.rs +++ b/mithril-client-cli/src/commands/tools/mod.rs @@ -3,8 +3,10 @@ //! Provides utility subcommands such as converting restored InMemory UTxO-HD ledger snapshot //! to different flavors (Legacy, LMDB). +mod aggregator_discovery; mod snapshot_converter; +pub use aggregator_discovery::*; pub use snapshot_converter::*; use anyhow::anyhow; @@ -18,6 +20,9 @@ pub enum ToolsCommands { /// UTxO-HD related commands #[clap(subcommand, name = "utxo-hd")] UTxOHD(UTxOHDCommands), + /// Aggregator discovery related commands + #[clap(subcommand, name = "aggregator-discovery")] + AggregatorDiscovery(AggregatorDiscoveryCommands), } impl ToolsCommands { @@ -25,6 +30,7 @@ impl ToolsCommands { pub async fn execute(&self) -> MithrilResult<()> { match self { Self::UTxOHD(cmd) => cmd.execute().await, + Self::AggregatorDiscovery(cmd) => cmd.execute().await, } } } @@ -52,3 +58,20 @@ impl UTxOHDCommands { } } } + +/// Aggregator discovery related commands +#[derive(Subcommand, Debug, Clone)] +pub enum AggregatorDiscoveryCommands { + /// Select an aggregator from the available ones with automatic discovery + #[clap(arg_required_else_help = false)] + Select(AggregatorSelectCommand), +} + +impl AggregatorDiscoveryCommands { + /// Execute Aggregator discovery command + pub async fn execute(&self) -> MithrilResult<()> { + match self { + Self::Select(cmd) => cmd.execute().await, + } + } +} diff --git a/mithril-client/Cargo.toml b/mithril-client/Cargo.toml index 29837bb7a66..2b0c287d35b 100644 --- a/mithril-client/Cargo.toml +++ b/mithril-client/Cargo.toml @@ -59,6 +59,7 @@ flate2 = { version = "1.1.4", optional = true } flume = { version = "0.11.1", optional = true } futures = "0.3.31" mithril-common = { path = "../mithril-common", version = ">=0.6", default-features = false } +mithril-aggregator-discovery = { path = "../internal/mithril-aggregator-discovery" } reqwest = { workspace = true, default-features = false, features = [ "charset", "http2", diff --git a/mithril-client/src/aggregator_discovery.rs b/mithril-client/src/aggregator_discovery.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/mithril-client/src/client.rs b/mithril-client/src/client.rs index c65164b55ae..d885ed17d0a 100644 --- a/mithril-client/src/client.rs +++ b/mithril-client/src/client.rs @@ -1,12 +1,17 @@ use anyhow::{Context, anyhow}; #[cfg(feature = "fs")] use chrono::Utc; + use reqwest::Url; use serde::{Deserialize, Serialize}; use slog::{Logger, o}; use std::collections::HashMap; use std::sync::Arc; +use mithril_aggregator_discovery::{ + AggregatorDiscoverer, AggregatorEndpoint, CapableAggregatorDiscoverer, + HttpConfigAggregatorDiscoverer, MithrilNetwork, RequiredAggregatorCapabilities, +}; use mithril_common::api_version::APIVersionProvider; use mithril_common::{MITHRIL_CLIENT_TYPE_HEADER, MITHRIL_ORIGIN_TAG_HEADER}; @@ -40,6 +45,20 @@ const fn one_week_in_seconds() -> u32 { 604800 } +/// The type of discovery to use to find the aggregator to connect to. +pub enum AggregatorDiscoveryType { + /// Automatically discover the aggregator. + Automatic(MithrilNetwork), + /// Use a specific URL to connect to the aggregator. + Url(String), +} + +/// The genesis verification key. +pub enum GenesisVerificationKey { + /// The verification key is provided as a JSON Hex-encoded string. + JsonHex(String), +} + /// Options that can be used to configure the client. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ClientOptions { @@ -158,8 +177,10 @@ impl Client { /// Builder than can be used to create a [Client] easily or with custom dependencies. pub struct ClientBuilder { - aggregator_endpoint: Option, - genesis_verification_key: String, + aggregator_discovery: AggregatorDiscoveryType, + aggregator_capabilities: Option, + aggregator_discoverer: Option>, + genesis_verification_key: Option, origin_tag: Option, client_type: Option, #[cfg(feature = "fs")] @@ -180,35 +201,30 @@ impl ClientBuilder { /// Constructs a new `ClientBuilder` that fetches data from the aggregator at the given /// endpoint and with the given genesis verification key. pub fn aggregator(endpoint: &str, genesis_verification_key: &str) -> ClientBuilder { - Self { - aggregator_endpoint: Some(endpoint.to_string()), - genesis_verification_key: genesis_verification_key.to_string(), - origin_tag: None, - client_type: None, - #[cfg(feature = "fs")] - ancillary_verification_key: None, - aggregator_client: None, - certificate_verifier: None, - #[cfg(feature = "fs")] - http_file_downloader: None, - #[cfg(feature = "unstable")] - certificate_verifier_cache: None, - era_fetcher: None, - logger: None, - feedback_receivers: vec![], - options: ClientOptions::default(), - } + Self::new(AggregatorDiscoveryType::Url(endpoint.to_string())).with_genesis_verification_key( + GenesisVerificationKey::JsonHex(genesis_verification_key.to_string()), + ) + } + + /// Constructs a new `ClientBuilder` that automatically discovers the aggregator for the given + /// Mithril network and with the given genesis verification key. + pub fn automatic(network: &str, genesis_verification_key: &str) -> ClientBuilder { + Self::new(AggregatorDiscoveryType::Automatic(MithrilNetwork::new( + network.to_string(), + ))) + .with_genesis_verification_key(GenesisVerificationKey::JsonHex( + genesis_verification_key.to_string(), + )) + .with_default_aggregator_discoverer() } /// Constructs a new `ClientBuilder` without any dependency set. - /// - /// Use [ClientBuilder::aggregator] if you don't need to set a custom [AggregatorClient] - /// to request data from the aggregator. - #[deprecated(since = "0.12.33", note = "Will be removed in 0.13.0")] - pub fn new(genesis_verification_key: &str) -> ClientBuilder { + pub fn new(aggregator_discovery: AggregatorDiscoveryType) -> ClientBuilder { Self { - aggregator_endpoint: None, - genesis_verification_key: genesis_verification_key.to_string(), + aggregator_discovery, + aggregator_capabilities: None, + aggregator_discoverer: None, + genesis_verification_key: None, origin_tag: None, client_type: None, #[cfg(feature = "fs")] @@ -226,6 +242,43 @@ impl ClientBuilder { } } + /// Sets the genesis verification key to use when verifying certificates. + pub fn with_genesis_verification_key( + mut self, + genesis_verification_key: GenesisVerificationKey, + ) -> ClientBuilder { + self.genesis_verification_key = Some(genesis_verification_key); + + self + } + + /// Sets the aggregator capabilities expected to be matched by the aggregator with which the client will interact. + pub fn with_capabilities( + mut self, + capabilities: RequiredAggregatorCapabilities, + ) -> ClientBuilder { + self.aggregator_capabilities = Some(capabilities); + + self + } + + /// Sets the aggregator discoverer to use to find the aggregator endpoint when in automatic discovery. + pub fn with_aggregator_discoverer( + mut self, + discoverer: Arc, + ) -> ClientBuilder { + self.aggregator_discoverer = Some(discoverer); + + self + } + + /// Sets the default aggregator discoverer to use to find the aggregator endpoint when in automatic discovery. + pub fn with_default_aggregator_discoverer(mut self) -> ClientBuilder { + self.aggregator_discoverer = Some(Arc::new(HttpConfigAggregatorDiscoverer::default())); + + self + } + /// Returns a `Client` that uses the dependencies provided to this `ClientBuilder`. /// /// The builder will try to create the missing dependencies using default implementations @@ -236,6 +289,15 @@ impl ClientBuilder { .clone() .unwrap_or_else(|| Logger::root(slog::Discard, o!())); + let genesis_verification_key = match self.genesis_verification_key { + Some(GenesisVerificationKey::JsonHex(ref key)) => key, + None => { + return Err(anyhow!( + "The genesis verification key must be provided to build the client with the 'with_genesis_verification_key' function" + )); + } + }; + let feedback_sender = FeedbackSender::new(&self.feedback_receivers); let aggregator_client = match self.aggregator_client { @@ -254,7 +316,7 @@ impl ClientBuilder { None => Arc::new( MithrilCertificateVerifier::new( aggregator_client.clone(), - &self.genesis_verification_key, + genesis_verification_key, feedback_sender.clone(), #[cfg(feature = "unstable")] self.certificate_verifier_cache, @@ -342,16 +404,50 @@ impl ClientBuilder { }) } + /// Discover available aggregator endpoints for the given Mithril network and required capabilities. + pub fn discover_aggregator( + &self, + network: &MithrilNetwork, + ) -> MithrilResult> { + match self.aggregator_discoverer.clone() { + Some(discoverer) => { + let discoverer = if let Some(capabilities) = &self.aggregator_capabilities { + Arc::new(CapableAggregatorDiscoverer::new( + capabilities.to_owned(), + discoverer.clone(), + )) as Arc + } else { + discoverer as Arc + }; + tokio::task::block_in_place(move || { + tokio::runtime::Handle::current().block_on(async move { + discoverer + .get_available_aggregators(network.to_owned()) + .await + .with_context(|| "Discovering aggregator endpoint failed") + }) + }) + } + None => Err(anyhow!( + "The aggregator discoverer must be provided to build the client with automatic discovery using the 'with_aggregator_discoverer' function" + )), + } + } + fn build_aggregator_client( &self, logger: Logger, ) -> Result { - let endpoint = self - .aggregator_endpoint.as_ref() - .ok_or(anyhow!("No aggregator endpoint set: \ - You must either provide an aggregator endpoint or your own AggregatorClient implementation"))?; - let endpoint_url = Url::parse(endpoint).with_context(|| { - format!("Invalid aggregator endpoint, it must be a correctly formed url: '{endpoint}'") + let aggregator_endpoint = match self.aggregator_discovery { + AggregatorDiscoveryType::Url(ref url) => url.clone(), + AggregatorDiscoveryType::Automatic(ref network) => self + .discover_aggregator(network)? + .next() + .ok_or_else(|| anyhow!("No aggregator was available through discovery"))? + .into(), + }; + let endpoint_url = Url::parse(&aggregator_endpoint).with_context(|| { + format!("Invalid aggregator endpoint, it must be a correctly formed url: '{aggregator_endpoint}'") })?; let headers = self.compute_http_headers(); diff --git a/mithril-client/src/type_alias.rs b/mithril-client/src/type_alias.rs index 08f58ff75c0..6b0200c1b6f 100644 --- a/mithril-client/src/type_alias.rs +++ b/mithril-client/src/type_alias.rs @@ -66,13 +66,14 @@ pub use mithril_common::messages::CardanoStakeDistributionListItemMessage as Car /// `mithril-common` re-exports pub mod common { + pub use mithril_common::AggregateSignatureType; pub use mithril_common::crypto_helper::MKProof; pub use mithril_common::entities::{ AncillaryLocation, BlockHash, BlockNumber, CardanoDbBeacon, CardanoNetwork, ChainPoint, CompressionAlgorithm, DigestLocation, Epoch, EpochSpecifier, ImmutableFileNumber, ImmutablesLocation, MagicId, MultiFilesUri, ProtocolMessage, ProtocolMessagePartKey, - ProtocolParameters, SignedEntityType, SlotNumber, StakeDistribution, SupportedEra, - TemplateUri, TransactionHash, + ProtocolParameters, SignedEntityType, SignedEntityTypeDiscriminants, SlotNumber, + StakeDistribution, SupportedEra, TemplateUri, TransactionHash, }; pub use mithril_common::messages::{ AncillaryMessagePart, DigestsMessagePart, ImmutablesMessagePart, @@ -85,3 +86,9 @@ pub mod common { pub use mithril_common::test::double::Dummy; } } + +/// Required capabilities for an aggregator. +pub use mithril_aggregator_discovery::RequiredAggregatorCapabilities; + +/// Mithril network +pub use mithril_aggregator_discovery::MithrilNetwork; diff --git a/mithril-stm/src/aggregate_signature/signature.rs b/mithril-stm/src/aggregate_signature/signature.rs index 82d7fa01cda..d7721df0b0d 100644 --- a/mithril-stm/src/aggregate_signature/signature.rs +++ b/mithril-stm/src/aggregate_signature/signature.rs @@ -59,6 +59,17 @@ impl From<&AggregateSignature> } } +impl From for AggregateSignatureType { + fn from(s: String) -> Self { + match s.as_str() { + "Concatenation" => AggregateSignatureType::Concatenation, + #[cfg(feature = "future_proof_system")] + "Future" => AggregateSignatureType::Future, + _ => panic!("Unknown aggregate signature type: {}", s), + } + } +} + impl Display for AggregateSignatureType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self {