optimise first and latest event fetch #1410


Merged
merged 4 commits into parseablehq:main from optimise-info on Aug 21, 2025

Conversation

@nikhilsinhaparseable (Contributor) commented Aug 21, 2025

Summary by CodeRabbit

  • New Features

    • Hour- and minute-level listing of stream data across all storage backends.
    • Compute and return first and latest event timestamps by traversing date/hour/minute partitions.
  • Refactor

    • Timestamp discovery switched from manifest-based to directory-based traversal for better scalability and performance.
  • Documentation

    • Updated docs/comments to reflect the new directory-based timestamp strategy.

@coderabbitai bot (Contributor) commented Aug 21, 2025

Walkthrough

Adds directory-based time-bucket listing methods (list_hours, list_minutes) to the ObjectStorage trait and implements them for S3, GCS, Azure Blob, and LocalFS. Replaces manifest-based timestamp extraction with directory traversal (date → hour → minute) and adds extract_timestamp_for_date to build DateTime values.
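For orientation, here is a reduced, self-contained sketch of the trait surface described above. The trait name, the stubbed error type, and the async_trait usage are stand-ins chosen for illustration; the real methods are added to the ObjectStorage trait in src/storage/object_storage.rs and their exact signatures may differ.

use async_trait::async_trait;

// Illustrative error stub so the sketch compiles on its own; Parseable has its own
// ObjectStorageError type.
#[derive(Debug)]
pub struct StorageError(pub String);

// Reduced view of the listing surface: date -> hour -> minute partitions.
#[async_trait]
pub trait TimeBucketListing {
    /// Existing entry point: "date=YYYY-MM-DD" partition directories for a stream.
    async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, StorageError>;

    /// New: immediate "hour=HH" children under one date partition.
    async fn list_hours(&self, stream_name: &str, date: &str) -> Result<Vec<String>, StorageError>;

    /// New: immediate "minute=MM" children under one date/hour partition.
    async fn list_minutes(
        &self,
        stream_name: &str,
        date: &str,
        hour: &str,
    ) -> Result<Vec<String>, StorageError>;
}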

Changes

  • ObjectStorage trait & timestamp logic (src/storage/object_storage.rs): Removes manifest-based timestamp extraction and related imports. Adds trait methods list_hours, list_minutes, and extract_timestamp_for_date. Updates get_first_and_latest_event_from_storage to derive RFC3339 timestamps by listing dates → hours → minutes and constructing DateTime<Utc>.
  • S3 backend (src/storage/s3.rs): Adds list_hours and list_minutes using delimiter-based listing and parsing common_prefixes, returning filtered hour=/minute= directory names (see the listing sketch after this list).
  • GCS backend (src/storage/gcs.rs): Adds list_hours and list_minutes via list_with_delimiter on prefixes, extracting hour=/minute= directory names from common_prefixes.
  • Azure Blob backend (src/storage/azure_blob.rs): Adds list_hours and list_minutes using hierarchical prefix listing (list_with_delimiter) and filtering prefixes that start with hour=/minute=.
  • Local filesystem backend (src/storage/localfs.rs): Adds list_hours and list_minutes using async directory reads (ReadDirStream + FuturesUnordered), extracting and filtering directory names for hour=/minute= entries.
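The cloud backends above share one listing pattern: list immediate children with a delimiter and keep the prefixes that look like time buckets. Below is a minimal sketch of that pattern, assuming the object_store crate's list_with_delimiter API; the helper name and the <stream>/<date> path layout are illustrative, not taken from the diff.

use object_store::{path::Path, ObjectStore};

// List immediate "hour=HH" child directories under a date partition by asking the
// store for common prefixes (i.e. "directories") and filtering by name.
async fn list_hour_dirs(
    store: &dyn ObjectStore,
    stream_name: &str,
    date: &str, // e.g. "date=2025-08-21"
) -> object_store::Result<Vec<String>> {
    let prefix = Path::from(format!("{stream_name}/{date}"));
    let listing = store.list_with_delimiter(Some(&prefix)).await?;
    Ok(listing
        .common_prefixes
        .iter()
        .filter_map(|p| p.filename().map(str::to_owned)) // last path segment
        .filter(|name| name.starts_with("hour="))
        .collect())
}

list_minutes follows the same shape with a <stream>/<date>/<hour> prefix and a "minute=" filter; the LocalFS backend replaces the delimiter listing with async directory reads.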

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller
  participant TS as object_storage.rs
  participant Obj as ObjectStorage (S3/GCS/Azure/LocalFS)

  Caller->>TS: get_first_and_latest_event_from_storage(stream)
  TS->>Obj: list_dates(stream)
  Obj-->>TS: [date=YYYY-MM-DD, ...]

  alt dates found
    TS->>TS: choose min_date / max_date
    par first (min)
      TS->>Obj: list_hours(stream, min_date)
      Obj-->>TS: [hour=HH, ...]
      TS->>Obj: list_minutes(stream, min_date, chosen_hour)
      Obj-->>TS: [minute=MM, ...]
      TS->>TS: build DateTime<Utc> -> RFC3339
    and latest (max)
      TS->>Obj: list_hours(stream, max_date)
      Obj-->>TS: [hour=HH, ...]
      TS->>Obj: list_minutes(stream, max_date, chosen_hour)
      Obj-->>TS: [minute=MM, ...]
      TS->>TS: build DateTime<Utc> -> RFC3339
    end
    TS-->>Caller: { first: RFC3339?, latest: RFC3339? }
  else no dates
    TS-->>Caller: { first: None, latest: None }
  end
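The "build DateTime<Utc> -> RFC3339" step at the end of each branch can be illustrated with a small chrono sketch. The helper name is hypothetical, and pinning seconds to 0 is an assumption for illustration (the review comments below discuss using 59 for the "latest" bound).

use chrono::{DateTime, NaiveDate, Utc};

// Turn partition names such as "date=2025-08-21", "hour=07", "minute=42" into an
// RFC3339 timestamp; returns None if any component is missing or malformed.
fn partition_to_rfc3339(date_dir: &str, hour_dir: &str, minute_dir: &str) -> Option<String> {
    let date = NaiveDate::parse_from_str(date_dir.strip_prefix("date=")?, "%Y-%m-%d").ok()?;
    let hour: u32 = hour_dir.strip_prefix("hour=")?.parse().ok()?;
    let minute: u32 = minute_dir.strip_prefix("minute=")?.parse().ok()?;
    // Seconds are not encoded in the directory layout, so they are pinned here.
    let naive = date.and_hms_opt(hour, minute, 0)?;
    Some(DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc).to_rfc3339())
}

// Example: partition_to_rfc3339("date=2025-08-21", "hour=07", "minute=42")
// yields Some("2025-08-21T07:42:00+00:00").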

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

for next release

Poem

I hop through date-folders, sniff hour and minute,
No bulky manifests—just tidy time-bucket flint.
I nibble prefixes, return what I find,
Timestamps in order, neatly aligned.
Hooray! says the rabbit, with a hop and a grin 🐇


@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)

869-916: Make min/max selection resilient to sparse/invalid dates; drop redundant branch.

If the earliest or latest date has no valid hour/minute subdirs, the current code can return None for one side even when valid data exists on adjacent dates. Also, the min_date == max_date branch is redundant.

Proposed refactor: scan forward for the first valid timestamp and backward for the last valid timestamp.

@@
-        // Extract timestamps for min and max dates
-        let first_timestamp = self
-            .extract_timestamp_for_date(stream_name, min_date, true)
-            .await?;
-        let latest_timestamp = if min_date == max_date {
-            // Same date, get max timestamp from the same date
-            self.extract_timestamp_for_date(stream_name, max_date, false)
-                .await?
-        } else {
-            // Different dates, get max from the latest date
-            self.extract_timestamp_for_date(stream_name, max_date, false)
-                .await?
-        };
+        // Find first timestamp by scanning from earliest to latest date
+        let mut first_timestamp = None;
+        for (_, date_str) in &parsed_dates {
+            if let Some(ts) = self
+                .extract_timestamp_for_date(stream_name, date_str, true)
+                .await?
+            {
+                first_timestamp = Some(ts);
+                break;
+            }
+        }
+
+        // Find latest timestamp by scanning from latest to earliest date
+        let mut latest_timestamp = None;
+        for (_, date_str) in parsed_dates.iter().rev() {
+            if let Some(ts) = self
+                .extract_timestamp_for_date(stream_name, date_str, false)
+                .await?
+            {
+                latest_timestamp = Some(ts);
+                break;
+            }
+        }
@@
-        let first_event_at = first_timestamp.map(|ts| ts.to_rfc3339());
-        let latest_event_at = latest_timestamp.map(|ts| ts.to_rfc3339());
+        let first_event_at = first_timestamp.map(|ts| ts.to_rfc3339());
+        let latest_event_at = latest_timestamp.map(|ts| ts.to_rfc3339());
🧹 Nitpick comments (9)
src/storage/object_storage.rs (1)

849-856: Clarify timestamp resolution and guarantees in docs.

Since timestamps are derived from date/hour/minute partitions, document that:

  • Resolution is minute-level (seconds inferred).
  • Directory names must follow date=YYYY-MM-DD/hour=HH/minute=MM.
  • Behavior when partitions exist but are empty/sparse.

This will prevent confusion for API consumers expecting manifest-level precision.
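As a concrete (purely illustrative) example, those notes could be captured as doc comments along these lines; the wording is not the actual in-tree documentation:

/// Derives first/latest event times from the partition layout
/// `date=YYYY-MM-DD/hour=HH/minute=MM` rather than from manifests.
///
/// - Resolution is minute-level; seconds are inferred, not read from data.
/// - Directory names that do not match the layout are ignored.
/// - A date partition with no valid hour/minute children contributes no timestamp.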

src/storage/azure_blob.rs (2)

701-722: LGTM; consider stricter filtering for valid hour partitions.

Current filter keeps any "hour=" prefix. Tighten to accept only 0–23 to avoid malformed directories influencing selection.

-            .filter(|dir| dir.starts_with("hour="))
+            .filter(|dir| {
+                dir.strip_prefix("hour=")
+                    .and_then(|h| h.parse::<u8>().ok())
+                    .map(|h| h <= 23)
+                    .unwrap_or(false)
+            })

724-746: Validate minute partitions to avoid malformed inputs.

Mirror the hour check for minutes to allow only 0–59.

-            .filter(|dir| dir.starts_with("minute="))
+            .filter(|dir| {
+                dir.strip_prefix("minute=")
+                    .and_then(|m| m.parse::<u8>().ok())
+                    .map(|m| m <= 59)
+                    .unwrap_or(false)
+            })
src/storage/localfs.rs (2)

425-441: Local listing is fine; add numeric guard for hour=HH.

To keep behavior consistent with cloud backends and protect from stray directories, filter hour values to 0–23.

-        Ok(hours
-            .into_iter()
-            .flatten()
-            .filter(|dir| dir.starts_with("hour="))
-            .collect())
+        Ok(hours
+            .into_iter()
+            .flatten()
+            .filter(|dir| {
+                dir.strip_prefix("hour=")
+                    .and_then(|h| h.parse::<u8>().ok())
+                    .map(|h| h <= 23)
+                    .unwrap_or(false)
+            })
+            .collect())

442-460: Mirror numeric guard for minute=MM.

-        Ok(minutes
-            .into_iter()
-            .flatten()
-            .filter(|dir| dir.starts_with("minute="))
-            .collect())
+        Ok(minutes
+            .into_iter()
+            .flatten()
+            .filter(|dir| {
+                dir.strip_prefix("minute=")
+                    .and_then(|m| m.parse::<u8>().ok())
+                    .map(|m| m <= 59)
+                    .unwrap_or(false)
+            })
+            .collect())
src/storage/gcs.rs (2)

608-629: Hour listing matches the pattern; add range validation to be robust.

Same reasoning as Azure/LocalFS—accept HH in 0–23 only.

-            .filter(|dir| dir.starts_with("hour="))
+            .filter(|dir| {
+                dir.strip_prefix("hour=")
+                    .and_then(|h| h.parse::<u8>().ok())
+                    .map(|h| h <= 23)
+                    .unwrap_or(false)
+            })

631-653: Validate minute directories (0–59).

-            .filter(|dir| dir.starts_with("minute="))
+            .filter(|dir| {
+                dir.strip_prefix("minute=")
+                    .and_then(|m| m.parse::<u8>().ok())
+                    .map(|m| m <= 59)
+                    .unwrap_or(false)
+            })
src/storage/s3.rs (2)

784-805: S3 hour listing is consistent; add HH range filter to avoid bad partitions.

-            .filter(|dir| dir.starts_with("hour="))
+            .filter(|dir| {
+                dir.strip_prefix("hour=")
+                    .and_then(|h| h.parse::<u8>().ok())
+                    .map(|h| h <= 23)
+                    .unwrap_or(false)
+            })

807-829: Validate minute partitions in S3 (0–59).

-            .filter(|dir| dir.starts_with("minute="))
+            .filter(|dir| {
+                dir.strip_prefix("minute=")
+                    .and_then(|m| m.parse::<u8>().ok())
+                    .map(|m| m <= 59)
+                    .unwrap_or(false)
+            })
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between f0e7a17 and 7ee906e.

📒 Files selected for processing (5)
  • src/storage/azure_blob.rs (1 hunks)
  • src/storage/gcs.rs (1 hunks)
  • src/storage/localfs.rs (1 hunks)
  • src/storage/object_storage.rs (4 hunks)
  • src/storage/s3.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
src/storage/localfs.rs (4)
src/storage/azure_blob.rs (2)
  • list_hours (701-722)
  • list_minutes (724-746)
src/storage/gcs.rs (2)
  • list_hours (608-629)
  • list_minutes (631-653)
src/storage/object_storage.rs (2)
  • list_hours (309-313)
  • list_minutes (314-319)
src/storage/s3.rs (2)
  • list_hours (784-805)
  • list_minutes (807-829)
src/storage/azure_blob.rs (3)
src/storage/gcs.rs (6)
  • list_hours (608-629)
  • resp (251-257)
  • resp (303-309)
  • resp (692-697)
  • resp (707-712)
  • list_minutes (631-653)
src/storage/object_storage.rs (2)
  • list_hours (309-313)
  • list_minutes (314-319)
src/storage/s3.rs (9)
  • list_hours (784-805)
  • from (897-905)
  • from (909-911)
  • resp (413-419)
  • resp (465-471)
  • resp (868-873)
  • resp (883-888)
  • s (180-180)
  • list_minutes (807-829)
src/storage/s3.rs (4)
src/storage/azure_blob.rs (6)
  • list_hours (701-722)
  • resp (282-288)
  • resp (334-340)
  • resp (789-794)
  • resp (804-809)
  • list_minutes (724-746)
src/storage/gcs.rs (6)
  • list_hours (608-629)
  • resp (251-257)
  • resp (303-309)
  • resp (692-697)
  • resp (707-712)
  • list_minutes (631-653)
src/storage/localfs.rs (3)
  • list_hours (425-440)
  • from (600-602)
  • list_minutes (442-460)
src/storage/object_storage.rs (2)
  • list_hours (309-313)
  • list_minutes (314-319)
src/storage/gcs.rs (4)
src/storage/azure_blob.rs (6)
  • list_hours (701-722)
  • resp (282-288)
  • resp (334-340)
  • resp (789-794)
  • resp (804-809)
  • list_minutes (724-746)
src/storage/localfs.rs (3)
  • list_hours (425-440)
  • from (600-602)
  • list_minutes (442-460)
src/storage/object_storage.rs (2)
  • list_hours (309-313)
  • list_minutes (314-319)
src/storage/s3.rs (9)
  • list_hours (784-805)
  • from (897-905)
  • from (909-911)
  • resp (413-419)
  • resp (465-471)
  • resp (868-873)
  • resp (883-888)
  • s (180-180)
  • list_minutes (807-829)
src/storage/object_storage.rs (4)
src/storage/azure_blob.rs (2)
  • list_hours (701-722)
  • list_minutes (724-746)
src/storage/gcs.rs (2)
  • list_hours (608-629)
  • list_minutes (631-653)
src/storage/localfs.rs (2)
  • list_hours (425-440)
  • list_minutes (442-460)
src/storage/s3.rs (2)
  • list_hours (784-805)
  • list_minutes (807-829)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (1)
src/storage/object_storage.rs (1)

308-320: No stale manifest helpers found; trait change approved

I ran a targeted search for the removed helper load_manifest_from_path and confirmed there are no lingering references:

  • rg -nP '\bload_manifest_from_path\b' returned no matches.

The matches on DEFAULT_TIMESTAMP_KEY and ManifestItem are expected, as those types remain in use elsewhere and aren’t tied to the removed manifest‐helper logic. You’re clear to merge.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)

912-991: Don’t fail on malformed partitions; add range checks; set seconds=59 for “latest”; remove duplicate and_hms_opt

This mirrors a prior comment on this PR. Current code returns an error when no valid hours/minutes are found, doesn’t bound hour/minute ranges, always sets seconds=0, and calls and_hms_opt redundantly. Prefer gracefully returning Ok(None), bound hour in 0..=23 and minute in 0..=59, and set seconds to 0 for earliest and 59 for latest.

Apply this focused diff:

-        // Find min/max hour and corresponding string without collecting all values
-        let (target_hour_value, target_hour_str) = hours
-            .iter()
-            .filter_map(|hour_str| {
-                hour_str.strip_prefix("hour=").and_then(|hour_part| {
-                    hour_part.parse::<u32>().ok().map(|hour| (hour, hour_str))
-                })
-            })
-            .reduce(|acc, curr| {
-                if find_min {
-                    if curr.0 < acc.0 { curr } else { acc }
-                } else if curr.0 > acc.0 {
-                    curr
-                } else {
-                    acc
-                }
-            })
-            .ok_or_else(|| ObjectStorageError::Custom("No valid hours found".to_string()))?;
+        // Find min/max hour; ignore malformed or out-of-range partitions
+        let maybe_hour = hours
+            .iter()
+            .filter_map(|hour_str| {
+                hour_str
+                    .strip_prefix("hour=")
+                    .and_then(|hour_part| hour_part.parse::<u32>().ok())
+                    .filter(|h| *h <= 23)
+                    .map(|h| (h, hour_str.as_str()))
+            })
+            .reduce(|acc, curr| {
+                if find_min {
+                    if curr.0 < acc.0 { curr } else { acc }
+                } else if curr.0 > acc.0 {
+                    curr
+                } else {
+                    acc
+                }
+            });
+        let Some((target_hour_value, target_hour_str)) = maybe_hour else {
+            return Ok(None);
+        };
@@
-        // Find min/max minute directly without collecting all values
-        let target_minute = minutes
-            .iter()
-            .filter_map(|minute_str| {
-                minute_str
-                    .strip_prefix("minute=")
-                    .and_then(|minute_part| minute_part.parse::<u32>().ok())
-            })
-            .reduce(|acc, curr| {
-                if find_min {
-                    if curr < acc { curr } else { acc }
-                } else if curr > acc {
-                    curr
-                } else {
-                    acc
-                }
-            })
-            .ok_or_else(|| ObjectStorageError::Custom("No valid minutes found".to_string()))?;
+        // Find min/max minute; ignore malformed or out-of-range partitions
+        let maybe_minute = minutes
+            .iter()
+            .filter_map(|minute_str| {
+                minute_str
+                    .strip_prefix("minute=")
+                    .and_then(|minute_part| minute_part.parse::<u32>().ok())
+                    .filter(|m| *m <= 59)
+            })
+            .reduce(|acc, curr| {
+                if find_min {
+                    if curr < acc { curr } else { acc }
+                } else if curr > acc {
+                    curr
+                } else {
+                    acc
+                }
+            });
+        let Some(target_minute) = maybe_minute else {
+            return Ok(None);
+        };
@@
-        if let Some(date_part) = date.strip_prefix("date=")
-            && let Ok(parsed_date) = chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d")
-        {
-            // Create timestamp from date, hour, and minute with seconds hardcoded to 00
-            let naive_datetime = parsed_date
-                .and_hms_opt(target_hour_value, target_minute, 0)
-                .unwrap_or_else(|| {
-                    parsed_date
-                        .and_hms_opt(target_hour_value, target_minute, 0)
-                        .unwrap_or_else(|| parsed_date.and_hms_opt(0, 0, 0).unwrap())
-                });
-
-            return Ok(Some(DateTime::from_naive_utc_and_offset(
-                naive_datetime,
-                Utc,
-            )));
-        }
+        if let Some(date_part) = date.strip_prefix("date=") {
+            if let Ok(parsed_date) = chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d") {
+                let seconds = if find_min { 0 } else { 59 };
+                if let Some(naive_datetime) =
+                    parsed_date.and_hms_opt(target_hour_value, target_minute, seconds)
+                {
+                    return Ok(Some(DateTime::from_naive_utc_and_offset(
+                        naive_datetime,
+                        Utc,
+                    )));
+                } else {
+                    // Should be unreachable due to range checks; treat as no data.
+                    return Ok(None);
+                }
+            }
+        }

Note: This also aligns the implementation with the repository’s filename/partition invariants we discussed previously (see retrieved learning from #1409 about enforced date/hour/minute patterns).

If helpful, I can add unit tests on LocalFS to cover malformed partitions, empty hours/minutes, and the seconds=59 behavior for latest. Want me to open a follow-up PR?

🧹 Nitpick comments (2)
src/storage/object_storage.rs (2)

869-889: Avoid full sort; derive min/max dates in a single pass

For large streams with many date partitions, allocating and sorting a Vec is unnecessary. You can compute min and max in one scan and skip storing references in a temporary Vec.

Apply this diff:

-        // Parse and sort dates to find min and max
-        let mut parsed_dates: Vec<_> = dates
-            .iter()
-            .filter_map(|date_str| {
-                // Extract date from "date=YYYY-MM-DD" format
-                if let Some(date_part) = date_str.strip_prefix("date=") {
-                    chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d")
-                        .ok()
-                        .map(|date| (date, date_str))
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        if parsed_dates.is_empty() {
-            return Ok((None, None));
-        }
-
-        parsed_dates.sort_by_key(|(date, _)| *date);
-        let min_date = &parsed_dates[0].1;
-        let max_date = &parsed_dates[parsed_dates.len() - 1].1;
+        // Single-pass extraction of min/max date partitions
+        let mut min_pair: Option<(chrono::NaiveDate, &str)> = None;
+        let mut max_pair: Option<(chrono::NaiveDate, &str)> = None;
+        for date_str in &dates {
+            if let Some(date_part) = date_str.strip_prefix("date=") {
+                if let Ok(d) = chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d") {
+                    match min_pair {
+                        Some((min_d, _)) if d >= min_d => {},
+                        _ => min_pair = Some((d, date_str.as_str())),
+                    }
+                    match max_pair {
+                        Some((max_d, _)) if d <= max_d => {},
+                        _ => max_pair = Some((d, date_str.as_str())),
+                    }
+                }
+            }
+        }
+        if min_pair.is_none() || max_pair.is_none() {
+            return Ok((None, None));
+        }
+        let min_date = min_pair.unwrap().1;
+        let max_date = max_pair.unwrap().1;

898-905: Parallelize earliest/latest extraction to shave latency

These two traversals are independent. Running them concurrently can reduce tail latency on object stores.

Apply this diff:

-        // Extract timestamps for min and max dates
-        let first_timestamp = self
-            .extract_timestamp_for_date(stream_name, min_date, true)
-            .await?;
-        let latest_timestamp = self
-            .extract_timestamp_for_date(stream_name, max_date, false)
-            .await?;
+        // Extract timestamps for min and max dates concurrently
+        let (first_res, latest_res) = tokio::join!(
+            self.extract_timestamp_for_date(stream_name, min_date, true),
+            self.extract_timestamp_for_date(stream_name, max_date, false)
+        );
+        let first_timestamp = first_res?;
+        let latest_timestamp = latest_res?;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 7ee906e and 935b9cf.

📒 Files selected for processing (1)
  • src/storage/object_storage.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-20T17:01:25.759Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1409
File: src/storage/field_stats.rs:429-456
Timestamp: 2025-08-20T17:01:25.759Z
Learning: In Parseable's field stats calculation (src/storage/field_stats.rs), the extract_datetime_from_parquet_path_regex function correctly works with filename-only parsing because Parseable's server-side filename generation guarantees the dot-separated format date=YYYY-MM-DD.hour=HH.minute=MM pattern in parquet filenames.

Applied to files:

  • src/storage/object_storage.rs
🧬 Code graph analysis (1)
src/storage/object_storage.rs (4)
src/storage/localfs.rs (2)
  • list_hours (425-440)
  • list_minutes (442-460)
src/storage/azure_blob.rs (2)
  • list_hours (701-722)
  • list_minutes (724-746)
src/storage/s3.rs (2)
  • list_hours (784-805)
  • list_minutes (807-829)
src/storage/gcs.rs (2)
  • list_hours (608-629)
  • list_minutes (631-653)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (2)
src/storage/object_storage.rs (2)

849-856: Docs read well and match the new directory-traversal approach

The shift from manifest downloads to hierarchical listing is clear and aligns with the PR objective to optimize first/latest fetches. Nice.


906-909: RFC3339 mapping is correct

Converting Option<DateTime> to RFC3339 strings at the edge is clean and keeps internals typed.

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
src/storage/object_storage.rs (2)

932-976: Don’t error on “no valid hours/minutes”; return Ok(None) to keep the call resilient.

If a backend returns unexpected entries (or only malformed ones), .ok_or_else(..) bubbles up an error and fails the whole query. Prefer graceful Ok(None) so the caller can try adjacent dates (as per the previous comment). No extra range checks needed here given the invariant that partitions derive from valid filenames.

-        let (target_hour_value, target_hour_str) = hours
+        let maybe_hour = hours
             .iter()
             .filter_map(|hour_str| {
                 hour_str.strip_prefix("hour=").and_then(|hour_part| {
                     hour_part.parse::<u32>().ok().map(|hour| (hour, hour_str))
                 })
             })
             .reduce(|acc, curr| {
                 if find_min {
                     if curr.0 < acc.0 { curr } else { acc }
                 } else if curr.0 > acc.0 {
                     curr
                 } else {
                     acc
                 }
             })
-            .ok_or_else(|| ObjectStorageError::Custom("No valid hours found".to_string()))?;
+            ;
+        let Some((target_hour_value, target_hour_str)) = maybe_hour else {
+            // No usable hour partitions under this date
+            return Ok(None);
+        };
@@
-        let target_minute = minutes
+        let maybe_minute = minutes
             .iter()
             .filter_map(|minute_str| {
                 minute_str
                     .strip_prefix("minute=")
                     .and_then(|minute_part| minute_part.parse::<u32>().ok())
             })
             .reduce(|acc, curr| {
                 if find_min {
                     if curr < acc { curr } else { acc }
                 } else if curr > acc {
                     curr
                 } else {
                     acc
                 }
             })
-            .ok_or_else(|| ObjectStorageError::Custom("No valid minutes found".to_string()))?;
+            ;
+        let Some(target_minute) = maybe_minute else {
+            // No usable minute partitions under this hour
+            return Ok(None);
+        };

978-991: Set seconds based on find_min and remove fallback-to-midnight; return None instead.

Hardcoding 00 for seconds yields a too-early timestamp for “latest” and falling back to 00:00:00 can misrepresent the date when components are malformed. Choose seconds by find_min, and if and_hms_opt fails, treat it as “no data”.

-            // Create timestamp from date, hour, and minute with seconds hardcoded to 00
-            let naive_datetime = parsed_date
-                .and_hms_opt(target_hour_value, target_minute, 0)
-                .unwrap_or_else(|| parsed_date.and_hms_opt(0, 0, 0).unwrap());
-
-            return Ok(Some(DateTime::from_naive_utc_and_offset(
-                naive_datetime,
-                Utc,
-            )));
+            // Create timestamp; seconds = 0 for earliest, 59 for latest
+            let seconds = if find_min { 0 } else { 59 };
+            if let Some(naive_datetime) =
+                parsed_date.and_hms_opt(target_hour_value, target_minute, seconds)
+            {
+                return Ok(Some(DateTime::from_naive_utc_and_offset(
+                    naive_datetime,
+                    Utc,
+                )));
+            } else {
+                // Treat as no data rather than fabricating midnight
+                return Ok(None);
+            }
🧹 Nitpick comments (3)
src/storage/object_storage.rs (3)

309-326: Clarify trait contract for list_hours/list_minutes (tolerant semantics, empty Vec is OK).

Great to see the trait surface defined. Please make the error semantics explicit so all backends behave consistently and callers can treat “no partitions” as non-error. Add that implementations should skip unrelated/malformed entries and that returning an empty Vec is expected.

@@
-    /// Lists the immediate “hour=” partition directories under the given date.
-    /// Only immediate child entries named `hour=HH` should be returned (no trailing slash).
-    /// `HH` must be zero-padded two-digit numerals (`"hour=00"` through `"hour=23"`).
+    /// Lists the immediate “hour=” partition directories under the given date.
+    /// Only immediate child entries named `hour=HH` should be returned (no trailing slash).
+    /// `HH` must be zero-padded two-digit numerals (`"hour=00"` through `"hour=23"`).
+    /// Implementations must quietly skip unrelated or malformed entries (do not error).
+    /// Returning an empty Vec means no hour partitions exist; this is not an error.
@@
-    /// Lists the immediate “minute=” partition directories under the given date/hour.
-    /// Only immediate child entries named `minute=MM` should be returned (no trailing slash).
-    /// `MM` must be zero-padded two-digit numerals (`"minute=00"` through `"minute=59"`).
+    /// Lists the immediate “minute=” partition directories under the given date/hour.
+    /// Only immediate child entries named `minute=MM` should be returned (no trailing slash).
+    /// `MM` must be zero-padded two-digit numerals (`"minute=00"` through `"minute=59"`).
+    /// Implementations must quietly skip unrelated or malformed entries (do not error).
+    /// Returning an empty Vec means no minute partitions exist; this is not an error.

856-863: Docs: spell out “seconds” semantics for earliest/latest.

The behavior defaults seconds to 00; for “latest” it’s more intuitive to use 59 to represent the upper bound of that minute. Document this explicitly to avoid ambiguity for consumers.

@@
-    /// then constructs the actual timestamps from this directory information.
+    /// then constructs timestamps from this directory information.
+    /// Seconds semantics:
+    /// - earliest/first: second=0
+    /// - latest: second=59

883-904: Minor ergonomics: avoid double-references and panics.

Not blocking, but using indexing produces &&String and is a bit clunky. The refactor above removes it. If you keep the current shape, prefer first()/last() to avoid manual indexing.
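A minimal sketch of that suggestion (types and names assumed, not the in-tree code):

use chrono::NaiveDate;

// Sort once, then take the endpoints with first()/last(); both return
// Option<&(NaiveDate, String)>, so the empty case is handled without panicking on
// parsed_dates[0] and without the &&String produced by manual indexing.
fn min_max_date_dirs(mut parsed_dates: Vec<(NaiveDate, String)>) -> Option<(String, String)> {
    parsed_dates.sort_by_key(|(date, _)| *date);
    let (first, last) = (parsed_dates.first()?, parsed_dates.last()?);
    Some((first.1.clone(), last.1.clone()))
}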

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between db0caef and 541744e.

📒 Files selected for processing (1)
  • src/storage/object_storage.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-08-21T11:47:01.251Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:0-0
Timestamp: 2025-08-21T11:47:01.251Z
Learning: In Parseable's object storage implementation (src/storage/object_storage.rs), the hour and minute directory prefixes (hour=XX, minute=YY) are generated from arrow file timestamps following proper datetime conventions, so they are guaranteed to be within valid ranges (0-23 for hours, 0-59 for minutes) and don't require additional range validation.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-08-20T17:01:25.759Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1409
File: src/storage/field_stats.rs:429-456
Timestamp: 2025-08-20T17:01:25.759Z
Learning: In Parseable's field stats calculation (src/storage/field_stats.rs), the extract_datetime_from_parquet_path_regex function correctly works with filename-only parsing because Parseable's server-side filename generation guarantees the dot-separated format date=YYYY-MM-DD.hour=HH.minute=MM pattern in parquet filenames.

Applied to files:

  • src/storage/object_storage.rs
📚 Learning: 2025-06-16T05:20:18.593Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1346
File: src/parseable/streams.rs:351-355
Timestamp: 2025-06-16T05:20:18.593Z
Learning: In the Parseable codebase, arrow files are expected to always have valid creation or modified timestamps as a basic system assumption. The conversion flow uses expect() on file metadata operations to enforce this invariant with fail-fast behavior rather than graceful error handling, as violations represent fundamental system issues that should cause immediate failures.

Applied to files:

  • src/storage/object_storage.rs
🧬 Code graph analysis (1)
src/storage/object_storage.rs (4)
src/storage/gcs.rs (2)
  • list_hours (608-634)
  • list_minutes (636-664)
src/storage/azure_blob.rs (2)
  • list_hours (701-727)
  • list_minutes (729-757)
src/storage/localfs.rs (2)
  • list_hours (425-440)
  • list_minutes (442-460)
src/storage/s3.rs (2)
  • list_hours (784-810)
  • list_minutes (812-840)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage

@nitisht merged commit 9f6556e into parseablehq:main on Aug 21, 2025
13 checks passed
@nikhilsinhaparseable deleted the optimise-info branch on August 21, 2025 at 15:44