Skip to content

Commit 9f6556e

Browse files
fix: optimise first and latest event fetch (#1410)
1 parent f0e7a17 commit 9f6556e

File tree

5 files changed

+333
-165
lines changed

5 files changed

+333
-165
lines changed

src/storage/azure_blob.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,64 @@ impl ObjectStorage for BlobStore {
698698
Ok(streams)
699699
}
700700

701+
async fn list_hours(
702+
&self,
703+
stream_name: &str,
704+
date: &str,
705+
) -> Result<Vec<String>, ObjectStorageError> {
706+
let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
707+
let resp = self.client.list_with_delimiter(Some(&pre)).await?;
708+
709+
let hours = resp
710+
.common_prefixes
711+
.iter()
712+
.filter_map(|path| {
713+
let path_str = path.as_ref();
714+
if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date))
715+
{
716+
// Remove trailing slash if present, otherwise use as is
717+
let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
718+
Some(clean_path.to_string())
719+
} else {
720+
None
721+
}
722+
})
723+
.filter(|dir| dir.starts_with("hour="))
724+
.collect();
725+
726+
Ok(hours)
727+
}
728+
729+
async fn list_minutes(
730+
&self,
731+
stream_name: &str,
732+
date: &str,
733+
hour: &str,
734+
) -> Result<Vec<String>, ObjectStorageError> {
735+
let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
736+
let resp = self.client.list_with_delimiter(Some(&pre)).await?;
737+
738+
let minutes = resp
739+
.common_prefixes
740+
.iter()
741+
.filter_map(|path| {
742+
let path_str = path.as_ref();
743+
if let Some(stripped) =
744+
path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour))
745+
{
746+
// Remove trailing slash if present, otherwise use as is
747+
let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
748+
Some(clean_path.to_string())
749+
} else {
750+
None
751+
}
752+
})
753+
.filter(|dir| dir.starts_with("minute="))
754+
.collect();
755+
756+
Ok(minutes)
757+
}
758+
701759
async fn list_manifest_files(
702760
&self,
703761
stream_name: &str,

src/storage/gcs.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,64 @@ impl ObjectStorage for Gcs {
605605
Ok(streams)
606606
}
607607

608+
async fn list_hours(
609+
&self,
610+
stream_name: &str,
611+
date: &str,
612+
) -> Result<Vec<String>, ObjectStorageError> {
613+
let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
614+
let resp = self.client.list_with_delimiter(Some(&pre)).await?;
615+
616+
let hours = resp
617+
.common_prefixes
618+
.iter()
619+
.filter_map(|path| {
620+
let path_str = path.as_ref();
621+
if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date))
622+
{
623+
// Remove trailing slash if present, otherwise use as is
624+
let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
625+
Some(clean_path.to_string())
626+
} else {
627+
None
628+
}
629+
})
630+
.filter(|dir| dir.starts_with("hour="))
631+
.collect();
632+
633+
Ok(hours)
634+
}
635+
636+
async fn list_minutes(
637+
&self,
638+
stream_name: &str,
639+
date: &str,
640+
hour: &str,
641+
) -> Result<Vec<String>, ObjectStorageError> {
642+
let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
643+
let resp = self.client.list_with_delimiter(Some(&pre)).await?;
644+
645+
let minutes = resp
646+
.common_prefixes
647+
.iter()
648+
.filter_map(|path| {
649+
let path_str = path.as_ref();
650+
if let Some(stripped) =
651+
path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour))
652+
{
653+
// Remove trailing slash if present, otherwise use as is
654+
let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
655+
Some(clean_path.to_string())
656+
} else {
657+
None
658+
}
659+
})
660+
.filter(|dir| dir.starts_with("minute="))
661+
.collect();
662+
663+
Ok(minutes)
664+
}
665+
608666
async fn list_manifest_files(
609667
&self,
610668
stream_name: &str,

src/storage/localfs.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,43 @@ impl ObjectStorage for LocalFS {
422422
Ok(dates.into_iter().flatten().collect())
423423
}
424424

425+
async fn list_hours(
426+
&self,
427+
stream_name: &str,
428+
date: &str,
429+
) -> Result<Vec<String>, ObjectStorageError> {
430+
let path = self.root.join(stream_name).join(date);
431+
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
432+
let entries: Vec<DirEntry> = directories.try_collect().await?;
433+
let entries = entries.into_iter().map(dir_name);
434+
let hours: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;
435+
Ok(hours
436+
.into_iter()
437+
.flatten()
438+
.filter(|dir| dir.starts_with("hour="))
439+
.collect())
440+
}
441+
442+
async fn list_minutes(
443+
&self,
444+
stream_name: &str,
445+
date: &str,
446+
hour: &str,
447+
) -> Result<Vec<String>, ObjectStorageError> {
448+
let path = self.root.join(stream_name).join(date).join(hour);
449+
// Propagate any read_dir errors instead of swallowing them
450+
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
451+
let entries: Vec<DirEntry> = directories.try_collect().await?;
452+
let entries = entries.into_iter().map(dir_name);
453+
let minutes: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;
454+
// Filter down to only the "minute=" prefixed directories
455+
Ok(minutes
456+
.into_iter()
457+
.flatten()
458+
.filter(|dir| dir.starts_with("minute="))
459+
.collect())
460+
}
461+
425462
async fn list_manifest_files(
426463
&self,
427464
_stream_name: &str,

0 commit comments

Comments
 (0)