diff --git a/Cargo.lock b/Cargo.lock
index de68cb64aa9..18d44c93b53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3305,6 +3305,7 @@ dependencies = [
  "clap",
  "crc32fast",
  "data_types",
+ "futures",
  "futures-util",
  "hashbrown 0.15.3",
  "humantime",
diff --git a/influxdb3_wal/Cargo.toml b/influxdb3_wal/Cargo.toml
index 16e76aa6aae..ba87a25cced 100644
--- a/influxdb3_wal/Cargo.toml
+++ b/influxdb3_wal/Cargo.toml
@@ -23,6 +23,7 @@ bitcode.workspace = true
 bytes.workspace = true
 byteorder.workspace = true
 crc32fast.workspace = true
+futures.workspace = true
 futures-util.workspace = true
 hashbrown.workspace = true
 humantime.workspace = true
diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs
index 2c878caa601..4647c1f678e 100644
--- a/influxdb3_wal/src/object_store.rs
+++ b/influxdb3_wal/src/object_store.rs
@@ -144,18 +144,22 @@ impl WalObjectStore {
             object_store: Arc<dyn ObjectStore>,
             path: Path,
         ) -> Result<WalContents, crate::Error> {
+            info!(?path, "loading path");
             let file_bytes = object_store.get(&path).await?.bytes().await?;
             Ok(verify_file_type_and_deserialize(file_bytes)?)
         }
 
-        let mut replay_tasks = Vec::new();
-        for path in paths {
-            let object_store = Arc::clone(&self.object_store);
-            replay_tasks.push(tokio::spawn(get_contents(object_store, path)));
-        }
+        let stream = futures::stream::iter(paths);
+        let mut replay_tasks = stream
+            .map(|path| {
+                let object_store = Arc::clone(&self.object_store);
+                async move { get_contents(object_store, path).await }
+            })
+            // limit concurrency when loading
+            .buffered(20);
 
-        for wal_contents in replay_tasks {
-            let wal_contents = wal_contents.await??;
+        while let Some(wal_contents) = replay_tasks.next().await {
+            let wal_contents = wal_contents?;
 
             // add this to the snapshot tracker, so we know what to clear out later if the replay
             // was a wal file that had a snapshot
diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs
index 450bf2ed290..ede2f5149a8 100644
--- a/influxdb3_write/src/write_buffer/queryable_buffer.rs
+++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -34,6 +34,7 @@ use schema::sort::SortKey;
 use std::any::Any;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::sync::Semaphore;
 use tokio::sync::oneshot::{self, Receiver};
 use tokio::task::JoinSet;
 
@@ -245,6 +246,8 @@
         let persist_jobs_empty = persist_jobs.is_empty();
         let mut set = JoinSet::new();
+        // don't allow more than 5 persist jobs to run concurrently
+        let semaphore = Arc::new(Semaphore::new(5));
         for persist_job in persist_jobs {
             let persister = Arc::clone(&persister);
             let executor = Arc::clone(&executor);
@@ -252,8 +255,13 @@
             let parquet_cache = parquet_cache.clone();
             let buffer = Arc::clone(&buffer);
             let persisted_files = Arc::clone(&persisted_files);
+            let semaphore = Arc::clone(&semaphore);
 
             set.spawn(async move {
+                let permit = semaphore
+                    .acquire_owned()
+                    .await
+                    .expect("to get permit to run sort/dedupe in parallel");
                 let path = persist_job.path.to_string();
                 let database_id = persist_job.database_id;
                 let table_id = persist_job.table_id;
@@ -308,11 +316,12 @@
                 persisted_snapshot
                     .lock()
-                    .add_parquet_file(database_id, table_id, parquet_file)
+                    .add_parquet_file(database_id, table_id, parquet_file);
+                drop(permit);
             });
         }
 
         set.join_all().await;
 
         // persist the snapshot file - only if persist jobs are present
         // if persist_jobs is empty, then parquet file wouldn't have been
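
The object_store.rs change swaps unbounded `tokio::spawn` fan-out for the `buffered` combinator from `futures`. Here is that pattern in isolation — a minimal, runnable sketch, not the actual WAL code: `load`, the fake paths, and the limit of 4 are illustrative stand-ins, and it assumes `tokio` (with the `macros`/`rt-multi-thread` features) and `futures` as dependencies:

```rust
use futures::{stream, StreamExt};

// Stand-in for `get_contents`: in the real code this fetches a WAL file
// from object storage and deserializes it.
async fn load(path: String) -> Result<String, std::io::Error> {
    Ok(format!("contents of {path}"))
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let paths: Vec<String> = (0..10).map(|i| format!("wal/{i:010}.wal")).collect();

    let mut results = stream::iter(paths)
        .map(|path| async move { load(path).await })
        // drive at most 4 `load` futures at a time
        .buffered(4);

    // `buffered` (unlike `buffer_unordered`) yields results in input order,
    // so files can still be processed sequentially while loading in parallel.
    while let Some(contents) = results.next().await {
        println!("{}", contents?);
    }
    Ok(())
}
```

The choice of `buffered` over `buffer_unordered` matters here: replay consumes WAL files in order, so results must come back in the order the paths were supplied.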
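The queryable_buffer.rs change is the standard Tokio pattern for capping concurrency inside a `JoinSet`: every task is spawned up front, but each waits on an owned semaphore permit before doing real work, so at most five persist jobs run at once. A minimal sketch of the same pattern, with a sleep standing in for the sort/dedupe/persist job and `MAX_CONCURRENT` mirroring the limit of 5 above (again assuming `tokio` with the `full` feature set):

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

const MAX_CONCURRENT: usize = 5;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT));
    let mut set = JoinSet::new();

    for job_id in 0..20u32 {
        let semaphore = Arc::clone(&semaphore);
        set.spawn(async move {
            // parks here until one of the MAX_CONCURRENT permits frees up
            let _permit = semaphore
                .acquire_owned()
                .await
                .expect("semaphore is never closed");
            // stand-in for the actual sort/dedupe/persist work
            tokio::time::sleep(Duration::from_millis(50)).await;
            println!("persisted job {job_id}");
            // the permit is released when `_permit` drops at the end of the task
        });
    }

    // wait for every job to finish
    set.join_all().await;
}
```

Note that letting the permit drop at the end of the task body is equivalent to the diff's explicit `drop(permit)` after `add_parquet_file`; the explicit call just pins down exactly where the permit is released.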