Skip to content

Commit 3b807e4

Browse files
committed
feat(aggregator-discovery): implement aggregator discovery in client builder
1 parent f043f87 commit 3b807e4

File tree

1 file changed

+45
-22
lines changed

1 file changed

+45
-22
lines changed

mithril-client/src/client.rs

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
use anyhow::{Context, anyhow};
22
#[cfg(feature = "fs")]
33
use chrono::Utc;
4-
use mithril_aggregator_discovery::{
5-
AggregatorDiscoverer, HttpConfigAggregatorDiscoverer, MithrilNetwork,
6-
};
7-
use mithril_common::messages::AggregatorCapabilities;
4+
85
use reqwest::Url;
96
use serde::{Deserialize, Serialize};
107
use slog::{Logger, o};
118
use std::collections::HashMap;
129
use std::sync::Arc;
1310

11+
use mithril_aggregator_discovery::{
12+
AggregatorDiscoverer, CapableAggregatorDiscoverer, HttpConfigAggregatorDiscoverer,
13+
MithrilNetwork,
14+
};
1415
use mithril_common::api_version::APIVersionProvider;
16+
use mithril_common::messages::AggregatorCapabilities;
1517
use mithril_common::{MITHRIL_CLIENT_TYPE_HEADER, MITHRIL_ORIGIN_TAG_HEADER};
1618

1719
use crate::MithrilResult;
@@ -200,11 +202,9 @@ impl ClientBuilder {
200202
/// Constructs a new `ClientBuilder` that fetches data from the aggregator at the given
201203
/// endpoint and with the given genesis verification key.
202204
pub fn aggregator(endpoint: &str, genesis_verification_key: &str) -> ClientBuilder {
203-
Self::new(AggregatorDiscoveryType::Url(endpoint.to_string()))
204-
.with_genesis_verification_key(GenesisVerificationKey::JsonHex(
205-
genesis_verification_key.to_string(),
206-
))
207-
.with_aggregator_discoverer(Arc::new(HttpConfigAggregatorDiscoverer::default()))
205+
Self::new(AggregatorDiscoveryType::Url(endpoint.to_string())).with_genesis_verification_key(
206+
GenesisVerificationKey::JsonHex(genesis_verification_key.to_string()),
207+
)
208208
}
209209

210210
/// Constructs a new `ClientBuilder` without any dependency set.
@@ -258,6 +258,13 @@ impl ClientBuilder {
258258
self
259259
}
260260

261+
/// Sets the default aggregator discoverer to use to find the aggregator endpoint when in automatic discovery.
262+
pub fn with_default_aggregator_discoverer(mut self) -> ClientBuilder {
263+
self.aggregator_discoverer = Some(Arc::new(HttpConfigAggregatorDiscoverer::default()));
264+
265+
self
266+
}
267+
261268
/// Returns a `Client` that uses the dependencies provided to this `ClientBuilder`.
262269
///
263270
/// The builder will try to create the missing dependencies using default implementations
@@ -281,7 +288,7 @@ impl ClientBuilder {
281288
let feedback_sender = FeedbackSender::new(&self.feedback_receivers);
282289

283290
let aggregator_client = match self.aggregator_client {
284-
None => Arc::new(self.build_aggregator_client(logger.clone()).await?),
291+
None => Arc::new(self.build_aggregator_client(logger.clone())?),
285292
Some(client) => client,
286293
};
287294

@@ -384,24 +391,40 @@ impl ClientBuilder {
384391
})
385392
}
386393

387-
async fn build_aggregator_client(
394+
fn build_aggregator_client(
388395
&self,
389396
logger: Logger,
390397
) -> Result<AggregatorHTTPClient, anyhow::Error> {
391398
let aggregator_endpoint = match self.aggregator_discovery {
392399
AggregatorDiscoveryType::Url(ref url) => url.clone(),
393-
AggregatorDiscoveryType::Automatic(ref network) => match &self.aggregator_discoverer {
394-
Some(discoverer) => discoverer
395-
.get_available_aggregators(network.to_owned())
396-
.await
397-
.with_context(|| "Discovering aggregator endpoint failed")?
398-
.next()
399-
.unwrap()
400-
.into(),
401-
None => {
402-
return Err(anyhow!("The aggregator discoverer must be provided to build the client with automatic discovery using the 'with_aggregator_discoverer' function").into());
400+
AggregatorDiscoveryType::Automatic(ref network) => {
401+
match self.aggregator_discoverer.clone() {
402+
Some(discoverer) => {
403+
let discoverer = if let Some(capabilities) = &self.aggregator_capabilities {
404+
Arc::new(CapableAggregatorDiscoverer::new(
405+
capabilities.to_owned(),
406+
discoverer.clone(),
407+
)) as Arc<dyn AggregatorDiscoverer>
408+
} else {
409+
discoverer as Arc<dyn AggregatorDiscoverer>
410+
};
411+
tokio::task::block_in_place(move || {
412+
tokio::runtime::Handle::current().block_on(async move {
413+
discoverer
414+
.get_available_aggregators(network.to_owned())
415+
.await
416+
.with_context(|| "Discovering aggregator endpoint failed")?
417+
.next()
418+
.ok_or(anyhow!("No aggregator was available through discovery"))
419+
})
420+
})?
421+
.into()
422+
}
423+
None => {
424+
return Err(anyhow!("The aggregator discoverer must be provided to build the client with automatic discovery using the 'with_aggregator_discoverer' function").into());
425+
}
403426
}
404-
},
427+
}
405428
};
406429
let endpoint_url = Url::parse(&aggregator_endpoint).with_context(|| {
407430
format!("Invalid aggregator endpoint, it must be a correctly formed url: '{aggregator_endpoint}'")

0 commit comments

Comments
 (0)