-
Notifications
You must be signed in to change notification settings - Fork 83
fix(package): Refactor dataset handling in the package. #1023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 36 commits
0d186e6
75ac0ff
bb1e5f4
ba7cfe1
68454c6
c1de746
d797198
8c39e77
d570ab6
398ab5e
7759a7a
e6b8cc7
7a468c3
1dd1cea
a0c3c29
a9bf615
fe05f5f
39a9278
7124828
eb80992
5ed44e7
af6b508
67fb01f
d6ad4de
ff7d700
7ffc77c
0255cbd
1076a3f
71c4d82
6bd9372
84df2e2
983bea1
bdb7817
a82a267
f699496
90ce0a4
d6f9e5a
dc6a706
76bcb4a
a4e6f83
7b42568
afe43ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,13 +4,17 @@ | |||||||
| import pathlib | ||||||||
| import sys | ||||||||
| import time | ||||||||
| import typing | ||||||||
| from contextlib import closing | ||||||||
| from typing import List | ||||||||
| from typing import List, Optional, Union | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Remove the unused import (`Optional` is no longer referenced): -from typing import List, Optional, Union
+from typing import List, Union📝 Committable suggestion
Suggested change
🧰 Tools 🪛 Flake8 (7.2.0) [error] 8-8: 'typing.Optional' imported but unused (F401) 🤖 Prompt for AI Agents |
||||||||
|
|
||||||||
| import brotli | ||||||||
| import msgpack | ||||||||
| from clp_py_utils.clp_config import CLPConfig, COMPRESSION_JOBS_TABLE_NAME | ||||||||
| from clp_py_utils.clp_config import ( | ||||||||
| CLP_DEFAULT_DATASET_NAME, | ||||||||
| CLPConfig, | ||||||||
| COMPRESSION_JOBS_TABLE_NAME, | ||||||||
| StorageEngine, | ||||||||
| ) | ||||||||
| from clp_py_utils.pretty_size import pretty_size | ||||||||
| from clp_py_utils.s3_utils import parse_s3_url | ||||||||
| from clp_py_utils.sql_adapter import SQL_Adapter | ||||||||
|
|
@@ -132,14 +136,18 @@ def handle_job(sql_adapter: SQL_Adapter, clp_io_config: ClpIoConfig, no_progress | |||||||
|
|
||||||||
|
|
||||||||
| def _generate_clp_io_config( | ||||||||
| clp_config: CLPConfig, logs_to_compress: List[str], parsed_args: argparse.Namespace | ||||||||
| ) -> typing.Union[S3InputConfig, FsInputConfig]: | ||||||||
| clp_config: CLPConfig, | ||||||||
| logs_to_compress: List[str], | ||||||||
| parsed_args: argparse.Namespace, | ||||||||
| dataset: Optional[str], | ||||||||
| ) -> Union[S3InputConfig, FsInputConfig]: | ||||||||
| input_type = clp_config.logs_input.type | ||||||||
|
|
||||||||
| if InputType.FS == input_type: | ||||||||
| if len(logs_to_compress) == 0: | ||||||||
| raise ValueError(f"No input paths given.") | ||||||||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||
| return FsInputConfig( | ||||||||
| dataset=dataset, | ||||||||
| paths_to_compress=logs_to_compress, | ||||||||
| timestamp_key=parsed_args.timestamp_key, | ||||||||
| path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR), | ||||||||
|
|
@@ -154,6 +162,7 @@ def _generate_clp_io_config( | |||||||
| region_code, bucket_name, key_prefix = parse_s3_url(s3_url) | ||||||||
| aws_authentication = clp_config.logs_input.aws_authentication | ||||||||
| return S3InputConfig( | ||||||||
| dataset=dataset, | ||||||||
| region_code=region_code, | ||||||||
| bucket=bucket_name, | ||||||||
| key_prefix=key_prefix, | ||||||||
|
|
@@ -224,7 +233,12 @@ def main(argv): | |||||||
|
|
||||||||
| logs_to_compress = _get_logs_to_compress(pathlib.Path(parsed_args.logs_list).resolve()) | ||||||||
|
|
||||||||
| clp_input_config = _generate_clp_io_config(clp_config, logs_to_compress, parsed_args) | ||||||||
| dataset = ( | ||||||||
| CLP_DEFAULT_DATASET_NAME | ||||||||
| if StorageEngine.CLP_S == clp_config.package.storage_engine | ||||||||
| else None | ||||||||
| ) | ||||||||
|
||||||||
| clp_input_config = _generate_clp_io_config(clp_config, logs_to_compress, parsed_args, dataset) | ||||||||
| clp_output_config = OutputConfig.parse_obj(clp_config.archive_output) | ||||||||
| if parsed_args.tags: | ||||||||
| tag_list = [tag.strip().lower() for tag in parsed_args.tags.split(",") if tag] | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,7 +14,6 @@ | |||||||||||||||||||
| import yaml | ||||||||||||||||||||
| from clp_py_utils.clp_config import ( | ||||||||||||||||||||
| ALL_TARGET_NAME, | ||||||||||||||||||||
| ARCHIVES_TABLE_SUFFIX, | ||||||||||||||||||||
| AwsAuthType, | ||||||||||||||||||||
| CLP_DEFAULT_DATASET_NAME, | ||||||||||||||||||||
| CLPConfig, | ||||||||||||||||||||
|
|
@@ -23,7 +22,6 @@ | |||||||||||||||||||
| COMPRESSION_WORKER_COMPONENT_NAME, | ||||||||||||||||||||
| CONTROLLER_TARGET_NAME, | ||||||||||||||||||||
| DB_COMPONENT_NAME, | ||||||||||||||||||||
| FILES_TABLE_SUFFIX, | ||||||||||||||||||||
| QUERY_JOBS_TABLE_NAME, | ||||||||||||||||||||
| QUERY_SCHEDULER_COMPONENT_NAME, | ||||||||||||||||||||
| QUERY_WORKER_COMPONENT_NAME, | ||||||||||||||||||||
|
|
@@ -35,6 +33,10 @@ | |||||||||||||||||||
| StorageType, | ||||||||||||||||||||
| WEBUI_COMPONENT_NAME, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| from clp_py_utils.clp_metadata_db_utils import ( | ||||||||||||||||||||
| get_archives_table_name, | ||||||||||||||||||||
| get_files_table_name, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| from clp_py_utils.s3_utils import generate_container_auth_options | ||||||||||||||||||||
| from job_orchestration.scheduler.constants import QueueName | ||||||||||||||||||||
| from pydantic import BaseModel | ||||||||||||||||||||
|
|
@@ -868,13 +870,14 @@ def start_webui( | |||||||||||||||||||
| # Read, update, and write back client's and server's settings.json | ||||||||||||||||||||
| clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True) | ||||||||||||||||||||
| table_prefix = clp_db_connection_params["table_prefix"] | ||||||||||||||||||||
| dataset: Optional[str] = None | ||||||||||||||||||||
| if StorageEngine.CLP_S == clp_config.package.storage_engine: | ||||||||||||||||||||
| table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_" | ||||||||||||||||||||
| dataset = CLP_DEFAULT_DATASET_NAME | ||||||||||||||||||||
|
Comment on lines
+873
to
+875
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Minor nit: collapse the two-step assignment. The explicit type annotation is helpful, but the two-step pattern is verbose. You could inline it without losing clarity: - dataset: Optional[str] = None
- if StorageEngine.CLP_S == clp_config.package.storage_engine:
- dataset = CLP_DEFAULT_DATASET_NAME
+ dataset: Optional[str] = (
+ CLP_DEFAULT_DATASET_NAME
+ if StorageEngine.CLP_S == clp_config.package.storage_engine
+ else None
+ )Reduces cognitive load and keeps the flow tight. 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
| client_settings_json_updates = { | ||||||||||||||||||||
| "ClpStorageEngine": clp_config.package.storage_engine, | ||||||||||||||||||||
| "MongoDbSearchResultsMetadataCollectionName": clp_config.webui.results_metadata_collection_name, | ||||||||||||||||||||
| "SqlDbClpArchivesTableName": f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}", | ||||||||||||||||||||
| "SqlDbClpFilesTableName": f"{table_prefix}{FILES_TABLE_SUFFIX}", | ||||||||||||||||||||
| "SqlDbClpArchivesTableName": get_archives_table_name(table_prefix, dataset), | ||||||||||||||||||||
| "SqlDbClpFilesTableName": get_files_table_name(table_prefix, dataset), | ||||||||||||||||||||
| "SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME, | ||||||||||||||||||||
| } | ||||||||||||||||||||
| client_settings_json = read_and_update_settings_json( | ||||||||||||||||||||
|
|
@@ -884,6 +887,7 @@ def start_webui( | |||||||||||||||||||
| client_settings_json_file.write(json.dumps(client_settings_json)) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| server_settings_json_updates = { | ||||||||||||||||||||
| "ClpStorageEngine": clp_config.package.storage_engine, | ||||||||||||||||||||
| "SqlDbHost": clp_config.database.host, | ||||||||||||||||||||
| "SqlDbPort": clp_config.database.port, | ||||||||||||||||||||
| "SqlDbName": clp_config.database.name, | ||||||||||||||||||||
|
|
||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.