Skip to content

Commit 55f3544

Browse files
authored
remote-ext: fix state download stall on slow connections and reduce memory usage (#1295)
Original PR paritytech/substrate#14746 --- ## Fixing stall ### Introduction I experienced an apparent stall downloading state from `https://rococo-try-runtime-node.parity-chains.parity.io:443` which was having networking difficulties, only responding to my JSONRPC requests with 50-200KB/s of bandwidth. This PR fixes the issue causing the stall, and generally improves the performance of remote-ext when it downloads state by greatly reducing the chances of a timeout occurring. ### Description Introduces a new `REQUEST_DURATION_TARGET` constant and modifies `get_storage_data_dynamic_batch_size` to - Increase or decrease the batch size of the next request depending on whether the elapsed time of the last request was greater or less than the target - Reset the batch size to 1 if the request times out This fixes an issue on slow connections that can otherwise cause multiple timeouts and a stalled download when: 1. The batch size increases rapidly as remote-ext downloads keys with small associated storage values 2. remote-ext tries to process a large series of subsequent keys all with extremely large associated storage values (Rococo has a series of keys 1-5MB large) 3. The huge storage values download for 5 minutes until the request times out 4. The partially downloaded keys are thrown out and remote-ext tries again with a smaller batch size, but the batch size is still far too large and takes 5 minutes to be reduced again 5. The download will be essentially stalled for many hours while the above steps cycle After this PR, the request size will - Not grow as large to begin with, as it is regulated downwards as the request duration exceeds the target - Drop immediately to 1 if the request times out. A timeout indicates the keys next in line to download have extremely large storage values compared to previously downloaded keys, and we need to reset the batch size to figure out what our new ideal batch size is. By not resetting down to 1, we risk the next request timing out again. 
## Reducing memory As suggested by @bkchr, I adjusted `get_storage_data_dynamic_batch_size` from being recursive to a loop, which allows removing a bunch of clones that were chewing through a lot of memory. I noticed it was previously using up to 50GB of swap when downloading Polkadot keys on a slow connection, because it needed to recurse and clone a lot. After this change it uses only ~1.5GB of memory.
1 parent 373b8ac commit 55f3544

File tree

3 files changed

+102
-101
lines changed

3 files changed

+102
-101
lines changed

Cargo.lock

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

substrate/utils/frame/remote-externalities/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ sp-runtime = { path = "../../../primitives/runtime" }
2323
tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] }
2424
substrate-rpc-client = { path = "../rpc/client" }
2525
futures = "0.3"
26-
async-recursion = "1.0.4"
2726
indicatif = "0.17.3"
2827
spinners = "4.1.0"
2928
tokio-retry = "0.3.0"

substrate/utils/frame/remote-externalities/src/lib.rs

