Skip to content

Commit 0b2e19e

Browse files
author
Devdutt Shenoi
committed
refactor: in-place extraction of hot-tier manifest files
1 parent bcc8669 commit 0b2e19e

File tree

2 files changed

+21
-19
lines changed

2 files changed

+21
-19
lines changed

src/hottier.rs

Lines changed: 19 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -463,31 +463,36 @@ impl HotTierManager {
463463
Ok(hot_tier_manifest)
464464
}
465465

466-
///get the list of files from all the manifests present in hot tier directory for the stream
466+
/// Returns the list of manifest files present in hot tier directory for the stream
467467
pub async fn get_hot_tier_manifest_files(
468468
&self,
469469
stream: &str,
470-
manifest_files: Vec<File>,
471-
) -> Result<(Vec<File>, Vec<File>), HotTierError> {
470+
manifest_files: &mut Vec<File>,
471+
) -> Result<Vec<File>, HotTierError> {
472+
// Fetch the list of hot tier parquet files for the given stream.
472473
let mut hot_tier_files = self.get_hot_tier_parquet_files(stream).await?;
474+
475+
// Retain only the files in `hot_tier_files` that also exist in `manifest_files`.
473476
hot_tier_files.retain(|file| {
474477
manifest_files
475478
.iter()
476479
.any(|manifest_file| manifest_file.file_path.eq(&file.file_path))
477480
});
481+
482+
// Sort `hot_tier_files` in descending order by file path.
478483
hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
479484

480-
let mut remaining_files: Vec<File> = manifest_files
481-
.into_iter()
482-
.filter(|manifest_file| {
483-
hot_tier_files
484-
.iter()
485-
.all(|file| !file.file_path.eq(&manifest_file.file_path))
486-
})
487-
.collect();
488-
remaining_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
489-
490-
Ok((hot_tier_files, remaining_files))
485+
// Update `manifest_files` to exclude files that are present in the filtered `hot_tier_files`.
486+
manifest_files.retain(|manifest_file| {
487+
hot_tier_files
488+
.iter()
489+
.all(|file| !file.file_path.eq(&manifest_file.file_path))
490+
});
491+
492+
// Sort `manifest_files` in descending order by file path.
493+
manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
494+
495+
Ok(hot_tier_files)
491496
}
492497

493498
///get the list of parquet files from the hot tier directory for the stream

src/query/stream_schema_provider.rs

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -227,13 +227,10 @@ impl StandardTableProvider {
227227
state: &dyn Session,
228228
time_partition: Option<String>,
229229
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
230-
let (hot_tier_files, remainder) = hot_tier_manager
231-
.get_hot_tier_manifest_files(&self.stream, manifest_files.clone())
230+
let hot_tier_files = hot_tier_manager
231+
.get_hot_tier_manifest_files(&self.stream, manifest_files)
232232
.await
233233
.map_err(|err| DataFusionError::External(Box::new(err)))?;
234-
// Assign remaining entries back to manifest list
235-
// This is to be used for remote query
236-
*manifest_files = remainder;
237234

238235
let hot_tier_files = hot_tier_files
239236
.into_iter()

0 commit comments

Comments
 (0)