-
Notifications
You must be signed in to change notification settings - Fork 83
feat(job-orchestration): Allow compression jobs to specify S3 object keys via S3InputConfig.
#1407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(job-orchestration): Allow compression jobs to specify S3 object keys via S3InputConfig.
#1407
Conversation
WalkthroughAdds conditional dispatch in Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Caller as Caller
participant API as s3_get_object_metadata
participant Prefix as _s3_get_object_metadata_from_single_prefix
participant Keys as _s3_get_object_metadata_from_keys
participant Head as _s3_get_object_metadata_from_key
participant S3 as Amazon S3
Caller->>API: s3_get_object_metadata(bucket, key_prefix, s3_input_config)
alt s3_input_config.keys is None
API->>Prefix: call prefix-based iterator
Prefix->>S3: ListObjectsV2 (paged, StartAfter)
S3-->>Prefix: pages of objects
Prefix-->>API: yields FileMetadata[] (skips "/" keys)
else keys provided
API->>Keys: call keys-based flow
Keys->>Keys: validate keys (non-empty, prefix match, no duplicates, ordering)
Keys->>Head: for each key -> head_object wrapper
Head->>S3: HeadObject(key)
S3-->>Head: metadata or ClientError
Head-->>Keys: metadata or raise ValueError
Keys-->>API: FileMetadata[] or raise ValueError (missing/out-of-order)
end
API-->>Caller: return FileMetadata[] or raise error
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: ASSERTIVE Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧬 Code graph analysis (1)components/clp-py-utils/clp_py_utils/s3_utils.py (1)
🪛 Ruff (0.14.0)components/clp-py-utils/clp_py_utils/s3_utils.py4-4: Import from Import from (UP035) 4-4: (UP035) 4-4: (UP035) 4-4: (UP035) 4-4: (UP035) 401-401: Unnecessary Rewrite as a literal (C408) 429-429: Avoid specifying long messages outside the exception class (TRY003) 434-434: Avoid specifying long messages outside the exception class (TRY003) 436-436: Avoid specifying long messages outside the exception class (TRY003) 438-438: Avoid specifying long messages outside the exception class (TRY003) 455-455: Avoid specifying long messages outside the exception class (TRY003) 469-471: Avoid specifying long messages outside the exception class (TRY003) 492-495: Avoid specifying long messages outside the exception class (TRY003) ⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
🔇 Additional comments (4)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
components/clp-py-utils/clp_py_utils/s3_utils.py(3 hunks)components/job-orchestration/job_orchestration/scheduler/job_config.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
components/clp-py-utils/clp_py_utils/core.py (1)
FileMetadata(7-22)
🪛 Ruff (0.14.0)
components/job-orchestration/job_orchestration/scheduler/job_config.py
39-39: Missing return type annotation for classmethod validate_keys
(ANN206)
41-41: Avoid specifying long messages outside the exception class
(TRY003)
components/clp-py-utils/clp_py_utils/s3_utils.py
403-403: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
442-442: Avoid specifying long messages outside the exception class
(TRY003)
447-447: Avoid specifying long messages outside the exception class
(TRY003)
449-449: Avoid specifying long messages outside the exception class
(TRY003)
453-453: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
477-477: Avoid specifying long messages outside the exception class
(TRY003)
491-493: Avoid specifying long messages outside the exception class
(TRY003)
514-517: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
514-517: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: package-image
| @field_validator("keys") | ||
| @classmethod | ||
| def validate_keys(cls, value): | ||
| if value is not None and len(value) == 0: | ||
| raise ValueError("Keys cannot be an empty list") | ||
| return value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Annotate the validator return type to keep Ruff happy
Ruff’s ANN206 rule flags this classmethod because it lacks a return-type annotation. Adding -> Optional[List[str]] keeps lint clean and documents intent.
- def validate_keys(cls, value):
+ def validate_keys(cls, value: Optional[List[str]]) -> Optional[List[str]]:🧰 Tools
🪛 Ruff (0.14.0)
39-39: Missing return type annotation for classmethod validate_keys
(ANN206)
41-41: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
In components/job-orchestration/job_orchestration/scheduler/job_config.py around
lines 37 to 42, the @field_validator classmethod validate_keys lacks a
return-type annotation which trips Ruff ANN206; add an explicit return type like
-> Optional[List[str]] to the method signature and, if not already imported,
import Optional and List from typing (or use from typing import Optional, List)
so the validator is properly annotated and the linter is satisfied.
junhaoliao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the overall logic lgtm and is consistent with what we discussed offline. i posted some questions about refactoring and edge case handling
Co-authored-by: Junhao Liao <[email protected]>
Co-authored-by: Junhao Liao <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
444-449: Fix in-place mutation of the keys list.The
keys.sort()call mutates the list attached tos3_input_config.keys, causing a visible side effect for the caller. Use a sorted copy instead to keep the config immutable.Apply this diff to fix the mutation issue:
- keys.sort() - for idx, key in enumerate(keys): + sorted_keys = sorted(keys) + for idx, key in enumerate(sorted_keys): if not key.startswith(key_prefix): raise ValueError(f"Key `{key}` doesn't start with the specified prefix `{key_prefix}`.") - if idx > 0 and key == keys[idx - 1]: + if idx > 0 and key == sorted_keys[idx - 1]: raise ValueError(f"Duplicate key found: `{key}`.")Then update line 451 to use
sorted_keys:- key_iterator = iter(keys) + key_iterator = iter(sorted_keys)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/clp-py-utils/clp_py_utils/s3_utils.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
components/clp-py-utils/clp_py_utils/core.py (1)
FileMetadata(7-22)
🪛 Ruff (0.14.0)
components/clp-py-utils/clp_py_utils/s3_utils.py
403-403: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
442-442: Avoid specifying long messages outside the exception class
(TRY003)
447-447: Avoid specifying long messages outside the exception class
(TRY003)
449-449: Avoid specifying long messages outside the exception class
(TRY003)
453-453: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
477-477: Avoid specifying long messages outside the exception class
(TRY003)
491-493: Avoid specifying long messages outside the exception class
(TRY003)
514-517: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
514-517: Avoid specifying long messages outside the exception class
(TRY003)
🔇 Additional comments (4)
components/clp-py-utils/clp_py_utils/s3_utils.py (4)
7-7: LGTM!The
botocoreimport is necessary for exception handling in the new helper functions.
256-278: LGTM!The branching logic correctly delegates to the appropriate helper function based on whether explicit keys are provided. The updated docstring accurately documents the propagated exceptions.
451-458: LGTM once the mutation issue is fixed.The first-key handling logic is correct, using
head_objectfor efficient single-key retrieval and appropriately returning early when only one key is provided.
460-483: LGTM!The pagination logic correctly validates that all provided keys exist in the bucket, efficiently using
StartAfterto skip already-processed objects. The directory placeholder handling is consistent with the prefix-based approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
436-447: Do not mutate the caller-provided key list in place
keys.sort()reordersS3InputConfig.keysdirectly. Callers pass this config expecting it to remain untouched, so this side effect may leak into subsequent code paths. Work on a sorted copy instead (e.g.,sorted_keys = sorted(keys)) and iterate over that copy.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/clp-py-utils/clp_py_utils/s3_utils.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
components/clp-py-utils/clp_py_utils/core.py (1)
FileMetadata(7-22)
🪛 Ruff (0.14.0)
components/clp-py-utils/clp_py_utils/s3_utils.py
4-4: Import from collections.abc instead: Generator
Import from collections.abc
(UP035)
4-4: typing.Dict is deprecated, use dict instead
(UP035)
4-4: typing.List is deprecated, use list instead
(UP035)
4-4: typing.Set is deprecated, use set instead
(UP035)
4-4: typing.Tuple is deprecated, use tuple instead
(UP035)
403-403: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
434-434: Avoid specifying long messages outside the exception class
(TRY003)
439-439: Avoid specifying long messages outside the exception class
(TRY003)
441-441: Avoid specifying long messages outside the exception class
(TRY003)
445-445: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
462-462: Avoid specifying long messages outside the exception class
(TRY003)
476-478: Avoid specifying long messages outside the exception class
(TRY003)
499-502: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
499-502: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: lint-check (ubuntu-24.04)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
389-407: Prefer literal list initialisation.Use [] for consistency and to satisfy Ruff C408.
- file_metadata_list: List[FileMetadata] = list() + file_metadata_list: List[FileMetadata] = []
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/clp-py-utils/clp_py_utils/s3_utils.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
components/clp-py-utils/clp_py_utils/core.py (1)
FileMetadata(7-22)
🪛 Ruff (0.14.0)
components/clp-py-utils/clp_py_utils/s3_utils.py
4-4: Import from collections.abc instead: Generator
Import from collections.abc
(UP035)
4-4: typing.Dict is deprecated, use dict instead
(UP035)
4-4: typing.List is deprecated, use list instead
(UP035)
4-4: typing.Set is deprecated, use set instead
(UP035)
4-4: typing.Tuple is deprecated, use tuple instead
(UP035)
403-403: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
431-431: Avoid specifying long messages outside the exception class
(TRY003)
436-436: Avoid specifying long messages outside the exception class
(TRY003)
438-438: Avoid specifying long messages outside the exception class
(TRY003)
440-440: Avoid specifying long messages outside the exception class
(TRY003)
444-444: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
457-457: Avoid specifying long messages outside the exception class
(TRY003)
466-466: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
471-473: Avoid specifying long messages outside the exception class
(TRY003)
494-497: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (macos-15)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: lint-check (macos-15)
🔇 Additional comments (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
476-498: HeadObject wrapper: LGTM.Good conversion and exception chaining.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (4)
components/clp-py-utils/clp_py_utils/s3_utils.py (4)
401-405: Use [] over list() for initialisation (minor).Stylistic consistency and aligns with Ruff C408.
- file_metadata_list: List[FileMetadata] = list() + file_metadata_list: List[FileMetadata] = []
269-269: Apply adaptive retries for metadata retrieval (parity with put/delete).Use the same boto3 Config with retries to improve robustness of Head/List calls.
- s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication) + boto3_config = Config(retries=dict(total_max_attempts=3, mode="adaptive")) + s3_client = _create_s3_client( + s3_input_config.region_code, s3_input_config.aws_authentication, boto3_config + )
497-499: PEP 604 union breaks Python 3.9; use Optional[str].
str | Nonerequires Python 3.10+. This package targets >=3.9, so this will raise a SyntaxError on import.-def _iter_s3_objects( - s3_client: boto3.client, bucket: str, key_prefix: str, start_from: str | None = None -) -> Generator[Tuple[str, int], None, None]: +def _iter_s3_objects( + s3_client: boto3.client, bucket: str, key_prefix: str, start_from: Optional[str] = None +) -> Generator[Tuple[str, int], None, None]:
412-417: Docstring missing param for key_prefix.Add the entry for clarity.
- :param bucket: - :param keys: + :param bucket: + :param key_prefix: + :param keys:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/clp-py-utils/clp_py_utils/s3_utils.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/clp-py-utils/clp_py_utils/s3_utils.py (1)
components/clp-py-utils/clp_py_utils/core.py (1)
FileMetadata(7-22)
🪛 Ruff (0.14.0)
components/clp-py-utils/clp_py_utils/s3_utils.py
4-4: Import from collections.abc instead: Generator
Import from collections.abc
(UP035)
4-4: typing.Dict is deprecated, use dict instead
(UP035)
4-4: typing.List is deprecated, use list instead
(UP035)
4-4: typing.Set is deprecated, use set instead
(UP035)
4-4: typing.Tuple is deprecated, use tuple instead
(UP035)
401-401: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
428-428: Avoid specifying long messages outside the exception class
(TRY003)
433-433: Avoid specifying long messages outside the exception class
(TRY003)
435-435: Avoid specifying long messages outside the exception class
(TRY003)
437-437: Avoid specifying long messages outside the exception class
(TRY003)
441-441: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
454-454: Avoid specifying long messages outside the exception class
(TRY003)
463-463: Unnecessary list() call (rewrite as a literal)
Rewrite as a literal
(C408)
468-470: Avoid specifying long messages outside the exception class
(TRY003)
491-494: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: package-image
junhaoliao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. sorry for the delay in the review
for the title, how about:
feat(job-orchestration): Allow compression jobs to specify S3 object keys via `S3InputConfig`.
or
feat(job-orchestration): Support specifying explicit S3 object keys in `S3InputConfig` for compression.
S3InputConfig.S3InputConfig.
Description
NOTE: This is not a breaking change so any existing S3 compression should still work.
This was originally a part of #1383. However, we decided to remove the compression interface change and make a clean PR for updating the job input config.
This PR updates
S3InputConfigto add support for accepting a list of S3 object keys. Before this PR, the input config can only take a key prefix and ingest all logs under this prefix. With this PR, we make it more flexible so that users can define a list of keys under the given prefix. Notice that these keys have the following constraints:S3Config.To get metadata for the compression job partition, we take advantage of the shared prefix and do the following:
HeadObjectAPI to get the metadata for the first object within the sorted key.ListObjectAPI to get the metadata for the rest of keys, using the key-prefix for prefix filtering, and using the key of the first object (from last step) forStartAfterpartition.In this way, we obtain metadata in batches to improve performance.
Checklist
breaking change.
Validation performed
I used this ingestor prototype for testing: https://github.com/LinZhihao-723/LogIngestorServer with the following steps:
cargo run --release -- --db-url "mysql://clp-user:$CLP_DB_PASSWORD@localhost:3306/clp-db"lzh-testbucket for testing. YScope developers should have a valid credential to access this bucket)Summary by CodeRabbit
New Features
Bug Fixes
Documentation