Skip to content

Commit 36ca2f9

Browse files
author
Devdutt Shenoi
committed
feat: split methods and use unordered futures
1 parent dd2b8b2 commit 36ca2f9

File tree

1 file changed

+49
-34
lines changed

1 file changed

+49
-34
lines changed

src/hottier.rs

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,8 @@ impl HotTierManager {
407407
);
408408
self.put_hot_tier(stream, &mut stream_hot_tier).await?;
409409
file_processed = true;
410-
let mut hot_tier_manifest = self
411-
.get_stream_hot_tier_manifest_for_date(stream, &date)
412-
.await?;
410+
let path = self.get_stream_path_for_date(stream, &date);
411+
let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?;
413412
hot_tier_manifest.files.push(parquet_file.clone());
414413
hot_tier_manifest
415414
.files
@@ -464,36 +463,40 @@ impl HotTierManager {
464463
Ok(date_list)
465464
}
466465

467-
///get hot tier manifest for the stream and date
468-
pub async fn get_stream_hot_tier_manifest_for_date(
469-
&self,
470-
stream: &str,
471-
date: &NaiveDate,
472-
) -> Result<Manifest, HotTierError> {
466+
///get hot tier manifest on path
467+
pub async fn get_hot_tier_manifest_from_path(path: PathBuf) -> Result<Manifest, HotTierError> {
468+
if !path.exists() {
469+
return Ok(Manifest::default());
470+
}
471+
472+
// List the directories and prepare the hot tier manifest
473+
let mut date_dirs = fs::read_dir(&path).await?;
473474
let mut hot_tier_manifest = Manifest::default();
474-
let path = self
475-
.hot_tier_path
476-
.join(stream)
477-
.join(format!("date={}", date));
478-
if path.exists() {
479-
let date_dirs = ReadDirStream::new(fs::read_dir(&path).await?);
480-
let manifest_files: Vec<DirEntry> = date_dirs.try_collect().await?;
481-
for manifest in manifest_files {
482-
if !manifest
483-
.file_name()
484-
.to_string_lossy()
485-
.ends_with(".manifest.json")
486-
{
487-
continue;
488-
}
489-
let file = fs::read(manifest.path()).await?;
490-
let manifest: Manifest = serde_json::from_slice(&file)?;
491-
hot_tier_manifest.files.extend(manifest.files);
475+
476+
// Avoid unnecessary checks and keep only valid manifest files
477+
while let Some(manifest) = date_dirs.next_entry().await? {
478+
if !manifest
479+
.file_name()
480+
.to_string_lossy()
481+
.ends_with(".manifest.json")
482+
{
483+
continue;
492484
}
485+
// Deserialize each manifest file and extend the hot tier manifest with its files
486+
let file = fs::read(manifest.path()).await?;
487+
let manifest: Manifest = serde_json::from_slice(&file)?;
488+
hot_tier_manifest.files.extend(manifest.files);
493489
}
490+
494491
Ok(hot_tier_manifest)
495492
}
496493

494+
pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf {
495+
self.hot_tier_path
496+
.join(stream)
497+
.join(format!("date={}", date))
498+
}
499+
497500
///get the list of files from all the manifests present in hot tier directory for the stream
498501
pub async fn get_hot_tier_manifest_files(
499502
&self,
@@ -526,17 +529,29 @@ impl HotTierManager {
526529
&self,
527530
stream: &str,
528531
) -> Result<Vec<File>, HotTierError> {
529-
let mut hot_tier_parquet_files: Vec<File> = Vec::new();
532+
// Fetch list of dates for the given stream
530533
let date_list = self.fetch_hot_tier_dates(stream).await?;
534+
535+
// Create an unordered iter of futures to async collect files
536+
let mut tasks = FuturesUnordered::new();
537+
538+
// For each date, fetch the manifest and extract parquet files
531539
for date in date_list {
532-
let manifest = self
533-
.get_stream_hot_tier_manifest_for_date(stream, &date)
534-
.await?;
540+
let path = self.get_stream_path_for_date(stream, &date);
541+
tasks.push(async move {
542+
HotTierManager::get_hot_tier_manifest_from_path(path)
543+
.await
544+
.map(|manifest| manifest.files.clone())
545+
.unwrap_or_default() // If fetching manifest fails, return an empty vector
546+
});
547+
}
535548

536-
for parquet_file in manifest.files {
537-
hot_tier_parquet_files.push(parquet_file.clone());
538-
}
549+
// Collect parquet files for all dates
550+
let mut hot_tier_parquet_files: Vec<File> = vec![];
551+
while let Some(files) = tasks.next().await {
552+
hot_tier_parquet_files.extend(files);
539553
}
554+
540555
Ok(hot_tier_parquet_files)
541556
}
542557

0 commit comments

Comments
 (0)