Skip to content

Commit 50841d9

Browse files
authored
fix(cubestore): Retying download from external locations (#6321)
1 parent 32c603d commit 50841d9

File tree

1 file changed

+72
-21
lines changed
  • rust/cubestore/cubestore/src/import

1 file changed

+72
-21
lines changed

rust/cubestore/cubestore/src/import/mod.rs

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use crate::util::decimal::Decimal;
4141
use crate::util::maybe_owned::MaybeOwnedStr;
4242
use crate::CubeError;
4343
use datafusion::cube_ext::ordfloat::OrdF64;
44+
use tokio::time::{sleep, Duration};
4445

4546
pub mod limits;
4647

@@ -465,31 +466,13 @@ impl ImportServiceImpl {
465466
temp_dir: &Path,
466467
) -> Result<(File, Option<TempPath>), CubeError> {
467468
if location.starts_with("http") {
468-
let (file, path) = tempfile::Builder::new()
469-
.prefix(&table_id.to_string())
470-
.tempfile_in(temp_dir)
471-
.map_err(|e| {
472-
CubeError::internal(format!(
473-
"Open tempfile in {}: {}",
474-
temp_dir.to_str().unwrap_or("<invalid>"),
475-
e
476-
))
477-
})?
478-
.into_parts();
479-
let mut file = File::from_std(file);
480-
let mut stream = reqwest::get(location).await?.bytes_stream();
481-
let mut size = 0;
482-
while let Some(bytes) = stream.next().await {
483-
let bytes = bytes?;
484-
let slice = bytes.as_ref();
485-
size += slice.len();
486-
file.write_all(slice).await?;
487-
}
469+
let (file, size, path) = self
470+
.download_http_location(location, table_id, temp_dir)
471+
.await?;
488472
log::info!("Import downloaded {} ({} bytes)", location, size);
489473
self.meta_store
490474
.update_location_download_size(table_id, location.to_string(), size as u64)
491475
.await?;
492-
file.seek(SeekFrom::Start(0)).await?;
493476
Ok((file, Some(path)))
494477
} else if location.starts_with("temp://") {
495478
let temp_file = self.download_temp_file(location).await?;
@@ -509,6 +492,74 @@ impl ImportServiceImpl {
509492
}
510493
}
511494

495+
async fn download_http_location(
496+
&self,
497+
location: &str,
498+
table_id: u64,
499+
temp_dir: &Path,
500+
) -> Result<(File, usize, TempPath), CubeError> {
501+
let max_retries: i32 = 10;
502+
let mut retry_attempts = max_retries;
503+
let mut retries_sleep = Duration::from_millis(100);
504+
let sleep_multiplier = 2;
505+
loop {
506+
retry_attempts -= 1;
507+
let result = self
508+
.try_download_http_location(location, table_id, temp_dir)
509+
.await;
510+
511+
if retry_attempts <= 0 {
512+
return result;
513+
}
514+
match result {
515+
Ok(size) => {
516+
return Ok(size);
517+
}
518+
Err(err) => {
519+
log::error!(
520+
"Import {} download error: {}. Retrying {}/{}...",
521+
location,
522+
err,
523+
retry_attempts,
524+
max_retries
525+
);
526+
sleep(retries_sleep).await;
527+
retries_sleep *= sleep_multiplier;
528+
}
529+
}
530+
}
531+
}
532+
533+
async fn try_download_http_location(
534+
&self,
535+
location: &str,
536+
table_id: u64,
537+
temp_dir: &Path,
538+
) -> Result<(File, usize, TempPath), CubeError> {
539+
let (file, path) = tempfile::Builder::new()
540+
.prefix(&table_id.to_string())
541+
.tempfile_in(temp_dir)
542+
.map_err(|e| {
543+
CubeError::internal(format!(
544+
"Open tempfile in {}: {}",
545+
temp_dir.to_str().unwrap_or("<invalid>"),
546+
e
547+
))
548+
})?
549+
.into_parts();
550+
let mut file = File::from_std(file);
551+
let mut stream = reqwest::get(location).await?.bytes_stream();
552+
let mut size = 0;
553+
while let Some(bytes) = stream.next().await {
554+
let bytes = bytes?;
555+
let slice = bytes.as_ref();
556+
size += slice.len();
557+
file.write_all(slice).await?;
558+
}
559+
file.seek(SeekFrom::Start(0)).await?;
560+
Ok((file, size, path))
561+
}
562+
512563
async fn download_temp_file(&self, location: &str) -> Result<File, CubeError> {
513564
let to_download = ImportServiceImpl::temp_uploads_path(location);
514565
// TODO check file size

0 commit comments

Comments
 (0)