Skip to content

Commit 949f4d6

Browse files
author
Devdutt Shenoi
committed
feat: split methods and use unordered futures
1 parent f1af947 commit 949f4d6

File tree

1 file changed

+49
-34
lines changed

1 file changed

+49
-34
lines changed

src/hottier.rs

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -408,9 +408,8 @@ impl HotTierManager {
408408
);
409409
self.put_hot_tier(stream, &mut stream_hot_tier).await?;
410410
file_processed = true;
411-
let mut hot_tier_manifest = self
412-
.get_stream_hot_tier_manifest_for_date(stream, &date)
413-
.await?;
411+
let path = self.get_stream_path_for_date(stream, &date);
412+
let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?;
414413
hot_tier_manifest.files.push(parquet_file.clone());
415414
hot_tier_manifest
416415
.files
@@ -465,36 +464,40 @@ impl HotTierManager {
465464
Ok(date_list)
466465
}
467466

468-
///get hot tier manifest for the stream and date
469-
pub async fn get_stream_hot_tier_manifest_for_date(
470-
&self,
471-
stream: &str,
472-
date: &NaiveDate,
473-
) -> Result<Manifest, HotTierError> {
467+
///get hot tier manifest on path
468+
pub async fn get_hot_tier_manifest_from_path(path: PathBuf) -> Result<Manifest, HotTierError> {
469+
if !path.exists() {
470+
return Ok(Manifest::default());
471+
}
472+
473+
// List the directories and prepare the hot tier manifest
474+
let mut date_dirs = fs::read_dir(&path).await?;
474475
let mut hot_tier_manifest = Manifest::default();
475-
let path = self
476-
.hot_tier_path
477-
.join(stream)
478-
.join(format!("date={}", date));
479-
if path.exists() {
480-
let date_dirs = ReadDirStream::new(fs::read_dir(&path).await?);
481-
let manifest_files: Vec<DirEntry> = date_dirs.try_collect().await?;
482-
for manifest in manifest_files {
483-
if !manifest
484-
.file_name()
485-
.to_string_lossy()
486-
.ends_with(".manifest.json")
487-
{
488-
continue;
489-
}
490-
let file = fs::read(manifest.path()).await?;
491-
let manifest: Manifest = serde_json::from_slice(&file)?;
492-
hot_tier_manifest.files.extend(manifest.files);
476+
477+
// Avoid unnecessary checks and keep only valid manifest files
478+
while let Some(manifest) = date_dirs.next_entry().await? {
479+
if !manifest
480+
.file_name()
481+
.to_string_lossy()
482+
.ends_with(".manifest.json")
483+
{
484+
continue;
493485
}
486+
// Deserialize each manifest file and extend the hot tier manifest with its files
487+
let file = fs::read(manifest.path()).await?;
488+
let manifest: Manifest = serde_json::from_slice(&file)?;
489+
hot_tier_manifest.files.extend(manifest.files);
494490
}
491+
495492
Ok(hot_tier_manifest)
496493
}
497494

495+
pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf {
496+
self.hot_tier_path
497+
.join(stream)
498+
.join(format!("date={}", date))
499+
}
500+
498501
///get the list of files from all the manifests present in hot tier directory for the stream
499502
pub async fn get_hot_tier_manifest_files(
500503
&self,
@@ -527,17 +530,29 @@ impl HotTierManager {
527530
&self,
528531
stream: &str,
529532
) -> Result<Vec<File>, HotTierError> {
530-
let mut hot_tier_parquet_files: Vec<File> = Vec::new();
533+
// Fetch list of dates for the given stream
531534
let date_list = self.fetch_hot_tier_dates(stream).await?;
535+
536+
// Create an unordered iter of futures to async collect files
537+
let mut tasks = FuturesUnordered::new();
538+
539+
// For each date, fetch the manifest and extract parquet files
532540
for date in date_list {
533-
let manifest = self
534-
.get_stream_hot_tier_manifest_for_date(stream, &date)
535-
.await?;
541+
let path = self.get_stream_path_for_date(stream, &date);
542+
tasks.push(async move {
543+
HotTierManager::get_hot_tier_manifest_from_path(path)
544+
.await
545+
.map(|manifest| manifest.files.clone())
546+
.unwrap_or_default() // If fetching manifest fails, return an empty vector
547+
});
548+
}
536549

537-
for parquet_file in manifest.files {
538-
hot_tier_parquet_files.push(parquet_file.clone());
539-
}
550+
// Collect parquet files for all dates
551+
let mut hot_tier_parquet_files: Vec<File> = vec![];
552+
while let Some(files) = tasks.next().await {
553+
hot_tier_parquet_files.extend(files);
540554
}
555+
541556
Ok(hot_tier_parquet_files)
542557
}
543558

0 commit comments

Comments
 (0)