Lines changed: 102 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
//! An equivalent of `sp_io::TestExternalities` that can load its state from a remote substrate
2121
//! based chain, or a local state snapshot file.
2222
23-
use async_recursion::async_recursion;
2423
use codec::{Compact, Decode, Encode};
2524
use indicatif::{ProgressBar, ProgressStyle};
2625
use jsonrpsee::{
@@ -44,7 +43,7 @@ use sp_runtime::{
4443
use sp_state_machine::TestExternalities;
4544
use spinners::{Spinner, Spinners};
4645
use std::{
47-
cmp::max,
46+
cmp::{max, min},
4847
fs,
4948
ops::{Deref, DerefMut},
5049
path::{Path, PathBuf},
@@ -353,10 +352,11 @@ where
353352
const PARALLEL_REQUESTS: usize = 4;
354353
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
355354
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
356-
const INITIAL_BATCH_SIZE: usize = 5000;
355+
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(15);
356+
const INITIAL_BATCH_SIZE: usize = 10;
357357
// nodes by default will not return more than 1000 keys per request
358358
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
359-
const KEYS_PAGE_MAX_RETRIES: usize = 12;
359+
const MAX_RETRIES: usize = 12;
360360
const KEYS_PAGE_RETRY_INTERVAL: Duration = Duration::from_secs(5);
361361

362362
async fn rpc_get_storage(
@@ -411,8 +411,8 @@ where
411411
let keys = loop {
412412
// This loop can hit the node with very rapid requests, occasionally causing it to
413413
// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
414-
let retry_strategy = FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL)
415-
.take(Self::KEYS_PAGE_MAX_RETRIES);
414+
let retry_strategy =
415+
FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
416416
let get_page_closure =
417417
|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at);
418418
let page = Retry::spawn(retry_strategy, get_page_closure).await?;
@@ -448,8 +448,6 @@ where
448448
///
449449
/// * `client` - An `Arc` wrapped `HttpClient` used for making the requests.
450450
/// * `payloads` - A vector of tuples containing a JSONRPC method name and `ArrayParams`
451-
/// * `batch_size` - The initial batch size to use for the request. The batch size will be
452-
/// adjusted dynamically in case of failure.
453451
///
454452
/// # Returns
455453
///
@@ -485,80 +483,107 @@ where
485483
/// }
486484
/// }
487485
/// ```
488-
#[async_recursion]
489486
async fn get_storage_data_dynamic_batch_size(
490487
client: &HttpClient,
491488
payloads: Vec<(String, ArrayParams)>,
492-
batch_size: usize,
493489
bar: &ProgressBar,
494490
) -> Result<Vec<Option<StorageData>>, String> {
495-
// All payloads have been processed
496-
if payloads.is_empty() {
497-
return Ok(vec![])
498-
};
499-
500-
log::debug!(
501-
target: LOG_TARGET,
502-
"Remaining payloads: {} Batch request size: {}",
503-
payloads.len(),
504-
batch_size,
505-
);
491+
let mut all_data: Vec<Option<StorageData>> = vec![];
492+
let mut start_index = 0;
493+
let mut retries = 0usize;
494+
let mut batch_size = Self::INITIAL_BATCH_SIZE;
495+
let total_payloads = payloads.len();
496+
497+
while start_index < total_payloads {
498+
log::debug!(
499+
target: LOG_TARGET,
500+
"Remaining payloads: {} Batch request size: {}",
501+
total_payloads - start_index,
502+
batch_size,
503+
);
506504

507-
// Payloads to attempt to process this batch
508-
let page = payloads.iter().take(batch_size).cloned().collect::<Vec<_>>();
505+
let end_index = usize::min(start_index + batch_size, total_payloads);
506+
let page = &payloads[start_index..end_index];
509507

510-
// Build the batch request
511-
let mut batch = BatchRequestBuilder::new();
512-
for (method, params) in page.iter() {
513-
batch
514-
.insert(method, params.clone())
515-
.map_err(|_| "Invalid batch method and/or params")?
516-
}
517-
let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
518-
Ok(batch_response) => batch_response,
519-
Err(e) => {
520-
if batch_size < 2 {
521-
return Err(e.to_string())
522-
}
508+
// Build the batch request
509+
let mut batch = BatchRequestBuilder::new();
510+
for (method, params) in page.iter() {
511+
batch
512+
.insert(method, params.clone())
513+
.map_err(|_| "Invalid batch method and/or params")?;
514+
}
523515

524-
log::debug!(
525-
target: LOG_TARGET,
526-
"Batch request failed, trying again with smaller batch size. {}",
527-
e.to_string()
528-
);
516+
let request_started = Instant::now();
517+
let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
518+
Ok(batch_response) => {
519+
retries = 0;
520+
batch_response
521+
},
522+
Err(e) => {
523+
if retries > Self::MAX_RETRIES {
524+
return Err(e.to_string())
525+
}
526+
527+
retries += 1;
528+
let failure_log = format!(
529+
"Batch request failed ({}/{} retries). Error: {}",
530+
retries,
531+
Self::MAX_RETRIES,
532+
e.to_string()
533+
);
534+
// after 2 subsequent failures something very wrong is happening. log a warning
535+
// and reset the batch size down to 1.
536+
if retries >= 2 {
537+
log::warn!("{}", failure_log);
538+
batch_size = 1;
539+
} else {
540+
log::debug!("{}", failure_log);
541+
// Decrease batch size by DECREASE_FACTOR
542+
batch_size =
543+
(batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize;
544+
}
545+
continue
546+
},
547+
};
529548

530-
return Self::get_storage_data_dynamic_batch_size(
531-
client,
532-
payloads,
533-
max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
534-
bar,
549+
let request_duration = request_started.elapsed();
550+
batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
551+
// Decrease batch size
552+
max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
553+
} else {
554+
// Increase batch size, but not more than the remaining total payloads to process
555+
min(
556+
total_payloads - start_index,
557+
max(
558+
batch_size + 1,
559+
(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
560+
),
535561
)
536-
.await
537-
},
538-
};
562+
};
563+
564+
log::debug!(
565+
target: LOG_TARGET,
566+
"Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
567+
request_duration,
568+
Self::REQUEST_DURATION_TARGET,
569+
end_index - start_index,
570+
batch_size
571+
);
539572

540-
// Collect the data from this batch
541-
let mut data: Vec<Option<StorageData>> = vec![];
542-
let batch_response_len = batch_response.len();
543-
for item in batch_response.into_iter() {
544-
match item {
545-
Ok(x) => data.push(x),
546-
Err(e) => return Err(e.message().to_string()),
573+
let batch_response_len = batch_response.len();
574+
for item in batch_response.into_iter() {
575+
match item {
576+
Ok(x) => all_data.push(x),
577+
Err(e) => return Err(e.message().to_string()),
578+
}
547579
}
580+
bar.inc(batch_response_len as u64);
581+
582+
// Update the start index for the next iteration
583+
start_index = end_index;
548584
}
549-
bar.inc(batch_response_len as u64);
550585

551-
// Return this data joined with the remaining keys
552-
let remaining_payloads = payloads.iter().skip(batch_size).cloned().collect::<Vec<_>>();
553-
let mut rest = Self::get_storage_data_dynamic_batch_size(
554-
client,
555-
remaining_payloads,
556-
max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
557-
bar,
558-
)
559-
.await?;
560-
data.append(&mut rest);
561-
Ok(data)
586+
Ok(all_data)
562587
}
563588

564589
/// Synonym of `getPairs` that uses paged queries to first get the keys, and then
@@ -605,12 +630,7 @@ where
605630
);
606631
let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1));
607632
let requests = payloads_chunked.map(|payload_chunk| {
608-
Self::get_storage_data_dynamic_batch_size(
609-
&client,
610-
payload_chunk.to_vec(),
611-
Self::INITIAL_BATCH_SIZE,
612-
&bar,
613-
)
633+
Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar)
614634
});
615635
// Execute the requests and move the Result outside.
616636
let storage_data_result: Result<Vec<_>, _> =
@@ -683,20 +703,14 @@ where
683703
.collect::<Vec<_>>();
684704

685705
let bar = ProgressBar::new(payloads.len() as u64);
686-
let storage_data = match Self::get_storage_data_dynamic_batch_size(
687-
client,
688-
payloads,
689-
Self::INITIAL_BATCH_SIZE,
690-
&bar,
691-
)
692-
.await
693-
{
694-
Ok(storage_data) => storage_data,
695-
Err(e) => {
696-
log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
697-
return Err("batch processing failed")
698-
},
699-
};
706+
let storage_data =
707+
match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await {
708+
Ok(storage_data) => storage_data,
709+
Err(e) => {
710+
log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e);
711+
return Err("batch processing failed")
712+
},
713+
};
700714

701715
assert_eq!(child_keys_len, storage_data.len());
702716

0 commit comments

Comments
 (0)