Skip to content

Commit f949b9a

Browse files
committed
Fetch appdata in background (cowprotocol#3710)
@marcovc reported that enabling appdata fetching in the driver leads to very slow driver restarts. Since we used `join_all`, the driver would only continue the auction pre-processing once ALL futures had finished. Because drivers running in the same k8s cluster as the orderbook have significantly lower latency, we never saw that issue. In order to not block the pre-processing for an unreasonable amount of time, I adjusted the logic to only await new appdatas for 500ms. That should give it enough time to fetch completely new appdatas we've never seen before (basically every new auction introduces at most a couple of new orders + appdatas). If that were the only change, we would likely need MANY auctions to completely fill the cache with only 500ms per auction. To address that issue, I adjusted the appdata fetcher to spawn 1 tokio task per appdata that needs to be fetched. That way the caller can await however many requests they want, and all the ones they didn't wait for will still make progress while their solver already computes a solution for the current auction. In the next auction, all (or at least many) of the missing appdata values should already be available in the cache. The e2e tests should already cover that appdatas become available eventually.
1 parent 5d62fe6 commit f949b9a

File tree

4 files changed

+144
-180
lines changed

4 files changed

+144
-180
lines changed

crates/driver/src/domain/competition/order/app_data.rs

Lines changed: 34 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ use {
33
anyhow::Context,
44
app_data::AppDataDocument,
55
derive_more::From,
6-
futures::FutureExt,
76
moka::future::Cache,
87
reqwest::StatusCode,
9-
shared::request_sharing::BoxRequestSharing,
108
std::{collections::HashMap, sync::Arc},
119
thiserror::Error,
1210
url::Url,
@@ -30,10 +28,6 @@ pub struct AppDataRetriever(Arc<Inner>);
3028
struct Inner {
3129
client: reqwest::Client,
3230
base_url: Url,
33-
request_sharing: BoxRequestSharing<
34-
AppDataHash,
35-
Result<Option<Arc<app_data::ValidatedAppData>>, FetchingError>,
36-
>,
3731
cache: Cache<AppDataHash, Option<Arc<app_data::ValidatedAppData>>>,
3832
}
3933

@@ -42,7 +36,6 @@ impl AppDataRetriever {
4236
Self(Arc::new(Inner {
4337
client: reqwest::Client::new(),
4438
base_url: orderbook_url,
45-
request_sharing: BoxRequestSharing::labelled("app_data".to_string()),
4639
cache: Cache::new(cache_size),
4740
}))
4841
}
@@ -57,6 +50,9 @@ impl AppDataRetriever {
5750
}
5851

5952
/// Retrieves the full app-data for the given `app_data` hash, if it exists.
53+
/// HTTP requests needed to fetch the data are spawned in background tasks
54+
/// such that they eventually populate the cache even in case the caller
55+
/// stops awaiting the returned future.
6056
pub async fn get_cached_or_fetch(
6157
&self,
6258
app_data: &AppDataHash,
@@ -65,47 +61,38 @@ impl AppDataRetriever {
6561
return Ok(app_data.clone());
6662
}
6763

68-
let app_data_fut = move |app_data: &AppDataHash| {
69-
let app_data = *app_data;
70-
let self_ = self.clone();
71-
72-
async move {
73-
let url = self_
74-
.0
75-
.base_url
76-
.join(&format!("api/v1/app_data/{:?}", app_data.0))?;
77-
let response = self_.0.client.get(url).send().await?;
78-
let validated_app_data = match response.status() {
79-
StatusCode::NOT_FOUND => None,
80-
_ => {
81-
let appdata: AppDataDocument =
82-
serde_json::from_str(&response.text().await?)
83-
.context("invalid app data document")?;
84-
match appdata.full_app_data == app_data::EMPTY {
85-
true => None, // empty app data
86-
false => Some(Arc::new(app_data::ValidatedAppData {
87-
hash: app_data::AppDataHash(app_data.0.0),
88-
protocol: app_data::parse(appdata.full_app_data.as_bytes())?,
89-
document: appdata.full_app_data,
90-
})),
91-
}
64+
let inner = self.0.clone();
65+
let app_data = *app_data;
66+
67+
let fut = async move {
68+
let url = inner
69+
.base_url
70+
.join(&format!("api/v1/app_data/{:?}", app_data.0))?;
71+
let response = inner.client.get(url).send().await?;
72+
let validated_app_data = match response.status() {
73+
StatusCode::NOT_FOUND => None,
74+
_ => {
75+
let appdata: AppDataDocument = serde_json::from_str(&response.text().await?)
76+
.context("invalid app data document")?;
77+
match appdata.full_app_data == app_data::EMPTY {
78+
true => None, // empty app data
79+
false => Some(Arc::new(app_data::ValidatedAppData {
80+
hash: app_data::AppDataHash(app_data.0.0),
81+
protocol: app_data::parse(appdata.full_app_data.as_bytes())?,
82+
document: appdata.full_app_data,
83+
})),
9284
}
93-
};
94-
self_
95-
.0
96-
.cache
97-
.insert(app_data, validated_app_data.clone())
98-
.await;
99-
100-
Ok(validated_app_data)
101-
}
102-
.boxed()
85+
}
86+
};
87+
inner
88+
.cache
89+
.insert(app_data, validated_app_data.clone())
90+
.await;
91+
92+
Ok(validated_app_data)
10393
};
10494

105-
self.0
106-
.request_sharing
107-
.shared_or_else(*app_data, app_data_fut)
108-
.await
95+
tokio::task::spawn(fut).await?
10996
}
11097
}
11198

@@ -182,6 +169,8 @@ pub enum FetchingError {
182169
InvalidAppData(#[from] anyhow::Error),
183170
#[error("internal error: {0}")]
184171
Internal(String),
172+
#[error("failed to join task: {0}")]
173+
TaskJoinFailed(#[from] tokio::task::JoinError),
185174
}
186175

187176
impl From<reqwest::Error> for FetchingError {
@@ -195,13 +184,3 @@ impl From<url::ParseError> for FetchingError {
195184
FetchingError::Internal(err.to_string())
196185
}
197186
}
198-
199-
impl Clone for FetchingError {
200-
fn clone(&self) -> Self {
201-
match self {
202-
Self::Http(message) => Self::Http(message.clone()),
203-
Self::InvalidAppData(err) => Self::InvalidAppData(shared::clone_anyhow_error(err)),
204-
Self::Internal(message) => Self::Internal(message.clone()),
205-
}
206-
}
207-
}

crates/driver/src/domain/competition/pre_processing.rs

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ use {
1111
},
1212
anyhow::{Context, Result},
1313
chrono::Utc,
14-
futures::{
15-
FutureExt,
16-
future::{BoxFuture, join_all},
17-
},
14+
futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered},
1815
itertools::Itertools,
1916
model::{
2017
interaction::InteractionData,
@@ -26,8 +23,7 @@ use {
2623
price_estimation::trade_verifier::balance_overrides::BalanceOverrideRequest,
2724
signature_validator::SignatureValidating,
2825
},
29-
std::{collections::HashMap, future::Future, sync::Arc},
30-
tap::TapFallible,
26+
std::{collections::HashMap, future::Future, sync::Arc, time::Duration},
3127
tokio::sync::Mutex,
3228
tracing::Instrument,
3329
};
@@ -339,36 +335,43 @@ impl Utilities {
339335
let _timer2 =
340336
observe::metrics::metrics().on_auction_overhead_start("driver", "fetch_app_data");
341337

342-
let app_data = join_all(
343-
auction
344-
.orders
345-
.iter()
346-
.flat_map(|order| match order.app_data {
347-
AppData::Full(_) => None,
348-
// only fetch appdata we don't already have in full
349-
AppData::Hash(hash) => Some(hash),
350-
})
351-
.unique()
352-
.map(|app_data_hash| {
353-
let app_data_retriever = app_data_retriever.clone();
354-
async move {
355-
let fetched_app_data = app_data_retriever
356-
.get_cached_or_fetch(&app_data_hash)
357-
.await
358-
.tap_err(|err| {
359-
tracing::warn!(?app_data_hash, ?err, "failed to fetch app data");
360-
})
361-
.ok()
362-
.flatten();
338+
let futures: FuturesUnordered<_> = auction
339+
.orders
340+
.iter()
341+
.flat_map(|order| match order.app_data {
342+
AppData::Full(_) => None,
343+
// only fetch appdata we don't already have in full
344+
AppData::Hash(hash) => Some(hash),
345+
})
346+
.unique()
347+
.map(async move |app_data_hash| {
348+
let fetched_app_data = app_data_retriever
349+
.get_cached_or_fetch(&app_data_hash)
350+
.await
351+
.inspect_err(|err| {
352+
tracing::warn!(?app_data_hash, ?err, "failed to fetch app data");
353+
})
354+
.ok()
355+
.flatten();
356+
357+
(app_data_hash, fetched_app_data)
358+
})
359+
.collect();
363360

364-
(app_data_hash, fetched_app_data)
365-
}
366-
}),
367-
)
368-
.await
369-
.into_iter()
370-
.filter_map(|(app_data_hash, app_data)| app_data.map(|app_data| (app_data_hash, app_data)))
371-
.collect::<HashMap<_, _>>();
361+
// Only await responses for a short amount of time. Even if we don't await
362+
// all futures fully the remaining appdata requests will finish in background
363+
// tasks. That way we should have enough time to immediately fetch appdatas
364+
// of new orders (once the cache is filled). But we also don't run the risk
365+
// of stalling the driver completely until everything is fetched.
366+
// In practice that means the solver will only see a few appdatas in the first
367+
// auction after a restart. But on subsequent auctions everything should be
368+
// available.
369+
const MAX_APP_DATA_WAIT: Duration = Duration::from_millis(500);
370+
let app_data: HashMap<_, _> = futures
371+
.take_until(tokio::time::sleep(MAX_APP_DATA_WAIT))
372+
.filter_map(async move |(hash, json)| Some((hash, json?)))
373+
.collect()
374+
.await;
372375

373376
Arc::new(app_data)
374377
}

crates/liquidity-driver/src/domain/competition/order/app_data.rs

Lines changed: 34 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ use {
33
anyhow::Context,
44
app_data::AppDataDocument,
55
derive_more::From,
6-
futures::FutureExt,
76
moka::future::Cache,
87
reqwest::StatusCode,
9-
shared::request_sharing::BoxRequestSharing,
108
std::{collections::HashMap, sync::Arc},
119
thiserror::Error,
1210
url::Url,
@@ -30,10 +28,6 @@ pub struct AppDataRetriever(Arc<Inner>);
3028
struct Inner {
3129
client: reqwest::Client,
3230
base_url: Url,
33-
request_sharing: BoxRequestSharing<
34-
AppDataHash,
35-
Result<Option<Arc<app_data::ValidatedAppData>>, FetchingError>,
36-
>,
3731
cache: Cache<AppDataHash, Option<Arc<app_data::ValidatedAppData>>>,
3832
}
3933

@@ -42,7 +36,6 @@ impl AppDataRetriever {
4236
Self(Arc::new(Inner {
4337
client: reqwest::Client::new(),
4438
base_url: orderbook_url,
45-
request_sharing: BoxRequestSharing::labelled("app_data".to_string()),
4639
cache: Cache::new(cache_size),
4740
}))
4841
}
@@ -57,6 +50,9 @@ impl AppDataRetriever {
5750
}
5851

5952
/// Retrieves the full app-data for the given `app_data` hash, if it exists.
53+
/// HTTP requests needed to fetch the data are spawned in background tasks
54+
/// such that they eventually populate the cache even in case the caller
55+
/// stops awaiting the returned future.
6056
pub async fn get_cached_or_fetch(
6157
&self,
6258
app_data: &AppDataHash,
@@ -65,47 +61,38 @@ impl AppDataRetriever {
6561
return Ok(app_data.clone());
6662
}
6763

68-
let app_data_fut = move |app_data: &AppDataHash| {
69-
let app_data = *app_data;
70-
let self_ = self.clone();
71-
72-
async move {
73-
let url = self_
74-
.0
75-
.base_url
76-
.join(&format!("api/v1/app_data/{:?}", app_data.0))?;
77-
let response = self_.0.client.get(url).send().await?;
78-
let validated_app_data = match response.status() {
79-
StatusCode::NOT_FOUND => None,
80-
_ => {
81-
let appdata: AppDataDocument =
82-
serde_json::from_str(&response.text().await?)
83-
.context("invalid app data document")?;
84-
match appdata.full_app_data == app_data::EMPTY {
85-
true => None, // empty app data
86-
false => Some(Arc::new(app_data::ValidatedAppData {
87-
hash: app_data::AppDataHash(app_data.0.0),
88-
protocol: app_data::parse(appdata.full_app_data.as_bytes())?,
89-
document: appdata.full_app_data,
90-
})),
91-
}
64+
let inner = self.0.clone();
65+
let app_data = *app_data;
66+
67+
let fut = async move {
68+
let url = inner
69+
.base_url
70+
.join(&format!("api/v1/app_data/{:?}", app_data.0))?;
71+
let response = inner.client.get(url).send().await?;
72+
let validated_app_data = match response.status() {
73+
StatusCode::NOT_FOUND => None,
74+
_ => {
75+
let appdata: AppDataDocument = serde_json::from_str(&response.text().await?)
76+
.context("invalid app data document")?;
77+
match appdata.full_app_data == app_data::EMPTY {
78+
true => None, // empty app data
79+
false => Some(Arc::new(app_data::ValidatedAppData {
80+
hash: app_data::AppDataHash(app_data.0.0),
81+
protocol: app_data::parse(appdata.full_app_data.as_bytes())?,
82+
document: appdata.full_app_data,
83+
})),
9284
}
93-
};
94-
self_
95-
.0
96-
.cache
97-
.insert(app_data, validated_app_data.clone())
98-
.await;
99-
100-
Ok(validated_app_data)
101-
}
102-
.boxed()
85+
}
86+
};
87+
inner
88+
.cache
89+
.insert(app_data, validated_app_data.clone())
90+
.await;
91+
92+
Ok(validated_app_data)
10393
};
10494

105-
self.0
106-
.request_sharing
107-
.shared_or_else(*app_data, app_data_fut)
108-
.await
95+
tokio::task::spawn(fut).await?
10996
}
11097
}
11198

@@ -182,6 +169,8 @@ pub enum FetchingError {
182169
InvalidAppData(#[from] anyhow::Error),
183170
#[error("internal error: {0}")]
184171
Internal(String),
172+
#[error("failed to join task: {0}")]
173+
TaskJoinFailed(#[from] tokio::task::JoinError),
185174
}
186175

187176
impl From<reqwest::Error> for FetchingError {
@@ -195,13 +184,3 @@ impl From<url::ParseError> for FetchingError {
195184
FetchingError::Internal(err.to_string())
196185
}
197186
}
198-
199-
impl Clone for FetchingError {
200-
fn clone(&self) -> Self {
201-
match self {
202-
Self::Http(message) => Self::Http(message.clone()),
203-
Self::InvalidAppData(err) => Self::InvalidAppData(shared::clone_anyhow_error(err)),
204-
Self::Internal(message) => Self::Internal(message.clone()),
205-
}
206-
}
207-
}

0 commit comments

Comments
 (0)