diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index a045de8521..d3f01a5220 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -8,13 +8,15 @@ from pathlib import Path from clp_py_utils.clp_config import ( - ARCHIVE_TAGS_TABLE_SUFFIX, - ARCHIVES_TABLE_SUFFIX, CLP_DEFAULT_DATASET_NAME, Database, - FILES_TABLE_SUFFIX, StorageEngine, ) +from clp_py_utils.clp_metadata_db_utils import ( + get_archive_tags_table_name, + get_archives_table_name, + get_files_table_name, +) from clp_py_utils.sql_adapter import SQL_Adapter from clp_package_utils.general import ( @@ -191,12 +193,15 @@ def main(argv: typing.List[str]) -> int: logger.error("`archive_output.directory` doesn't exist.") return -1 + dataset: typing.Optional[str] = None + if StorageEngine.CLP_S == storage_engine: + dataset = CLP_DEFAULT_DATASET_NAME + if FIND_COMMAND == parsed_args.subcommand: return _find_archives( archives_dir, database_config, - storage_engine, - CLP_DEFAULT_DATASET_NAME, + dataset, parsed_args.begin_ts, parsed_args.end_ts, ) @@ -207,8 +212,7 @@ def main(argv: typing.List[str]) -> int: return _delete_archives( archives_dir, database_config, - storage_engine, - CLP_DEFAULT_DATASET_NAME, + dataset, delete_handler, parsed_args.dry_run, ) @@ -219,8 +223,7 @@ def main(argv: typing.List[str]) -> int: return _delete_archives( archives_dir, database_config, - storage_engine, - CLP_DEFAULT_DATASET_NAME, + dataset, delete_handler, parsed_args.dry_run, ) @@ -235,8 +238,7 @@ def main(argv: typing.List[str]) -> int: def _find_archives( archives_dir: Path, database_config: Database, - storage_engine: StorageEngine, - dataset: str, + dataset: typing.Optional[str], begin_ts: int, end_ts: int = typing.Optional[int], ) -> int: @@ -245,7 +247,6 @@ def 
_find_archives( `begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts`. :param archives_dir: :param database_config: - :param storage_engine: :param dataset: :param begin_ts: :param end_ts: @@ -259,8 +260,6 @@ def _find_archives( database_config.get_clp_connection_params_and_type(True) ) table_prefix: str = clp_db_connection_params["table_prefix"] - if StorageEngine.CLP_S == storage_engine: - table_prefix = f"{table_prefix}{dataset}_" with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) @@ -268,7 +267,7 @@ def _find_archives( query_params: typing.List[int] = [begin_ts] query: str = ( f""" - SELECT id FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}` + SELECT id FROM `{get_archives_table_name(table_prefix, dataset)}` WHERE begin_timestamp >= %s """ ) @@ -285,9 +284,10 @@ def _find_archives( return 0 logger.info(f"Found {len(archive_ids)} archives within the specified time range.") + archive_output_dir = archives_dir / dataset if dataset is not None else archives_dir for archive_id in archive_ids: logger.info(archive_id) - archive_path: Path = archives_dir / dataset / archive_id + archive_path = archive_output_dir / archive_id if not archive_path.is_dir(): logger.warning(f"Archive {archive_id} in database not found on disk.") @@ -302,7 +302,6 @@ def _find_archives( def _delete_archives( archives_dir: Path, database_config: Database, - storage_engine: StorageEngine, dataset: str, delete_handler: DeleteHandler, dry_run: bool = False, @@ -312,7 +311,6 @@ def _delete_archives( :param archives_dir: :param database_config: - :param storage_engine: :param dataset: :param delete_handler: Object to handle differences between by-filter and by-ids delete types. :param dry_run: If True, no changes will be made to the database or disk. 
@@ -327,8 +325,6 @@ def _delete_archives( database_config.get_clp_connection_params_and_type(True) ) table_prefix = clp_db_connection_params["table_prefix"] - if StorageEngine.CLP_S == storage_engine: - table_prefix = f"{table_prefix}{dataset}_" with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) @@ -341,7 +337,7 @@ def _delete_archives( db_cursor.execute( f""" - DELETE FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}` + DELETE FROM `{get_archives_table_name(table_prefix, dataset)}` WHERE {query_criteria} RETURNING id """, @@ -360,14 +356,14 @@ def _delete_archives( db_cursor.execute( f""" - DELETE FROM `{table_prefix}{FILES_TABLE_SUFFIX}` + DELETE FROM `{get_files_table_name(table_prefix, dataset)}` WHERE archive_id in ({ids_list_string}) """ ) db_cursor.execute( f""" - DELETE FROM `{table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX}` + DELETE FROM `{get_archive_tags_table_name(table_prefix, dataset)}` WHERE archive_id in ({ids_list_string}) """ ) @@ -387,8 +383,9 @@ def _delete_archives( logger.info(f"Finished deleting archives from the database.") + archive_output_dir: Path = archives_dir / dataset if dataset is not None else archives_dir for archive_id in archive_ids: - archive_path: Path = archives_dir / dataset / archive_id + archive_path = archive_output_dir / archive_id if not archive_path.is_dir(): logger.warning(f"Archive {archive_id} is not a directory. 
Skipping deletion.") continue diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index ab1b5b2aea..b59a058605 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -4,13 +4,17 @@ import pathlib import sys import time -import typing from contextlib import closing -from typing import List +from typing import List, Optional, Union import brotli import msgpack -from clp_py_utils.clp_config import CLPConfig, COMPRESSION_JOBS_TABLE_NAME +from clp_py_utils.clp_config import ( + CLP_DEFAULT_DATASET_NAME, + CLPConfig, + COMPRESSION_JOBS_TABLE_NAME, + StorageEngine, +) from clp_py_utils.pretty_size import pretty_size from clp_py_utils.s3_utils import parse_s3_url from clp_py_utils.sql_adapter import SQL_Adapter @@ -132,21 +136,29 @@ def handle_job(sql_adapter: SQL_Adapter, clp_io_config: ClpIoConfig, no_progress def _generate_clp_io_config( - clp_config: CLPConfig, logs_to_compress: List[str], parsed_args: argparse.Namespace -) -> typing.Union[S3InputConfig, FsInputConfig]: - input_type = clp_config.logs_input.type + clp_config: CLPConfig, + logs_to_compress: List[str], + parsed_args: argparse.Namespace, +) -> Union[S3InputConfig, FsInputConfig]: + dataset = ( + CLP_DEFAULT_DATASET_NAME + if StorageEngine.CLP_S == clp_config.package.storage_engine + else None + ) + input_type = clp_config.logs_input.type if InputType.FS == input_type: if len(logs_to_compress) == 0: - raise ValueError(f"No input paths given.") + raise ValueError("No input paths given.") return FsInputConfig( + dataset=dataset, paths_to_compress=logs_to_compress, timestamp_key=parsed_args.timestamp_key, path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR), ) elif InputType.S3 == input_type: if len(logs_to_compress) == 0: - raise ValueError(f"No URLs given.") + raise ValueError("No 
URLs given.") elif len(logs_to_compress) != 1: raise ValueError(f"Too many URLs: {len(logs_to_compress)} > 1") @@ -154,6 +166,7 @@ def _generate_clp_io_config( region_code, bucket_name, key_prefix = parse_s3_url(s3_url) aws_authentication = clp_config.logs_input.aws_authentication return S3InputConfig( + dataset=dataset, region_code=region_code, bucket=bucket_name, key_prefix=key_prefix, diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py index 1f67300734..4c6647e30f 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py @@ -13,8 +13,8 @@ CLP_DEFAULT_DATASET_NAME, CLPConfig, Database, - FILES_TABLE_SUFFIX, ) +from clp_py_utils.clp_metadata_db_utils import get_files_table_name from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType from job_orchestration.scheduler.job_config import ( @@ -54,8 +54,9 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]: with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: + files_table_name = get_files_table_name(table_prefix, None) db_cursor.execute( - f"SELECT orig_file_id FROM `{table_prefix}{FILES_TABLE_SUFFIX}` WHERE path = (%s)", + f"SELECT orig_file_id FROM `{files_table_name}` WHERE path = (%s)", (path,), ) results = db_cursor.fetchall() diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index d292656a77..1407b72ec2 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -10,7 +10,12 @@ import msgpack import pymongo -from 
clp_py_utils.clp_config import Database, ResultsCache +from clp_py_utils.clp_config import ( + CLP_DEFAULT_DATASET_NAME, + Database, + ResultsCache, + StorageEngine, +) from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig @@ -32,6 +37,7 @@ def create_and_monitor_job_in_db( db_config: Database, results_cache: ResultsCache, + dataset: str | None, wildcard_query: str, tags: str | None, begin_timestamp: int | None, @@ -43,6 +49,7 @@ def create_and_monitor_job_in_db( count_by_time_bucket_size: int | None, ): search_config = SearchJobConfig( + dataset=dataset, query_string=wildcard_query, begin_timestamp=begin_timestamp, end_timestamp=end_timestamp, @@ -113,6 +120,7 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci async def do_search_without_aggregation( db_config: Database, results_cache: ResultsCache, + dataset: str | None, wildcard_query: str, tags: str | None, begin_timestamp: int | None, @@ -147,6 +155,7 @@ async def do_search_without_aggregation( create_and_monitor_job_in_db, db_config, results_cache, + dataset, wildcard_query, tags, begin_timestamp, @@ -184,6 +193,7 @@ async def do_search_without_aggregation( async def do_search( db_config: Database, results_cache: ResultsCache, + dataset: str | None, wildcard_query: str, tags: str | None, begin_timestamp: int | None, @@ -198,6 +208,7 @@ async def do_search( await do_search_without_aggregation( db_config, results_cache, + dataset, wildcard_query, tags, begin_timestamp, @@ -211,6 +222,7 @@ async def do_search( create_and_monitor_job_in_db, db_config, results_cache, + dataset, wildcard_query, tags, begin_timestamp, @@ -281,11 +293,17 @@ def main(argv): logger.exception("Failed to load config.") return -1 + dataset = ( + CLP_DEFAULT_DATASET_NAME + if StorageEngine.CLP_S == clp_config.package.storage_engine + else None + ) 
try: asyncio.run( do_search( clp_config.database, clp_config.results_cache, + dataset, parsed_args.wildcard_query, parsed_args.tags, parsed_args.begin_time, diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 1bb6741a4c..db6f66bc8a 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -14,7 +14,6 @@ import yaml from clp_py_utils.clp_config import ( ALL_TARGET_NAME, - ARCHIVES_TABLE_SUFFIX, AwsAuthType, CLP_DEFAULT_DATASET_NAME, CLPConfig, @@ -23,7 +22,6 @@ COMPRESSION_WORKER_COMPONENT_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME, - FILES_TABLE_SUFFIX, QUERY_JOBS_TABLE_NAME, QUERY_SCHEDULER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, @@ -35,6 +33,10 @@ StorageType, WEBUI_COMPONENT_NAME, ) +from clp_py_utils.clp_metadata_db_utils import ( + get_archives_table_name, + get_files_table_name, +) from clp_py_utils.s3_utils import generate_container_auth_options from job_orchestration.scheduler.constants import QueueName from pydantic import BaseModel @@ -868,13 +870,14 @@ def start_webui( # Read, update, and write back client's and server's settings.json clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True) table_prefix = clp_db_connection_params["table_prefix"] + dataset: Optional[str] = None if StorageEngine.CLP_S == clp_config.package.storage_engine: - table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_" + dataset = CLP_DEFAULT_DATASET_NAME client_settings_json_updates = { "ClpStorageEngine": clp_config.package.storage_engine, "MongoDbSearchResultsMetadataCollectionName": clp_config.webui.results_metadata_collection_name, - "SqlDbClpArchivesTableName": f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}", - "SqlDbClpFilesTableName": f"{table_prefix}{FILES_TABLE_SUFFIX}", + "SqlDbClpArchivesTableName": 
get_archives_table_name(table_prefix, dataset), + "SqlDbClpFilesTableName": get_files_table_name(table_prefix, dataset), "SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME, } client_settings_json = read_and_update_settings_json( @@ -884,6 +887,7 @@ def start_webui( client_settings_json_file.write(json.dumps(client_settings_json)) server_settings_json_updates = { + "ClpStorageEngine": clp_config.package.storage_engine, "SqlDbHost": clp_config.database.host, "SqlDbPort": clp_config.database.port, "SqlDbName": clp_config.database.name, diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index 0e4477dd82..5776b9c364 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -36,13 +36,6 @@ COMPRESSION_JOBS_TABLE_NAME = "compression_jobs" COMPRESSION_TASKS_TABLE_NAME = "compression_tasks" -ARCHIVE_TAGS_TABLE_SUFFIX = "archive_tags" -ARCHIVES_TABLE_SUFFIX = "archives" -COLUMN_METADATA_TABLE_SUFFIX = "column_metadata" -DATASETS_TABLE_SUFFIX = "datasets" -FILES_TABLE_SUFFIX = "files" -TAGS_TABLE_SUFFIX = "tags" - OS_RELEASE_FILE_PATH = pathlib.Path("etc") / "os-release" CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path("etc") / "credentials.yml" diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index 4aeed84dc7..a5e43f9a52 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -4,14 +4,18 @@ from typing import Set from clp_py_utils.clp_config import ( - ARCHIVE_TAGS_TABLE_SUFFIX, - ARCHIVES_TABLE_SUFFIX, - COLUMN_METADATA_TABLE_SUFFIX, - DATASETS_TABLE_SUFFIX, - FILES_TABLE_SUFFIX, - TAGS_TABLE_SUFFIX, + ArchiveOutput, + StorageType, ) +# Constants +ARCHIVE_TAGS_TABLE_SUFFIX = "archive_tags" +ARCHIVES_TABLE_SUFFIX = "archives" +COLUMN_METADATA_TABLE_SUFFIX = 
"column_metadata" +DATASETS_TABLE_SUFFIX = "datasets" +FILES_TABLE_SUFFIX = "files" +TAGS_TABLE_SUFFIX = "tags" + def _create_archives_table(db_cursor, archives_table_name: str) -> None: db_cursor.execute( @@ -62,10 +66,10 @@ def _create_archive_tags_table( ) -def _create_files_table(db_cursor, table_prefix: str) -> None: +def _create_files_table(db_cursor, table_prefix: str, dataset: str | None) -> None: db_cursor.execute( f""" - CREATE TABLE IF NOT EXISTS `{table_prefix}{FILES_TABLE_SUFFIX}` ( + CREATE TABLE IF NOT EXISTS `{get_files_table_name(table_prefix, dataset)}` ( `id` VARCHAR(64) NOT NULL, `orig_file_id` VARCHAR(64) NOT NULL, `path` VARCHAR(12288) NOT NULL, @@ -83,10 +87,10 @@ def _create_files_table(db_cursor, table_prefix: str) -> None: ) -def _create_column_metadata_table(db_cursor, table_prefix: str) -> None: +def _create_column_metadata_table(db_cursor, table_prefix: str, dataset: str) -> None: db_cursor.execute( f""" - CREATE TABLE IF NOT EXISTS `{table_prefix}{COLUMN_METADATA_TABLE_SUFFIX}` ( + CREATE TABLE IF NOT EXISTS `{get_column_metadata_table_name(table_prefix, dataset)}` ( `name` VARCHAR(512) NOT NULL, `type` TINYINT NOT NULL, PRIMARY KEY (`name`, `type`) @@ -95,6 +99,20 @@ def _create_column_metadata_table(db_cursor, table_prefix: str) -> None: ) +def _get_table_name(prefix: str, suffix: str, dataset: str | None) -> str: + """ + :param prefix: + :param suffix: + :param dataset: + :return: The table name in the form of "<prefix>[<dataset>_]<suffix>". + """ + table_name = prefix + if dataset is not None: + table_name += f"{dataset}_" + table_name += suffix + return table_name + + def create_datasets_table(db_cursor, table_prefix: str) -> None: """ Creates the datasets information table. 
@@ -107,7 +125,7 @@ def create_datasets_table(db_cursor, table_prefix: str) -> None: # `../../../docs/src/dev-guide/design-metadata-db.md` db_cursor.execute( f""" - CREATE TABLE IF NOT EXISTS `{table_prefix}{DATASETS_TABLE_SUFFIX}` ( + CREATE TABLE IF NOT EXISTS `{get_datasets_table_name(table_prefix)}` ( `name` VARCHAR(255) NOT NULL, `archive_storage_directory` VARCHAR(4096) NOT NULL, PRIMARY KEY (`name`) @@ -121,7 +139,7 @@ def add_dataset( db_cursor, table_prefix: str, dataset_name: str, - dataset_archive_storage_directory: Path, + archive_output: ArchiveOutput, ) -> None: """ Inserts a new dataset into the `datasets` table and creates the corresponding standard set of @@ -131,13 +149,23 @@ def add_dataset( :param db_cursor: The database cursor to execute the table row insertion. :param table_prefix: A string to prepend to the table name. :param dataset_name: - :param dataset_archive_storage_directory: + :param archive_output: """ - query = f"""INSERT INTO `{table_prefix}{DATASETS_TABLE_SUFFIX}` + archive_storage_directory: Path + if StorageType.S3 == archive_output.storage.type: + s3_config = archive_output.storage.s3_config + archive_storage_directory = Path(s3_config.key_prefix) + else: + archive_storage_directory = archive_output.get_directory() + + query = f"""INSERT INTO `{get_datasets_table_name(table_prefix)}` (name, archive_storage_directory) VALUES (%s, %s) """ - db_cursor.execute(query, (dataset_name, str(dataset_archive_storage_directory))) + db_cursor.execute( + query, + (dataset_name, str(archive_storage_directory / dataset_name)), + ) create_metadata_db_tables(db_cursor, table_prefix, dataset_name) db_conn.commit() @@ -152,7 +180,7 @@ def fetch_existing_datasets( :param db_cursor: :param table_prefix: """ - db_cursor.execute(f"SELECT name FROM `{table_prefix}{DATASETS_TABLE_SUFFIX}`") + db_cursor.execute(f"SELECT name FROM `{get_datasets_table_name(table_prefix)}`") rows = db_cursor.fetchall() return {row["name"] for row in rows} @@ -166,16 
+194,39 @@ def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None :param dataset: If set, all tables will be named in a dataset-specific manner. """ if dataset is not None: - table_prefix = f"{table_prefix}{dataset}_" - _create_column_metadata_table(db_cursor, table_prefix) + _create_column_metadata_table(db_cursor, table_prefix, dataset) - archives_table_name = f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}" - tags_table_name = f"{table_prefix}{TAGS_TABLE_SUFFIX}" - archive_tags_table_name = f"{table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX}" + archives_table_name = get_archives_table_name(table_prefix, dataset) + tags_table_name = get_tags_table_name(table_prefix, dataset) + archive_tags_table_name = get_archive_tags_table_name(table_prefix, dataset) _create_archives_table(db_cursor, archives_table_name) _create_tags_table(db_cursor, tags_table_name) _create_archive_tags_table( db_cursor, archive_tags_table_name, archives_table_name, tags_table_name ) - _create_files_table(db_cursor, table_prefix) + _create_files_table(db_cursor, table_prefix, dataset) + + +def get_archive_tags_table_name(table_prefix: str, dataset: str | None) -> str: + return _get_table_name(table_prefix, ARCHIVE_TAGS_TABLE_SUFFIX, dataset) + + +def get_archives_table_name(table_prefix: str, dataset: str | None) -> str: + return _get_table_name(table_prefix, ARCHIVES_TABLE_SUFFIX, dataset) + + +def get_column_metadata_table_name(table_prefix: str, dataset: str | None) -> str: + return _get_table_name(table_prefix, COLUMN_METADATA_TABLE_SUFFIX, dataset) + + +def get_datasets_table_name(table_prefix: str) -> str: + return _get_table_name(table_prefix, DATASETS_TABLE_SUFFIX, None) + + +def get_files_table_name(table_prefix: str, dataset: str | None) -> str: + return _get_table_name(table_prefix, FILES_TABLE_SUFFIX, dataset) + + +def get_tags_table_name(table_prefix: str, dataset: str | None) -> str: + return _get_table_name(table_prefix, TAGS_TABLE_SUFFIX, dataset) diff --git 
a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py index dbc7d0981b..9600b3fc9d 100644 --- a/components/clp-py-utils/clp_py_utils/s3_utils.py +++ b/components/clp-py-utils/clp_py_utils/s3_utils.py @@ -279,12 +279,14 @@ def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata] return file_metadata_list -def s3_put(s3_config: S3Config, src_file: Path, dest_file_name: str) -> None: +def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None: """ Uploads a local file to an S3 bucket using AWS's PutObject operation. + :param s3_config: S3 configuration specifying the upload destination and credentials. :param src_file: Local file to upload. - :param dest_file_name: The name for the uploaded file in the S3 bucket. + :param dest_path: The destination path for the uploaded file in the S3 bucket, relative to + `s3_config.key_prefix` (the file's S3 key will be `s3_config.key_prefix` + `dest_path`). :raises: ValueError if `src_file` doesn't exist, doesn't resolve to a file or is larger than the S3 PutObject limit. :raises: Propagates `boto3.client`'s exceptions. 
@@ -304,5 +306,5 @@ def s3_put(s3_config: S3Config, src_file: Path, dest_file_name: str) -> None: with open(src_file, "rb") as file_data: s3_client.put_object( - Bucket=s3_config.bucket, Body=file_data, Key=s3_config.key_prefix + dest_file_name + Bucket=s3_config.bucket, Body=file_data, Key=s3_config.key_prefix + dest_path ) diff --git a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py index 70ef99a765..f5d23f4c74 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py @@ -10,8 +10,6 @@ from celery.app.task import Task from celery.utils.log import get_task_logger from clp_py_utils.clp_config import ( - ARCHIVE_TAGS_TABLE_SUFFIX, - ARCHIVES_TABLE_SUFFIX, COMPRESSION_JOBS_TABLE_NAME, COMPRESSION_TASKS_TABLE_NAME, Database, @@ -21,6 +19,10 @@ WorkerConfig, ) from clp_py_utils.clp_logging import set_logging_level +from clp_py_utils.clp_metadata_db_utils import ( + get_archive_tags_table_name, + get_archives_table_name, +) from clp_py_utils.core import read_yaml_config_file from clp_py_utils.s3_utils import ( generate_s3_virtual_hosted_style_url, @@ -68,19 +70,32 @@ def increment_compression_job_metadata(db_cursor, job_id, kv): db_cursor.execute(query) -def update_tags(db_cursor, table_prefix, archive_id, tag_ids): +def update_tags( + db_cursor, + table_prefix: str, + dataset: Optional[str], + archive_id: str, + tag_ids: List[int], +) -> None: db_cursor.executemany( f""" - INSERT INTO {table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX} (archive_id, tag_id) + INSERT INTO {get_archive_tags_table_name(table_prefix, dataset)} (archive_id, tag_id) VALUES (%s, %s) """, [(archive_id, tag_id) for tag_id in tag_ids], ) -def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archive_stats): +def 
update_job_metadata_and_tags( + db_cursor, + job_id: int, + table_prefix: str, + dataset: Optional[str], + tag_ids: List[int], + archive_stats: Dict[str, Any], +) -> None: if tag_ids is not None: - update_tags(db_cursor, table_prefix, archive_stats["id"], tag_ids) + update_tags(db_cursor, table_prefix, dataset, archive_stats["id"], tag_ids) increment_compression_job_metadata( db_cursor, job_id, @@ -91,7 +106,12 @@ def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archi ) -def update_archive_metadata(db_cursor, table_prefix, archive_stats): +def update_archive_metadata( + db_cursor, + table_prefix: str, + dataset: Optional[str], + archive_stats: Dict[str, Any], +) -> None: stats_to_update = { # Use defaults for values clp-s doesn't output "creation_ix": 0, @@ -115,9 +135,8 @@ def update_archive_metadata(db_cursor, table_prefix, archive_stats): keys = ", ".join(stats_to_update.keys()) value_placeholders = ", ".join(["%s"] * len(stats_to_update)) - query = ( - f"INSERT INTO {table_prefix}{ARCHIVES_TABLE_SUFFIX} ({keys}) VALUES ({value_placeholders})" - ) + archives_table_name = get_archives_table_name(table_prefix, dataset) + query = f"INSERT INTO {archives_table_name} ({keys}) VALUES ({value_placeholders})" db_cursor.execute(query, list(stats_to_update.values())) @@ -154,6 +173,16 @@ def _generate_s3_logs_list( file.write("\n") +def _upload_archive_to_s3( + s3_config: S3Config, + archive_src_path: pathlib.Path, + archive_id: str, + dataset: Optional[str], +): + dest_path = f"{dataset}/{archive_id}" if dataset is not None else archive_id + s3_put(s3_config, archive_src_path, dest_path) + + def _make_clp_command_and_env( clp_home: pathlib.Path, archive_output_dir: pathlib.Path, @@ -293,7 +322,7 @@ def run_clp( enable_s3_write = True table_prefix = clp_metadata_db_connection_config["table_prefix"] - input_dataset: str + dataset = clp_config.input.dataset if StorageEngine.CLP == clp_storage_engine: compression_cmd, compression_env = 
_make_clp_command_and_env( clp_home=clp_home, @@ -302,12 +331,7 @@ def run_clp( db_config_file_path=db_config_file_path, ) elif StorageEngine.CLP_S == clp_storage_engine: - input_dataset = clp_config.input.dataset - table_prefix = f"{table_prefix}{input_dataset}_" - archive_output_dir = archive_output_dir / input_dataset - if StorageType.S3 == storage_type: - s3_config.key_prefix = f"{s3_config.key_prefix}{input_dataset}/" - + archive_output_dir = archive_output_dir / dataset compression_cmd, compression_env = _make_clp_s_command_and_env( clp_home=clp_home, archive_output_dir=archive_output_dir, @@ -370,7 +394,7 @@ def run_clp( if s3_error is None: logger.info(f"Uploading archive {archive_id} to S3...") try: - s3_put(s3_config, archive_path, archive_id) + _upload_archive_to_s3(s3_config, archive_path, archive_id, dataset) logger.info(f"Finished uploading archive {archive_id} to S3.") except Exception as err: logger.exception(f"Failed to upload archive {archive_id}") @@ -388,11 +412,14 @@ def run_clp( db_conn.cursor(dictionary=True) ) as db_cursor: if StorageEngine.CLP_S == clp_storage_engine: - update_archive_metadata(db_cursor, table_prefix, last_archive_stats) + update_archive_metadata( + db_cursor, table_prefix, dataset, last_archive_stats + ) update_job_metadata_and_tags( db_cursor, job_id, table_prefix, + dataset, tag_ids, last_archive_stats, ) @@ -403,7 +430,7 @@ def run_clp( str(clp_home / "bin" / "indexer"), "--db-config-file", str(db_config_file_path), - input_dataset, + dataset, archive_path, ] try: diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py index 1fdb3839a9..9aa317b5e0 100644 --- a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py @@ -97,10 +97,10 @@ def _make_clp_s_command_and_env_vars( 
dataset = extract_json_config.dataset if StorageType.S3 == storage_type: s3_config = worker_config.archive_output.storage.s3_config - s3_config.key_prefix = f"{s3_config.key_prefix}{dataset}/" + s3_object_key = f"{s3_config.key_prefix}{dataset}/{archive_id}" try: s3_url = generate_s3_virtual_hosted_style_url( - s3_config.region_code, s3_config.bucket, f"{s3_config.key_prefix}{archive_id}" + s3_config.region_code, s3_config.bucket, s3_object_key ) except ValueError as ex: logger.error(f"Encountered error while generating S3 url: {ex}") diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index 171d64e125..8994ff0185 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -62,10 +62,10 @@ def _make_core_clp_s_command_and_env_vars( dataset = search_config.dataset if StorageType.S3 == worker_config.archive_output.storage.type: s3_config = worker_config.archive_output.storage.s3_config - s3_config.key_prefix = f"{s3_config.key_prefix}{dataset}/" + s3_object_key = f"{s3_config.key_prefix}{dataset}/{archive_id}" try: s3_url = generate_s3_virtual_hosted_style_url( - s3_config.region_code, s3_config.bucket, f"{s3_config.key_prefix}{archive_id}" + s3_config.region_code, s3_config.bucket, s3_object_key ) except ValueError as ex: logger.error(f"Encountered error while generating S3 url: {ex}") diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index 04ba5f5164..ec2018a2d6 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -18,13 +18,12 @@ 
COMPRESSION_JOBS_TABLE_NAME, COMPRESSION_TASKS_TABLE_NAME, StorageEngine, - StorageType, - TAGS_TABLE_SUFFIX, ) from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level from clp_py_utils.clp_metadata_db_utils import ( add_dataset, fetch_existing_datasets, + get_tags_table_name, ) from clp_py_utils.compression import validate_path_and_get_info from clp_py_utils.core import read_yaml_config_file @@ -158,7 +157,6 @@ def search_and_schedule_new_tasks( db_conn, db_cursor, clp_metadata_db_connection_config: Dict[str, Any], - clp_storage_engine: StorageEngine, clp_archive_output: ArchiveOutput, existing_datasets: Set[str], ): @@ -167,7 +165,6 @@ def search_and_schedule_new_tasks( :param db_conn: :param db_cursor: :param clp_metadata_db_connection_config: - :param clp_storage_engine: :param clp_archive_output: :param existing_datasets: """ @@ -186,24 +183,19 @@ def search_and_schedule_new_tasks( input_config = clp_io_config.input table_prefix = clp_metadata_db_connection_config["table_prefix"] - if StorageEngine.CLP_S == clp_storage_engine: - dataset_name = input_config.dataset - if dataset_name not in existing_datasets: - archive_storage_directory: Path - if StorageType.S3 == clp_archive_output.storage.type: - s3_config = clp_archive_output.storage.s3_config - archive_storage_directory = Path(s3_config.key_prefix) / dataset_name - else: - archive_storage_directory = clp_archive_output.get_directory() / dataset_name - add_dataset( - db_conn, - db_cursor, - table_prefix, - dataset_name, - archive_storage_directory, - ) - existing_datasets.add(dataset_name) - table_prefix = f"{table_prefix}{dataset_name}_" + dataset = input_config.dataset + + if dataset is not None and dataset not in existing_datasets: + add_dataset( + db_conn, + db_cursor, + table_prefix, + dataset, + clp_archive_output, + ) + + # NOTE: This assumes we never delete a dataset + existing_datasets.add(dataset) paths_to_compress_buffer = PathsToCompressBuffer( 
maintain_file_ordering=False, @@ -276,13 +268,14 @@ def search_and_schedule_new_tasks( tag_ids = None if clp_io_config.output.tags: + tags_table_name = get_tags_table_name(table_prefix, dataset) db_cursor.executemany( - f"INSERT IGNORE INTO {table_prefix}{TAGS_TABLE_SUFFIX} (tag_name) VALUES (%s)", + f"INSERT IGNORE INTO {tags_table_name} (tag_name) VALUES (%s)", [(tag,) for tag in clp_io_config.output.tags], ) db_conn.commit() db_cursor.execute( - f"SELECT tag_id FROM {table_prefix}{TAGS_TABLE_SUFFIX} WHERE tag_name IN (%s)" + f"SELECT tag_id FROM {tags_table_name} WHERE tag_name IN (%s)" % ", ".join(["%s"] * len(clp_io_config.output.tags)), clp_io_config.output.tags, ) @@ -428,9 +421,8 @@ def main(argv): clp_metadata_db_connection_config = ( sql_adapter.database_config.get_clp_connection_params_and_type(True) ) - clp_storage_engine = clp_config.package.storage_engine existing_datasets: Set[str] = set() - if StorageEngine.CLP_S == clp_storage_engine: + if StorageEngine.CLP_S == clp_config.package.storage_engine: existing_datasets = fetch_existing_datasets( db_cursor, clp_metadata_db_connection_config["table_prefix"] ) @@ -442,7 +434,6 @@ def main(argv): db_conn, db_cursor, clp_metadata_db_connection_config, - clp_storage_engine, clp_config.archive_output, existing_datasets, ) diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py index 11155349df..b59076c22f 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -3,7 +3,7 @@ import typing from enum import auto -from clp_py_utils.clp_config import CLP_DEFAULT_DATASET_NAME, S3Config +from clp_py_utils.clp_config import S3Config from pydantic import BaseModel, validator from strenum import LowercaseStrEnum @@ -22,7 +22,7 @@ class PathsToCompress(BaseModel): class FsInputConfig(BaseModel): type: 
typing.Literal[InputType.FS.value] = InputType.FS.value - dataset: str = CLP_DEFAULT_DATASET_NAME + dataset: typing.Optional[str] = None paths_to_compress: typing.List[str] path_prefix_to_remove: str = None timestamp_key: typing.Optional[str] = None @@ -30,7 +30,7 @@ class FsInputConfig(BaseModel): class S3InputConfig(S3Config): type: typing.Literal[InputType.S3.value] = InputType.S3.value - dataset: str = CLP_DEFAULT_DATASET_NAME + dataset: typing.Optional[str] = None timestamp_key: typing.Optional[str] = None @@ -57,7 +57,7 @@ class AggregationConfig(BaseModel): class QueryJobConfig(BaseModel): - dataset: str = CLP_DEFAULT_DATASET_NAME + dataset: typing.Optional[str] = None class ExtractIrJobConfig(QueryJobConfig): diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 95beb0d109..c3789a28e1 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -31,17 +31,18 @@ import msgpack import pymongo from clp_py_utils.clp_config import ( - ARCHIVE_TAGS_TABLE_SUFFIX, - ARCHIVES_TABLE_SUFFIX, CLPConfig, - FILES_TABLE_SUFFIX, QUERY_JOBS_TABLE_NAME, QUERY_TASKS_TABLE_NAME, - StorageEngine, - TAGS_TABLE_SUFFIX, ) from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level -from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets +from clp_py_utils.clp_metadata_db_utils import ( + fetch_existing_datasets, + get_archive_tags_table_name, + get_archives_table_name, + get_files_table_name, + get_tags_table_name, +) from clp_py_utils.core import read_yaml_config_file from clp_py_utils.decorators import exception_default_value from clp_py_utils.sql_adapter import SQL_Adapter @@ -167,7 +168,7 @@ def __init__( super().__init__(job_id) self.__job_config = 
ExtractJsonJobConfig.parse_obj(job_config) self._archive_id = self.__job_config.archive_id - if not archive_exists(db_conn, table_prefix, self._archive_id): + if not archive_exists(db_conn, table_prefix, self.__job_config.dataset, self._archive_id): raise ValueError(f"Archive {self._archive_id} doesn't exist") def get_stream_id(self) -> str: @@ -389,8 +390,9 @@ def get_archives_for_search( table_prefix: str, search_config: SearchJobConfig, ): + dataset = search_config.dataset query = f"""SELECT id as archive_id, end_timestamp - FROM {table_prefix}{ARCHIVES_TABLE_SUFFIX} + FROM {get_archives_table_name(table_prefix, dataset)} """ filter_clauses = [] if search_config.end_timestamp is not None: @@ -398,9 +400,11 @@ def get_archives_for_search( if search_config.begin_timestamp is not None: filter_clauses.append(f"end_timestamp >= {search_config.begin_timestamp}") if search_config.tags is not None: + archive_tags_table_name = get_archive_tags_table_name(table_prefix, dataset) + tags_table_name = get_tags_table_name(table_prefix, dataset) filter_clauses.append( - f"id IN (SELECT archive_id FROM {table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX} WHERE " - f"tag_id IN (SELECT tag_id FROM {table_prefix}{TAGS_TABLE_SUFFIX} WHERE tag_name IN " + f"id IN (SELECT archive_id FROM {archive_tags_table_name} WHERE " + f"tag_id IN (SELECT tag_id FROM {tags_table_name} WHERE tag_name IN " f"(%s)))" % ", ".join(["%s" for _ in search_config.tags]) ) if len(filter_clauses) > 0: @@ -456,9 +460,8 @@ def get_archive_and_file_split_ids( :return: A list of (archive id, file split id) on success. An empty list if an exception occurs while interacting with the database. 
""" - query = f"""SELECT archive_id, id as file_split_id - FROM {table_prefix}{FILES_TABLE_SUFFIX} WHERE + FROM {get_files_table_name(table_prefix, None)} WHERE orig_file_id = '{orig_file_id}' AND begin_message_ix <= {msg_ix} AND (begin_message_ix + num_messages) > {msg_ix} @@ -474,9 +477,11 @@ def get_archive_and_file_split_ids( def archive_exists( db_conn, table_prefix: str, + dataset: Optional[str], archive_id: str, ) -> bool: - query = f"SELECT 1 FROM {table_prefix}{ARCHIVES_TABLE_SUFFIX} WHERE id = %s" + archives_table_name = get_archives_table_name(table_prefix, dataset) + query = f"SELECT 1 FROM {archives_table_name} WHERE id = %s" with contextlib.closing(db_conn.cursor(dictionary=True)) as cursor: cursor.execute(query, (archive_id,)) if cursor.fetchone(): @@ -622,14 +627,16 @@ def _validate_dataset( ) -> bool: if dataset in existing_datasets: return True - existing_datasets = fetch_existing_datasets(db_cursor, table_prefix) + + # NOTE: This assumes we never delete a dataset. + new_datasets = fetch_existing_datasets(db_cursor, table_prefix) + existing_datasets.update(new_datasets) return dataset in existing_datasets def handle_pending_query_jobs( db_conn_pool, clp_metadata_db_conn_params: Dict[str, any], - clp_storage_engine: StorageEngine, results_cache_uri: str, stream_collection_name: str, num_archives_to_search_per_sub_job: int, @@ -654,22 +661,22 @@ def handle_pending_query_jobs( job_config = msgpack.unpackb(job["job_config"]) table_prefix = clp_metadata_db_conn_params["table_prefix"] - if StorageEngine.CLP_S == clp_storage_engine: - dataset = QueryJobConfig.parse_obj(job_config).dataset - if not _validate_dataset(db_cursor, table_prefix, dataset, existing_datasets): - logger.error(f"Dataset `{dataset}` doesn't exist.") - if not set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - QueryJobStatus.FAILED, - QueryJobStatus.PENDING, - start_time=datetime.datetime.now(), - duration=0, - ): - logger.error(f"Failed to set job {job_id} as 
failed.") - continue - table_prefix = f"{table_prefix}{dataset}_" + dataset = QueryJobConfig.parse_obj(job_config).dataset + if dataset is not None and not _validate_dataset( + db_cursor, table_prefix, dataset, existing_datasets + ): + logger.error(f"Dataset `{dataset}` doesn't exist.") + if not set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.FAILED, + QueryJobStatus.PENDING, + start_time=datetime.datetime.now(), + duration=0, + ): + logger.error(f"Failed to set job {job_id} as failed.") + continue if QueryJobType.SEARCH_OR_AGGREGATION == job_type: # Avoid double-dispatch when a job is WAITING_FOR_REDUCER @@ -1084,7 +1091,6 @@ async def handle_job_updates(db_conn_pool, results_cache_uri: str, jobs_poll_del async def handle_jobs( db_conn_pool, clp_metadata_db_conn_params: Dict[str, any], - clp_storage_engine: StorageEngine, results_cache_uri: str, stream_collection_name: str, jobs_poll_delay: float, @@ -1100,7 +1106,6 @@ async def handle_jobs( reducer_acquisition_tasks = handle_pending_query_jobs( db_conn_pool, clp_metadata_db_conn_params, - clp_storage_engine, results_cache_uri, stream_collection_name, num_archives_to_search_per_sub_job, @@ -1186,7 +1191,6 @@ async def main(argv: List[str]) -> int: clp_metadata_db_conn_params=clp_config.database.get_clp_connection_params_and_type( True ), - clp_storage_engine=clp_config.package.storage_engine, results_cache_uri=clp_config.results_cache.get_uri(), stream_collection_name=clp_config.results_cache.stream_collection_name, jobs_poll_delay=clp_config.query_scheduler.jobs_poll_delay, diff --git a/components/webui/server/settings.json b/components/webui/server/settings.json index 5f7f789364..877e8df295 100644 --- a/components/webui/server/settings.json +++ b/components/webui/server/settings.json @@ -1,4 +1,6 @@ { + "ClpStorageEngine": "clp", + "SqlDbHost": "localhost", "SqlDbPort": 3306, "SqlDbName": "clp-db", diff --git a/components/webui/server/src/configConstants.ts 
b/components/webui/server/src/configConstants.ts new file mode 100644 index 0000000000..5841b83378 --- /dev/null +++ b/components/webui/server/src/configConstants.ts @@ -0,0 +1,9 @@ +// NOTE: These settings are duplicated from components/webui/client/src/config/index.ts, but will be +// removed in the near future. +const CLP_STORAGE_ENGINE_CLP_S = "clp-s"; +const CLP_DEFAULT_DATASET_NAME = "default"; + +export { + CLP_DEFAULT_DATASET_NAME, + CLP_STORAGE_ENGINE_CLP_S, +}; diff --git a/components/webui/server/src/fastify-v2/routes/api/search/index.ts b/components/webui/server/src/fastify-v2/routes/api/search/index.ts index c13899e735..d58e65462a 100644 --- a/components/webui/server/src/fastify-v2/routes/api/search/index.ts +++ b/components/webui/server/src/fastify-v2/routes/api/search/index.ts @@ -9,6 +9,10 @@ import { type SearchResultsMetadataDocument, } from "../../../../../../common/index.js"; import settings from "../../../../../settings.json" with {type: "json"}; +import { + CLP_DEFAULT_DATASET_NAME, + CLP_STORAGE_ENGINE_CLP_S, +} from "../../../../configConstants.js"; import {ErrorSchema} from "../../../schemas/error.js"; import { QueryJobCreationSchema, @@ -69,6 +73,9 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => { const args = { begin_timestamp: timestampBegin, + dataset: CLP_STORAGE_ENGINE_CLP_S === settings.ClpStorageEngine ? 
+ CLP_DEFAULT_DATASET_NAME : + null, end_timestamp: timestampEnd, ignore_case: ignoreCase, max_num_results: SEARCH_MAX_NUM_RESULTS, diff --git a/components/webui/server/src/plugins/DbManager.ts b/components/webui/server/src/plugins/DbManager.ts index ca2ed88f18..ea1e4e901a 100644 --- a/components/webui/server/src/plugins/DbManager.ts +++ b/components/webui/server/src/plugins/DbManager.ts @@ -9,6 +9,11 @@ import { ResultSetHeader, } from "mysql2/promise"; +import settings from "../../settings.json" with {type: "json"}; +import { + CLP_DEFAULT_DATASET_NAME, + CLP_STORAGE_ENGINE_CLP_S, +} from "../configConstants.js"; import {Nullable} from "../typings/common.js"; import { DbManagerOptions, @@ -125,6 +130,9 @@ class DbManager { }; } else if (QUERY_JOB_TYPE.EXTRACT_JSON === jobType) { jobConfig = { + dataset: CLP_STORAGE_ENGINE_CLP_S === settings.ClpStorageEngine ? + CLP_DEFAULT_DATASET_NAME : + null, archive_id: streamId, target_chunk_size: targetUncompressedSize, };