diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index 34933a98d..1c6cf300b 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -698,6 +698,64 @@ impl ObjectStorage for BlobStore {
         Ok(streams)
     }
 
+    async fn list_hours(
+        &self,
+        stream_name: &str,
+        date: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        let hours = resp
+            .common_prefixes
+            .iter()
+            .filter_map(|path| {
+                let path_str = path.as_ref();
+                if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date))
+                {
+                    // Remove trailing slash if present, otherwise use as is
+                    let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
+                    Some(clean_path.to_string())
+                } else {
+                    None
+                }
+            })
+            .filter(|dir| dir.starts_with("hour="))
+            .collect();
+
+        Ok(hours)
+    }
+
+    async fn list_minutes(
+        &self,
+        stream_name: &str,
+        date: &str,
+        hour: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        let minutes = resp
+            .common_prefixes
+            .iter()
+            .filter_map(|path| {
+                let path_str = path.as_ref();
+                if let Some(stripped) =
+                    path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour))
+                {
+                    // Remove trailing slash if present, otherwise use as is
+                    let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
+                    Some(clean_path.to_string())
+                } else {
+                    None
+                }
+            })
+            .filter(|dir| dir.starts_with("minute="))
+            .collect();
+
+        Ok(minutes)
+    }
+
     async fn list_manifest_files(
         &self,
         stream_name: &str,
diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs
index 23bae4710..8171344f5 100644
--- a/src/storage/gcs.rs
+++ b/src/storage/gcs.rs
@@ -605,6 +605,64 @@ impl ObjectStorage for Gcs {
         Ok(streams)
     }
 
+    async fn list_hours(
+        &self,
+        stream_name: &str,
+        date: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        let hours = resp
+            .common_prefixes
+            .iter()
+            .filter_map(|path| {
+                let path_str = path.as_ref();
+                if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date))
+                {
+                    // Remove trailing slash if present, otherwise use as is
+                    let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
+                    Some(clean_path.to_string())
+                } else {
+                    None
+                }
+            })
+            .filter(|dir| dir.starts_with("hour="))
+            .collect();
+
+        Ok(hours)
+    }
+
+    async fn list_minutes(
+        &self,
+        stream_name: &str,
+        date: &str,
+        hour: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        let minutes = resp
+            .common_prefixes
+            .iter()
+            .filter_map(|path| {
+                let path_str = path.as_ref();
+                if let Some(stripped) =
+                    path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour))
+                {
+                    // Remove trailing slash if present, otherwise use as is
+                    let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
+                    Some(clean_path.to_string())
+                } else {
+                    None
+                }
+            })
+            .filter(|dir| dir.starts_with("minute="))
+            .collect();
+
+        Ok(minutes)
+    }
+
     async fn list_manifest_files(
         &self,
         stream_name: &str,
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 8e8e99541..82eca88fe 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -422,6 +422,43 @@ impl ObjectStorage for LocalFS {
         Ok(dates.into_iter().flatten().collect())
     }
 
+    async fn list_hours(
+        &self,
+        stream_name: &str,
+        date: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let path = self.root.join(stream_name).join(date);
+        let directories = ReadDirStream::new(fs::read_dir(&path).await?);
+        let entries: Vec<DirEntry> = directories.try_collect().await?;
+        let entries = entries.into_iter().map(dir_name);
+        let hours: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;
+        Ok(hours
+            .into_iter()
+            .flatten()
+            .filter(|dir| dir.starts_with("hour="))
+            .collect())
+    }
+
+    async fn list_minutes(
+        &self,
+        stream_name: &str,
+        date: &str,
+        hour: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let path = self.root.join(stream_name).join(date).join(hour);
+        // Propagate any read_dir errors instead of swallowing them
+        let directories = ReadDirStream::new(fs::read_dir(&path).await?);
+        let entries: Vec<DirEntry> = directories.try_collect().await?;
+        let entries = entries.into_iter().map(dir_name);
+        let minutes: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;
+        // Filter down to only the "minute=" prefixed directories
+        Ok(minutes
+            .into_iter()
+            .flatten()
+            .filter(|dir| dir.starts_with("minute="))
+            .collect())
+    }
+
     async fn list_manifest_files(
         &self,
         _stream_name: &str,
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 1f6d39e88..a1e987068 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -45,7 +45,6 @@ use ulid::Ulid;
 
 use crate::alerts::AlertConfig;
 use crate::alerts::target::Target;
-use crate::catalog::snapshot::ManifestItem;
 use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot};
 use crate::correlation::{CorrelationConfig, CorrelationError};
 use crate::event::format::LogSource;
@@ -71,8 +70,6 @@ use super::{
     STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, StorageMetadata, retention::Retention,
 };
 
-use crate::event::DEFAULT_TIMESTAMP_KEY;
-
 /// Context for upload operations containing stream information
 pub(crate) struct UploadContext {
     stream: Arc<Stream>,
@@ -309,6 +306,24 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
+    /// Lists the immediate `hour=` partition directories under the given date.
+    /// Only immediate child entries named `hour=HH` should be returned (no trailing slash).
+    /// `HH` must be zero-padded two-digit numerals (`"hour=00"` through `"hour=23"`).
+    async fn list_hours(
+        &self,
+        stream_name: &str,
+        date: &str,
+    ) -> Result<Vec<String>, ObjectStorageError>;
+
+    /// Lists the immediate `minute=` partition directories under the given date/hour.
+    /// Only immediate child entries named `minute=MM` should be returned (no trailing slash).
+    /// `MM` must be zero-padded two-digit numerals (`"minute=00"` through `"minute=59"`).
+    async fn list_minutes(
+        &self,
+        stream_name: &str,
+        date: &str,
+        hour: &str,
+    ) -> Result<Vec<String>, ObjectStorageError>;
     async fn list_manifest_files(
         &self,
         stream_name: &str,
@@ -839,6 +854,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     /// Retrieves both the first and latest event timestamps from storage for the specified stream.
+    /// Uses directory structure traversal instead of downloading manifest files for better performance.
+    ///
+    /// This optimized implementation avoids downloading potentially large manifest files by leveraging
+    /// the hierarchical directory structure (date=YYYY-MM-DD/hour=HH/minute=MM/) to derive timestamps.
+    /// It performs efficient list operations to find the min/max date, hour, and minute combinations,
+    /// then constructs the actual timestamps from this directory information.
     ///
     /// # Arguments
     ///
@@ -852,76 +873,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         &self,
         stream_name: &str,
     ) -> Result<(Option<String>, Option<String>), ObjectStorageError> {
-        // Get time partition for the stream
-        let time_partition = if let Ok(stream) = crate::parseable::PARSEABLE.get_stream(stream_name)
-        {
-            stream.get_time_partition()
-        } else {
-            None
-        };
-
-        // Get parsed stream metadata files
-        let stream_jsons = self.get_stream_meta_from_storage(stream_name).await?;
-
-        // Collect all manifest items from snapshots
-        let mut all_manifest_items = Vec::new();
-        for stream_format in &stream_jsons {
-            let manifest_items = &stream_format.snapshot.manifest_list;
-            all_manifest_items.extend(manifest_items.iter());
-        }
-
-        if all_manifest_items.is_empty() {
+        // Get all available dates for the stream
+        let dates = self.list_dates(stream_name).await?;
+        if dates.is_empty() {
             return Ok((None, None));
         }
 
-        // Find min/max in one pass
-        let (mut first_manifest_item, mut latest_manifest_item) = (None, None);
-        for &item in &all_manifest_items {
-            if first_manifest_item
-                .is_none_or(|cur: &ManifestItem| item.time_lower_bound < cur.time_lower_bound)
-            {
-                first_manifest_item = Some(item);
-            }
-            if latest_manifest_item
-                .is_none_or(|cur: &ManifestItem| item.time_upper_bound > cur.time_upper_bound)
-            {
-                latest_manifest_item = Some(item);
-            }
+        // Parse and sort dates to find min and max
+        let mut parsed_dates: Vec<_> = dates
+            .iter()
+            .filter_map(|date_str| {
+                // Extract date from "date=YYYY-MM-DD" format
+                if let Some(date_part) = date_str.strip_prefix("date=") {
+                    chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d")
+                        .ok()
+                        .map(|date| (date, date_str))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        if parsed_dates.is_empty() {
+            return Ok((None, None));
         }
 
-        let partition_column = time_partition.as_deref().unwrap_or(DEFAULT_TIMESTAMP_KEY);
+        parsed_dates.sort_by_key(|(date, _)| *date);
+        let min_date = &parsed_dates[0].1;
+        let max_date = &parsed_dates[parsed_dates.len() - 1].1;
 
-        // Extract first and latest timestamps - check if we can reuse the same manifest
-        let (first_timestamp, latest_timestamp) = if let (Some(first_item), Some(latest_item)) =
-            (first_manifest_item, latest_manifest_item)
-        {
-            if first_item.manifest_path == latest_item.manifest_path {
-                // Same manifest, we can get both min and max in one pass
-                let manifest = self
-                    .load_manifest_from_path(&first_item.manifest_path)
-                    .await?;
-                self.extract_timestamps_from_manifest(&manifest, partition_column)
-            } else {
-                // Different manifests, need to load separately
-                let first_ts = self
-                    .extract_timestamp_from_manifest(
-                        &first_item.manifest_path,
-                        partition_column,
-                        true,
-                    )
-                    .await?;
-                let latest_ts = self
-                    .extract_timestamp_from_manifest(
-                        &latest_item.manifest_path,
-                        partition_column,
-                        false,
-                    )
-                    .await?;
-                (first_ts, latest_ts)
-            }
-        } else {
-            (None, None)
-        };
+        // Extract timestamps for min and max dates
+        let first_timestamp = self
+            .extract_timestamp_for_date(stream_name, min_date, true)
+            .await?;
+        let latest_timestamp = self
+            .extract_timestamp_for_date(stream_name, max_date, false)
+            .await?;
 
         let first_event_at = first_timestamp.map(|ts| ts.to_rfc3339());
         let latest_event_at = latest_timestamp.map(|ts| ts.to_rfc3339());
@@ -929,111 +916,77 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok((first_event_at, latest_event_at))
     }
 
-    /// Helper method to load a manifest file from object storage
-    async fn load_manifest_from_path(
-        &self,
-        manifest_path: &str,
-    ) -> Result<Manifest, ObjectStorageError> {
-        use crate::{catalog::manifest::Manifest, query::QUERY_SESSION};
-
-        let object_store = QUERY_SESSION
-            .state()
-            .runtime_env()
-            .object_store_registry
-            .get_store(&self.store_url())
-            .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
-
-        let path = object_store::path::Path::parse(manifest_path)
-            .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
-
-        let manifest_response = object_store
-            .get(&path)
-            .await
-            .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
-
-        let manifest_bytes = manifest_response
-            .bytes()
-            .await
-            .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
-
-        let manifest: Manifest = serde_json::from_slice(&manifest_bytes)
-            .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?;
-
-        Ok(manifest)
-    }
-
-    /// Helper method to extract min and max timestamps from a manifest
-    /// Returns (min_timestamp, max_timestamp)
-    fn extract_timestamps_from_manifest(
-        &self,
-        manifest: &Manifest,
-        partition_column: &str,
-    ) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
-        use crate::catalog::column::TypedStatistics;
-        use chrono::{DateTime, Utc};
-
-        let mut min_timestamp: Option<DateTime<Utc>> = None;
-        let mut max_timestamp: Option<DateTime<Utc>> = None;
-
-        for file in &manifest.files {
-            if let Some(column) = file.columns.iter().find(|col| col.name == partition_column)
-                && let Some(stats) = &column.stats
-            {
-                match stats {
-                    TypedStatistics::Int(int_stats) => {
-                        if let Some(min_ts) = DateTime::from_timestamp_millis(int_stats.min) {
-                            min_timestamp = Some(match min_timestamp {
-                                Some(existing) => existing.min(min_ts),
-                                None => min_ts,
-                            });
-                        }
-                        if let Some(max_ts) = DateTime::from_timestamp_millis(int_stats.max) {
-                            max_timestamp = Some(match max_timestamp {
-                                Some(existing) => existing.max(max_ts),
-                                None => max_ts,
-                            });
-                        }
-                    }
-                    TypedStatistics::String(str_stats) => {
-                        if let Ok(min_ts) = DateTime::parse_from_rfc3339(&str_stats.min) {
-                            let min_ts = min_ts.with_timezone(&Utc);
-                            min_timestamp = Some(match min_timestamp {
-                                Some(existing) => existing.min(min_ts),
-                                None => min_ts,
-                            });
-                        }
-                        if let Ok(max_ts) = DateTime::parse_from_rfc3339(&str_stats.max) {
-                            let max_ts = max_ts.with_timezone(&Utc);
-                            max_timestamp = Some(match max_timestamp {
-                                Some(existing) => existing.max(max_ts),
-                                None => max_ts,
-                            });
-                        }
-                    }
-                    _ => {} // Skip other types
-                }
-            }
-        }
-
-        (min_timestamp, max_timestamp)
-    }
-
-    /// Helper method to extract timestamp from a manifest file
-    async fn extract_timestamp_from_manifest(
-        &self,
-        manifest_path: &str,
-        partition_column: &str,
-        find_min: bool,
-    ) -> Result<Option<DateTime<Utc>>, ObjectStorageError> {
-        let manifest = self.load_manifest_from_path(manifest_path).await?;
-        let (min_timestamp, max_timestamp) =
-            self.extract_timestamps_from_manifest(&manifest, partition_column);
-
-        Ok(if find_min {
-            min_timestamp
-        } else {
-            max_timestamp
-        })
+    /// Derives the first or latest event timestamp of one date partition from its
+    /// `hour=HH/minute=MM` directory names.
+    ///
+    /// # Arguments
+    ///
+    /// * `stream_name` - Stream whose partition directories are listed
+    /// * `date` - Date partition directory name in `date=YYYY-MM-DD` form
+    /// * `find_min` - `true` picks the earliest hour/minute, `false` the latest
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(None)` - No hour or minute partitions exist, or `date` cannot be parsed
+    /// * `Ok(Some(ts))` - UTC timestamp at minute resolution (seconds are always 00)
+    /// * `Err(_)` - Listing failed, or no listed hour/minute directory name is valid
+    async fn extract_timestamp_for_date(
+        &self,
+        stream_name: &str,
+        date: &str,
+        find_min: bool,
+    ) -> Result<Option<DateTime<Utc>>, ObjectStorageError> {
+        // Get all hours for this date
+        let hours = self.list_hours(stream_name, date).await?;
+        if hours.is_empty() {
+            return Ok(None);
+        }
+
+        // Keep only in-range hours so a stray directory such as "hour=99" is never selected
+        let valid_hours = hours.iter().filter_map(|hour_str| {
+            let hour = hour_str.strip_prefix("hour=")?.parse::<u32>().ok()?;
+            (hour < 24).then_some((hour, hour_str))
+        });
+        let (target_hour_value, target_hour_str) = if find_min {
+            valid_hours.min()
+        } else {
+            valid_hours.max()
+        }
+        .ok_or_else(|| ObjectStorageError::Custom("No valid hours found".to_string()))?;
+
+        // Get all minutes for the target hour
+        let minutes = self
+            .list_minutes(stream_name, date, target_hour_str)
+            .await?;
+        if minutes.is_empty() {
+            return Ok(None);
+        }
+
+        // Same range check for minutes
+        let valid_minutes = minutes.iter().filter_map(|minute_str| {
+            let minute = minute_str.strip_prefix("minute=")?.parse::<u32>().ok()?;
+            (minute < 60).then_some(minute)
+        });
+        let target_minute = if find_min {
+            valid_minutes.min()
+        } else {
+            valid_minutes.max()
+        }
+        .ok_or_else(|| ObjectStorageError::Custom("No valid minutes found".to_string()))?;
+
+        // Extract date components and construct timestamp
+        let Some(parsed_date) = date
+            .strip_prefix("date=")
+            .and_then(|date_part| chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok())
+        else {
+            return Ok(None);
+        };
+
+        // Hour and minute are range-checked above, so `None` here means a broken invariant;
+        // report "unknown" instead of silently falling back to midnight
+        Ok(parsed_date
+            .and_hms_opt(target_hour_value, target_minute, 0)
+            .map(|naive_datetime| DateTime::from_naive_utc_and_offset(naive_datetime, Utc)))
     }
 
     // pick a better name
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 1f150c4de..824ab021a 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -781,6 +781,64 @@ impl ObjectStorage for S3 {
         Ok(streams)
     }
 
+    async fn list_hours(
+        &self,
+        stream_name: &str,
+        date: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        let hours: Vec<String> = resp
+            .common_prefixes
+            .iter()
+            .filter_map(|path| {
+                let path_str = path.as_ref();
+                if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date))
+                {
+                    // Remove trailing slash if present, otherwise use as is
+                    let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
+                    Some(clean_path.to_string())
+                } else {
+                    None
+                }
+            })
+            .filter(|dir| dir.starts_with("hour="))
+            .collect();
+
+        Ok(hours)
+    }
+
+    async fn list_minutes(
+        &self,
+        stream_name: &str,
+        date: &str,
+        hour: &str,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        let minutes: Vec<String> = resp
+            .common_prefixes
+            .iter()
+            .filter_map(|path| {
+                let path_str = path.as_ref();
+                if let Some(stripped) =
+                    path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour))
+                {
+                    // Remove trailing slash if present, otherwise use as is
+                    let clean_path = stripped.strip_suffix('/').unwrap_or(stripped);
+                    Some(clean_path.to_string())
+                } else {
+                    None
+                }
+            })
+            .filter(|dir| dir.starts_with("minute="))
+            .collect();
+
+        Ok(minutes)
+    }
+
     async fn list_manifest_files(
         &self,
         stream_name: &str,