Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0d186e6
Refactor dataset related code
haiqi96 Jun 20, 2025
75ac0ff
further refactor
haiqi96 Jun 20, 2025
bb1e5f4
Linter
haiqi96 Jun 20, 2025
ba7cfe1
A few more fixes
haiqi96 Jun 20, 2025
68454c6
Linter fixes
haiqi96 Jun 20, 2025
c1de746
missing fixes
haiqi96 Jun 20, 2025
d797198
Fix mistake
haiqi96 Jun 20, 2025
8c39e77
actually fixing
haiqi96 Jun 20, 2025
d570ab6
Linter again
haiqi96 Jun 20, 2025
398ab5e
Merge branch 'main' into DatasetRefactor
haiqi96 Jun 25, 2025
7759a7a
Merge remote-tracking branch 'origin/main' into DatasetRefactor
haiqi96 Jun 27, 2025
e6b8cc7
Linter
haiqi96 Jun 27, 2025
7a468c3
Merge branch 'main' into DatasetRefactor
Bill-hbrhbr Jun 29, 2025
1dd1cea
Move default dataset metadata table creation to start_clp
Bill-hbrhbr Jun 29, 2025
a0c3c29
Remove unused import
Bill-hbrhbr Jun 29, 2025
a9bf615
Address review comments
Bill-hbrhbr Jun 30, 2025
fe05f5f
Replace the missing SUFFIX
Bill-hbrhbr Jun 30, 2025
39a9278
Move suffix constants from clp_config to clp_metadata_db_utils local …
Bill-hbrhbr Jun 30, 2025
7124828
Refactor archive_manager.py.
kirkrodrigues Jun 30, 2025
eb80992
Refactor s3_utils.py.
kirkrodrigues Jun 30, 2025
5ed44e7
compression_task.py: Fix typing errors and minor refactoring.
kirkrodrigues Jun 30, 2025
af6b508
compression_scheduler.py: Remove exception swallow which will hide un…
kirkrodrigues Jun 30, 2025
67fb01f
Refactor query_scheduler.py.
kirkrodrigues Jun 30, 2025
d6ad4de
clp_metadata_db_utils.py: Minor refactoring.
kirkrodrigues Jun 30, 2025
ff7d700
clp_metadata_db_utils.py: Rename _generic_get_table_name -> _get_tabl…
kirkrodrigues Jun 30, 2025
7ffc77c
clp_metadata_db_utils.py: Alphabetize new public functions.
kirkrodrigues Jun 30, 2025
0255cbd
clp_metadata_db_utils.py: Reorder public and private functions for co…
kirkrodrigues Jun 30, 2025
1076a3f
initialize-clp-metadata-db.py: Remove changes unrelated to PR.
kirkrodrigues Jun 30, 2025
71c4d82
Move default dataset creation into compression_scheduler so that it r…
kirkrodrigues Jun 30, 2025
6bd9372
Apply suggestions from code review
kirkrodrigues Jul 1, 2025
84df2e2
Merge branch 'main' into DatasetRefactor
kirkrodrigues Jul 1, 2025
983bea1
Remove bug fix that's no longer necessary.
kirkrodrigues Jul 1, 2025
bdb7817
Fix bug where dataset has a default value instead of None when using …
Bill-hbrhbr Jul 1, 2025
a82a267
Correctly feed in the input config dataset names
Bill-hbrhbr Jul 1, 2025
f699496
Remove unnecessary changes
Bill-hbrhbr Jul 1, 2025
90ce0a4
Update the webui to pass the dataset name in the clp-json code path (…
kirkrodrigues Jul 2, 2025
d6f9e5a
Move dataset into the user function
haiqi96 Jul 2, 2025
dc6a706
Merge branch 'DatasetRefactor' of https://github.com/haiqi96/clp_fork…
haiqi96 Jul 2, 2025
76bcb4a
Remove unnecessary f string specifier
haiqi96 Jul 2, 2025
a4e6f83
Apply suggestions from code review
haiqi96 Jul 2, 2025
7b42568
Add import type.
kirkrodrigues Jul 2, 2025
afe43ce
Merge branch 'main' into DatasetRefactor
haiqi96 Jul 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
from pathlib import Path

from clp_py_utils.clp_config import (
ARCHIVE_TAGS_TABLE_SUFFIX,
ARCHIVES_TABLE_SUFFIX,
CLP_DEFAULT_DATASET_NAME,
Database,
FILES_TABLE_SUFFIX,
StorageEngine,
)
from clp_py_utils.clp_metadata_db_utils import (
get_archive_tags_table_name,
get_archives_table_name,
get_files_table_name,
)
from clp_py_utils.sql_adapter import SQL_Adapter

from clp_package_utils.general import (
Expand Down Expand Up @@ -191,12 +193,15 @@ def main(argv: typing.List[str]) -> int:
logger.error("`archive_output.directory` doesn't exist.")
return -1

dataset: typing.Optional[str] = None
if StorageEngine.CLP_S == storage_engine:
dataset = CLP_DEFAULT_DATASET_NAME

if FIND_COMMAND == parsed_args.subcommand:
return _find_archives(
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
dataset,
parsed_args.begin_ts,
parsed_args.end_ts,
)
Expand All @@ -207,8 +212,7 @@ def main(argv: typing.List[str]) -> int:
return _delete_archives(
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
dataset,
delete_handler,
parsed_args.dry_run,
)
Expand All @@ -219,8 +223,7 @@ def main(argv: typing.List[str]) -> int:
return _delete_archives(
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
dataset,
delete_handler,
parsed_args.dry_run,
)
Expand All @@ -235,8 +238,7 @@ def main(argv: typing.List[str]) -> int:
def _find_archives(
archives_dir: Path,
database_config: Database,
storage_engine: StorageEngine,
dataset: str,
dataset: typing.Optional[str],
begin_ts: int,
end_ts: int = typing.Optional[int],
) -> int:
Expand All @@ -245,7 +247,6 @@ def _find_archives(
`begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts`.
:param archives_dir:
:param database_config:
:param storage_engine:
:param dataset:
:param begin_ts:
:param end_ts:
Expand All @@ -259,16 +260,15 @@ def _find_archives(
database_config.get_clp_connection_params_and_type(True)
)
table_prefix: str = clp_db_connection_params["table_prefix"]
if StorageEngine.CLP_S == storage_engine:
table_prefix = f"{table_prefix}{dataset}_"

with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
query_params: typing.List[int] = [begin_ts]
archives_table_name = get_archives_table_name(table_prefix, dataset)
query: str = (
f"""
SELECT id FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}`
SELECT id FROM `{archives_table_name}`
WHERE begin_timestamp >= %s
"""
)
Expand All @@ -285,9 +285,12 @@ def _find_archives(
return 0

logger.info(f"Found {len(archive_ids)} archives within the specified time range.")
archive_output_dir: Path = (
archives_dir / dataset if dataset is not None else archives_dir
)
for archive_id in archive_ids:
logger.info(archive_id)
archive_path: Path = archives_dir / dataset / archive_id
archive_path = archive_output_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} in database not found on disk.")

Expand All @@ -302,7 +305,6 @@ def _find_archives(
def _delete_archives(
archives_dir: Path,
database_config: Database,
storage_engine: StorageEngine,
dataset: str,
delete_handler: DeleteHandler,
dry_run: bool = False,
Expand All @@ -312,7 +314,6 @@ def _delete_archives(

:param archives_dir:
:param database_config:
:param storage_engine:
:param dataset:
:param delete_handler: Object to handle differences between by-filter and by-ids delete types.
:param dry_run: If True, no changes will be made to the database or disk.
Expand All @@ -327,8 +328,6 @@ def _delete_archives(
database_config.get_clp_connection_params_and_type(True)
)
table_prefix = clp_db_connection_params["table_prefix"]
if StorageEngine.CLP_S == storage_engine:
table_prefix = f"{table_prefix}{dataset}_"

with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
Expand All @@ -338,10 +337,11 @@ def _delete_archives(

query_criteria: str = delete_handler.get_criteria()
query_params: typing.List[str] = delete_handler.get_params()
archives_table_name = get_archives_table_name(table_prefix, dataset)

db_cursor.execute(
f"""
DELETE FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}`
DELETE FROM `{archives_table_name}`
WHERE {query_criteria}
RETURNING id
""",
Expand All @@ -357,17 +357,19 @@ def _delete_archives(
delete_handler.validate_results(archive_ids)

ids_list_string: str = ", ".join(["'%s'"] * len(archive_ids))
files_table_name = get_files_table_name(table_prefix, dataset)
archive_tags_table_name = get_archive_tags_table_name(table_prefix, dataset)

db_cursor.execute(
f"""
DELETE FROM `{table_prefix}{FILES_TABLE_SUFFIX}`
DELETE FROM `{files_table_name}`
WHERE archive_id in ({ids_list_string})
"""
)

db_cursor.execute(
f"""
DELETE FROM `{table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX}`
DELETE FROM `{archive_tags_table_name}`
WHERE archive_id in ({ids_list_string})
"""
)
Expand All @@ -387,8 +389,9 @@ def _delete_archives(

logger.info(f"Finished deleting archives from the database.")

archive_output_dir: Path = archives_dir / dataset if dataset is not None else archives_dir
for archive_id in archive_ids:
archive_path: Path = archives_dir / dataset / archive_id
archive_path = archive_output_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
Database,
FILES_TABLE_SUFFIX,
)
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import yaml
from clp_py_utils.clp_config import (
ALL_TARGET_NAME,
ARCHIVES_TABLE_SUFFIX,
AwsAuthType,
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
Expand All @@ -23,7 +22,6 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
FILES_TABLE_SUFFIX,
LOG_VIEWER_WEBUI_COMPONENT_NAME,
QUERY_JOBS_TABLE_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
Expand All @@ -36,6 +34,7 @@
StorageType,
WEBUI_COMPONENT_NAME,
)
from clp_py_utils.clp_metadata_db_utils import get_archives_table_name, get_files_table_name
from clp_py_utils.s3_utils import generate_container_auth_options
from job_orchestration.scheduler.constants import QueueName
from pydantic import BaseModel
Expand Down Expand Up @@ -866,15 +865,16 @@ def start_webui(instance_id: str, clp_config: CLPConfig, mounts: CLPDockerMounts
# Read and update settings.json
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
table_prefix = clp_db_connection_params["table_prefix"]
dataset: Optional[str] = None
if StorageEngine.CLP_S == clp_config.package.storage_engine:
table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_"
dataset = CLP_DEFAULT_DATASET_NAME
Comment on lines +873 to +875
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Minor nit: collapse the two-step dataset assignment

The explicit type annotation is helpful, but the two-step pattern is verbose. You could inline it without losing clarity:

-    dataset: Optional[str] = None
-    if StorageEngine.CLP_S == clp_config.package.storage_engine:
-        dataset = CLP_DEFAULT_DATASET_NAME
+    dataset: Optional[str] = (
+        CLP_DEFAULT_DATASET_NAME
+        if StorageEngine.CLP_S == clp_config.package.storage_engine
+        else None
+    )

Reduces cognitive load and keeps the flow tight.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dataset: Optional[str] = None
if StorageEngine.CLP_S == clp_config.package.storage_engine:
table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_"
dataset = CLP_DEFAULT_DATASET_NAME
dataset: Optional[str] = (
CLP_DEFAULT_DATASET_NAME
if StorageEngine.CLP_S == clp_config.package.storage_engine
else None
)
🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/start_clp.py around
lines 873 to 875, the variable 'dataset' is assigned in two steps with an
explicit type annotation followed by a conditional assignment. Simplify this by
combining the declaration and conditional assignment into a single line using a
conditional expression, preserving the Optional[str] type clarity while reducing
verbosity and improving readability.

meteor_settings_updates = {
"private": {
"SqlDbHost": clp_config.database.host,
"SqlDbPort": clp_config.database.port,
"SqlDbName": clp_config.database.name,
"SqlDbClpArchivesTableName": f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}",
"SqlDbClpFilesTableName": f"{table_prefix}{FILES_TABLE_SUFFIX}",
"SqlDbClpArchivesTableName": get_archives_table_name(table_prefix, dataset),
"SqlDbClpFilesTableName": get_files_table_name(table_prefix, dataset),
"SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME,
"SqlDbQueryJobsTableName": QUERY_JOBS_TABLE_NAME,
},
Expand Down
62 changes: 49 additions & 13 deletions components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,38 @@
)


def _generic_get_table_name(prefix: str, suffix: str, dataset: str | None) -> str:
table_name = prefix
if dataset is not None:
table_name += f"{dataset}_"
table_name += suffix
return table_name


def get_archives_table_name(table_prefix: str, dataset: str | None) -> str:
return _generic_get_table_name(table_prefix, ARCHIVES_TABLE_SUFFIX, dataset)


def get_tags_table_name(table_prefix: str, dataset: str | None) -> str:
return _generic_get_table_name(table_prefix, TAGS_TABLE_SUFFIX, dataset)


def get_archive_tags_table_name(table_prefix: str, dataset: str | None) -> str:
return _generic_get_table_name(table_prefix, ARCHIVE_TAGS_TABLE_SUFFIX, dataset)


def get_files_table_name(table_prefix: str, dataset: str | None) -> str:
return _generic_get_table_name(table_prefix, FILES_TABLE_SUFFIX, dataset)


def get_column_metadata_table_name(table_prefix: str, dataset: str | None) -> str:
return _generic_get_table_name(table_prefix, COLUMN_METADATA_TABLE_SUFFIX, dataset)


def get_datasets_table_name(table_prefix: str) -> str:
return _generic_get_table_name(table_prefix, DATASETS_TABLE_SUFFIX, None)


def _create_archives_table(db_cursor, archives_table_name: str) -> None:
db_cursor.execute(
f"""
Expand Down Expand Up @@ -63,10 +95,11 @@ def _create_archive_tags_table(
)


def _create_files_table(db_cursor, table_prefix: str) -> None:
def _create_files_table(db_cursor, table_prefix: str, dataset: str | None) -> None:
files_table_name = get_files_table_name(table_prefix, dataset)
db_cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS `{table_prefix}{FILES_TABLE_SUFFIX}` (
CREATE TABLE IF NOT EXISTS `{files_table_name}` (
`id` VARCHAR(64) NOT NULL,
`orig_file_id` VARCHAR(64) NOT NULL,
`path` VARCHAR(12288) NOT NULL,
Expand All @@ -84,10 +117,11 @@ def _create_files_table(db_cursor, table_prefix: str) -> None:
)


def _create_column_metadata_table(db_cursor, table_prefix: str) -> None:
def _create_column_metadata_table(db_cursor, table_prefix: str, dataset: str) -> None:
column_metadata_table_name = get_column_metadata_table_name(table_prefix, dataset)
db_cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS `{table_prefix}{COLUMN_METADATA_TABLE_SUFFIX}` (
CREATE TABLE IF NOT EXISTS `{column_metadata_table_name}` (
`name` VARCHAR(512) NOT NULL,
`type` TINYINT NOT NULL,
PRIMARY KEY (`name`, `type`)
Expand All @@ -106,9 +140,10 @@ def create_datasets_table(db_cursor, table_prefix: str) -> None:

# For a description of the table, see
# `../../../docs/src/dev-guide/design-metadata-db.md`
datasets_table_name = get_datasets_table_name(table_prefix)
db_cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS `{table_prefix}{DATASETS_TABLE_SUFFIX}` (
CREATE TABLE IF NOT EXISTS `{datasets_table_name}` (
`name` VARCHAR(255) NOT NULL,
`archive_storage_type` VARCHAR(64) NOT NULL,
`archive_storage_directory` VARCHAR(4096) NOT NULL,
Expand Down Expand Up @@ -137,12 +172,14 @@ def add_dataset(
:param archive_storage_type:
:param dataset_archive_storage_directory:
"""
query = f"""INSERT INTO `{table_prefix}{DATASETS_TABLE_SUFFIX}`
datasets_table_name = get_datasets_table_name(table_prefix)
query = f"""INSERT INTO `{datasets_table_name}`
(name, archive_storage_type, archive_storage_directory)
VALUES (%s, %s, %s)
"""
db_cursor.execute(
query, (dataset_name, archive_storage_type, str(dataset_archive_storage_directory))
query,
(dataset_name, archive_storage_type, str(dataset_archive_storage_directory / dataset_name)),
)
create_metadata_db_tables(db_cursor, table_prefix, dataset_name)
db_conn.commit()
Expand Down Expand Up @@ -172,16 +209,15 @@ def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None
:param dataset: If set, all tables will be named in a dataset-specific manner.
"""
if dataset is not None:
table_prefix = f"{table_prefix}{dataset}_"
_create_column_metadata_table(db_cursor, table_prefix)
_create_column_metadata_table(db_cursor, table_prefix, dataset)

archives_table_name = f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}"
tags_table_name = f"{table_prefix}{TAGS_TABLE_SUFFIX}"
archive_tags_table_name = f"{table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX}"
archives_table_name = get_archives_table_name(table_prefix, dataset)
tags_table_name = get_tags_table_name(table_prefix, dataset)
archive_tags_table_name = get_archive_tags_table_name(table_prefix, dataset)

_create_archives_table(db_cursor, archives_table_name)
_create_tags_table(db_cursor, tags_table_name)
_create_archive_tags_table(
db_cursor, archive_tags_table_name, archives_table_name, tags_table_name
)
_create_files_table(db_cursor, table_prefix)
_create_files_table(db_cursor, table_prefix, dataset)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sql_adapter import SQL_Adapter

from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
Database,
StorageEngine,
)
Expand Down Expand Up @@ -55,6 +56,10 @@ def main(argv):
) as metadata_db_cursor:
if StorageEngine.CLP_S == storage_engine:
create_datasets_table(metadata_db_cursor, table_prefix)
# Note: the webui still expect tables with default dataset
Copy link
Contributor Author

@haiqi96 haiqi96 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still a small issue: the CLP_DEFAULT_DATASET_NAME is not added to the dataset table until the first compression job with dataset = CLP_DEFAULT_DATASET_NAME.
This behavior will not cause any functional bug so I feel it is acceptable, but we can also explicility add the CLP_DEFAULT_DATASET_NAME to the datasets table here by calling add_dataset

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's added when we compress the first archive that uses the default dataset. You guys can add the set of metadata tables for default dataset here if the webui requires it upon launching.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, this change creates the tables for CLP_DEFAULT_DATASET_NAME but does not add CLP_DEFAULT_DATASET_NAME to the clp_datasets table.
It's more proper to use add_dataset( but that would require bringing in clp_config.archive_output and introduce a ripple of change in the argparsers.

@kirkrodrigues what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call add_datasets in start_clp.py? That way we wouldn't need to pass in the config file.

create_metadata_db_tables(
metadata_db_cursor, table_prefix, CLP_DEFAULT_DATASET_NAME
)
else:
create_metadata_db_tables(metadata_db_cursor, table_prefix)
metadata_db.commit()
Expand Down
10 changes: 6 additions & 4 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,14 @@ def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata]
return file_metadata_list


def s3_put(s3_config: S3Config, src_file: Path, dest_file_name: str) -> None:
def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None:
"""
Uploads a local file to an S3 bucket using AWS's PutObject operation.
Uploads a local file to an S3 bucket using AWS's PutObject operation. The file uploaded will
have `key` = `s3_config.key_prefix` + `dest_path`.
:param s3_config: S3 configuration specifying the upload destination and credentials.
:param src_file: Local file to upload.
:param dest_file_name: The name for the uploaded file in the S3 bucket.
:param dest_path: The destination path for the uploaded file in the S3 bucket. The path will be
relative to `s3_config.key_prefix`.
:raises: ValueError if `src_file` doesn't exist, doesn't resolve to a file or is larger than the
S3 PutObject limit.
:raises: Propagates `boto3.client`'s exceptions.
Expand All @@ -304,5 +306,5 @@ def s3_put(s3_config: S3Config, src_file: Path, dest_file_name: str) -> None:

with open(src_file, "rb") as file_data:
s3_client.put_object(
Bucket=s3_config.bucket, Body=file_data, Key=s3_config.key_prefix + dest_file_name
Bucket=s3_config.bucket, Body=file_data, Key=s3_config.key_prefix + dest_path
)
Loading