Skip to content

Commit 5cb0800

Browse files
authored
refactor(clp-package): Refactor dataset-related logic: (#1023)
- Add helper methods to generate SQL table names.
- Make `dataset` optional in job configs.
- Change if-guards around dataset logic from checking for a specific storage engine to checking whether the dataset is set.
1 parent 0798100 commit 5cb0800

File tree

18 files changed

+288
-161
lines changed

18 files changed

+288
-161
lines changed

components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
from pathlib import Path
99

1010
from clp_py_utils.clp_config import (
11-
ARCHIVE_TAGS_TABLE_SUFFIX,
12-
ARCHIVES_TABLE_SUFFIX,
1311
CLP_DEFAULT_DATASET_NAME,
1412
Database,
15-
FILES_TABLE_SUFFIX,
1613
StorageEngine,
1714
)
15+
from clp_py_utils.clp_metadata_db_utils import (
16+
get_archive_tags_table_name,
17+
get_archives_table_name,
18+
get_files_table_name,
19+
)
1820
from clp_py_utils.sql_adapter import SQL_Adapter
1921

2022
from clp_package_utils.general import (
@@ -191,12 +193,15 @@ def main(argv: typing.List[str]) -> int:
191193
logger.error("`archive_output.directory` doesn't exist.")
192194
return -1
193195

196+
dataset: typing.Optional[str] = None
197+
if StorageEngine.CLP_S == storage_engine:
198+
dataset = CLP_DEFAULT_DATASET_NAME
199+
194200
if FIND_COMMAND == parsed_args.subcommand:
195201
return _find_archives(
196202
archives_dir,
197203
database_config,
198-
storage_engine,
199-
CLP_DEFAULT_DATASET_NAME,
204+
dataset,
200205
parsed_args.begin_ts,
201206
parsed_args.end_ts,
202207
)
@@ -207,8 +212,7 @@ def main(argv: typing.List[str]) -> int:
207212
return _delete_archives(
208213
archives_dir,
209214
database_config,
210-
storage_engine,
211-
CLP_DEFAULT_DATASET_NAME,
215+
dataset,
212216
delete_handler,
213217
parsed_args.dry_run,
214218
)
@@ -219,8 +223,7 @@ def main(argv: typing.List[str]) -> int:
219223
return _delete_archives(
220224
archives_dir,
221225
database_config,
222-
storage_engine,
223-
CLP_DEFAULT_DATASET_NAME,
226+
dataset,
224227
delete_handler,
225228
parsed_args.dry_run,
226229
)
@@ -235,8 +238,7 @@ def main(argv: typing.List[str]) -> int:
235238
def _find_archives(
236239
archives_dir: Path,
237240
database_config: Database,
238-
storage_engine: StorageEngine,
239-
dataset: str,
241+
dataset: typing.Optional[str],
240242
begin_ts: int,
241243
end_ts: int = typing.Optional[int],
242244
) -> int:
@@ -245,7 +247,6 @@ def _find_archives(
245247
`begin_ts <= archive.begin_timestamp` and `archive.end_timestamp <= end_ts`.
246248
:param archives_dir:
247249
:param database_config:
248-
:param storage_engine:
249250
:param dataset:
250251
:param begin_ts:
251252
:param end_ts:
@@ -259,16 +260,14 @@ def _find_archives(
259260
database_config.get_clp_connection_params_and_type(True)
260261
)
261262
table_prefix: str = clp_db_connection_params["table_prefix"]
262-
if StorageEngine.CLP_S == storage_engine:
263-
table_prefix = f"{table_prefix}{dataset}_"
264263

265264
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
266265
db_conn.cursor(dictionary=True)
267266
) as db_cursor:
268267
query_params: typing.List[int] = [begin_ts]
269268
query: str = (
270269
f"""
271-
SELECT id FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}`
270+
SELECT id FROM `{get_archives_table_name(table_prefix, dataset)}`
272271
WHERE begin_timestamp >= %s
273272
"""
274273
)
@@ -285,9 +284,10 @@ def _find_archives(
285284
return 0
286285

287286
logger.info(f"Found {len(archive_ids)} archives within the specified time range.")
287+
archive_output_dir = archives_dir / dataset if dataset is not None else archives_dir
288288
for archive_id in archive_ids:
289289
logger.info(archive_id)
290-
archive_path: Path = archives_dir / dataset / archive_id
290+
archive_path = archive_output_dir / archive_id
291291
if not archive_path.is_dir():
292292
logger.warning(f"Archive {archive_id} in database not found on disk.")
293293

@@ -302,7 +302,6 @@ def _find_archives(
302302
def _delete_archives(
303303
archives_dir: Path,
304304
database_config: Database,
305-
storage_engine: StorageEngine,
306305
dataset: str,
307306
delete_handler: DeleteHandler,
308307
dry_run: bool = False,
@@ -312,7 +311,6 @@ def _delete_archives(
312311
313312
:param archives_dir:
314313
:param database_config:
315-
:param storage_engine:
316314
:param dataset:
317315
:param delete_handler: Object to handle differences between by-filter and by-ids delete types.
318316
:param dry_run: If True, no changes will be made to the database or disk.
@@ -327,8 +325,6 @@ def _delete_archives(
327325
database_config.get_clp_connection_params_and_type(True)
328326
)
329327
table_prefix = clp_db_connection_params["table_prefix"]
330-
if StorageEngine.CLP_S == storage_engine:
331-
table_prefix = f"{table_prefix}{dataset}_"
332328

333329
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
334330
db_conn.cursor(dictionary=True)
@@ -341,7 +337,7 @@ def _delete_archives(
341337

342338
db_cursor.execute(
343339
f"""
344-
DELETE FROM `{table_prefix}{ARCHIVES_TABLE_SUFFIX}`
340+
DELETE FROM `{get_archives_table_name(table_prefix, dataset)}`
345341
WHERE {query_criteria}
346342
RETURNING id
347343
""",
@@ -360,14 +356,14 @@ def _delete_archives(
360356

361357
db_cursor.execute(
362358
f"""
363-
DELETE FROM `{table_prefix}{FILES_TABLE_SUFFIX}`
359+
DELETE FROM `{get_files_table_name(table_prefix, dataset)}`
364360
WHERE archive_id in ({ids_list_string})
365361
"""
366362
)
367363

368364
db_cursor.execute(
369365
f"""
370-
DELETE FROM `{table_prefix}{ARCHIVE_TAGS_TABLE_SUFFIX}`
366+
DELETE FROM `{get_archive_tags_table_name(table_prefix, dataset)}`
371367
WHERE archive_id in ({ids_list_string})
372368
"""
373369
)
@@ -387,8 +383,9 @@ def _delete_archives(
387383

388384
logger.info(f"Finished deleting archives from the database.")
389385

386+
archive_output_dir: Path = archives_dir / dataset if dataset is not None else archives_dir
390387
for archive_id in archive_ids:
391-
archive_path: Path = archives_dir / dataset / archive_id
388+
archive_path = archive_output_dir / archive_id
392389
if not archive_path.is_dir():
393390
logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
394391
continue

components/clp-package-utils/clp_package_utils/scripts/native/compress.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@
44
import pathlib
55
import sys
66
import time
7-
import typing
87
from contextlib import closing
9-
from typing import List
8+
from typing import List, Optional, Union
109

1110
import brotli
1211
import msgpack
13-
from clp_py_utils.clp_config import CLPConfig, COMPRESSION_JOBS_TABLE_NAME
12+
from clp_py_utils.clp_config import (
13+
CLP_DEFAULT_DATASET_NAME,
14+
CLPConfig,
15+
COMPRESSION_JOBS_TABLE_NAME,
16+
StorageEngine,
17+
)
1418
from clp_py_utils.pretty_size import pretty_size
1519
from clp_py_utils.s3_utils import parse_s3_url
1620
from clp_py_utils.sql_adapter import SQL_Adapter
@@ -132,28 +136,37 @@ def handle_job(sql_adapter: SQL_Adapter, clp_io_config: ClpIoConfig, no_progress
132136

133137

134138
def _generate_clp_io_config(
135-
clp_config: CLPConfig, logs_to_compress: List[str], parsed_args: argparse.Namespace
136-
) -> typing.Union[S3InputConfig, FsInputConfig]:
137-
input_type = clp_config.logs_input.type
139+
clp_config: CLPConfig,
140+
logs_to_compress: List[str],
141+
parsed_args: argparse.Namespace,
142+
) -> Union[S3InputConfig, FsInputConfig]:
143+
dataset = (
144+
CLP_DEFAULT_DATASET_NAME
145+
if StorageEngine.CLP_S == clp_config.package.storage_engine
146+
else None
147+
)
138148

149+
input_type = clp_config.logs_input.type
139150
if InputType.FS == input_type:
140151
if len(logs_to_compress) == 0:
141-
raise ValueError(f"No input paths given.")
152+
raise ValueError("No input paths given.")
142153
return FsInputConfig(
154+
dataset=dataset,
143155
paths_to_compress=logs_to_compress,
144156
timestamp_key=parsed_args.timestamp_key,
145157
path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
146158
)
147159
elif InputType.S3 == input_type:
148160
if len(logs_to_compress) == 0:
149-
raise ValueError(f"No URLs given.")
161+
raise ValueError("No URLs given.")
150162
elif len(logs_to_compress) != 1:
151163
raise ValueError(f"Too many URLs: {len(logs_to_compress)} > 1")
152164

153165
s3_url = logs_to_compress[0]
154166
region_code, bucket_name, key_prefix = parse_s3_url(s3_url)
155167
aws_authentication = clp_config.logs_input.aws_authentication
156168
return S3InputConfig(
169+
dataset=dataset,
157170
region_code=region_code,
158171
bucket=bucket_name,
159172
key_prefix=key_prefix,

components/clp-package-utils/clp_package_utils/scripts/native/decompress.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
CLP_DEFAULT_DATASET_NAME,
1414
CLPConfig,
1515
Database,
16-
FILES_TABLE_SUFFIX,
1716
)
17+
from clp_py_utils.clp_metadata_db_utils import get_files_table_name
1818
from clp_py_utils.sql_adapter import SQL_Adapter
1919
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
2020
from job_orchestration.scheduler.job_config import (
@@ -54,8 +54,9 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
5454
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
5555
db_conn.cursor(dictionary=True)
5656
) as db_cursor:
57+
files_table_name = get_files_table_name(table_prefix, None)
5758
db_cursor.execute(
58-
f"SELECT orig_file_id FROM `{table_prefix}{FILES_TABLE_SUFFIX}` WHERE path = (%s)",
59+
f"SELECT orig_file_id FROM `{files_table_name}` WHERE path = (%s)",
5960
(path,),
6061
)
6162
results = db_cursor.fetchall()

components/clp-package-utils/clp_package_utils/scripts/native/search.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@
1010

1111
import msgpack
1212
import pymongo
13-
from clp_py_utils.clp_config import Database, ResultsCache
13+
from clp_py_utils.clp_config import (
14+
CLP_DEFAULT_DATASET_NAME,
15+
Database,
16+
ResultsCache,
17+
StorageEngine,
18+
)
1419
from clp_py_utils.sql_adapter import SQL_Adapter
1520
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
1621
from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig
@@ -32,6 +37,7 @@
3237
def create_and_monitor_job_in_db(
3338
db_config: Database,
3439
results_cache: ResultsCache,
40+
dataset: str | None,
3541
wildcard_query: str,
3642
tags: str | None,
3743
begin_timestamp: int | None,
@@ -43,6 +49,7 @@ def create_and_monitor_job_in_db(
4349
count_by_time_bucket_size: int | None,
4450
):
4551
search_config = SearchJobConfig(
52+
dataset=dataset,
4653
query_string=wildcard_query,
4754
begin_timestamp=begin_timestamp,
4855
end_timestamp=end_timestamp,
@@ -113,6 +120,7 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci
113120
async def do_search_without_aggregation(
114121
db_config: Database,
115122
results_cache: ResultsCache,
123+
dataset: str | None,
116124
wildcard_query: str,
117125
tags: str | None,
118126
begin_timestamp: int | None,
@@ -147,6 +155,7 @@ async def do_search_without_aggregation(
147155
create_and_monitor_job_in_db,
148156
db_config,
149157
results_cache,
158+
dataset,
150159
wildcard_query,
151160
tags,
152161
begin_timestamp,
@@ -184,6 +193,7 @@ async def do_search_without_aggregation(
184193
async def do_search(
185194
db_config: Database,
186195
results_cache: ResultsCache,
196+
dataset: str | None,
187197
wildcard_query: str,
188198
tags: str | None,
189199
begin_timestamp: int | None,
@@ -198,6 +208,7 @@ async def do_search(
198208
await do_search_without_aggregation(
199209
db_config,
200210
results_cache,
211+
dataset,
201212
wildcard_query,
202213
tags,
203214
begin_timestamp,
@@ -211,6 +222,7 @@ async def do_search(
211222
create_and_monitor_job_in_db,
212223
db_config,
213224
results_cache,
225+
dataset,
214226
wildcard_query,
215227
tags,
216228
begin_timestamp,
@@ -281,11 +293,17 @@ def main(argv):
281293
logger.exception("Failed to load config.")
282294
return -1
283295

296+
dataset = (
297+
CLP_DEFAULT_DATASET_NAME
298+
if StorageEngine.CLP_S == clp_config.package.storage_engine
299+
else None
300+
)
284301
try:
285302
asyncio.run(
286303
do_search(
287304
clp_config.database,
288305
clp_config.results_cache,
306+
dataset,
289307
parsed_args.wildcard_query,
290308
parsed_args.tags,
291309
parsed_args.begin_time,

components/clp-package-utils/clp_package_utils/scripts/start_clp.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import yaml
1515
from clp_py_utils.clp_config import (
1616
ALL_TARGET_NAME,
17-
ARCHIVES_TABLE_SUFFIX,
1817
AwsAuthType,
1918
CLP_DEFAULT_DATASET_NAME,
2019
CLPConfig,
@@ -23,7 +22,6 @@
2322
COMPRESSION_WORKER_COMPONENT_NAME,
2423
CONTROLLER_TARGET_NAME,
2524
DB_COMPONENT_NAME,
26-
FILES_TABLE_SUFFIX,
2725
QUERY_JOBS_TABLE_NAME,
2826
QUERY_SCHEDULER_COMPONENT_NAME,
2927
QUERY_WORKER_COMPONENT_NAME,
@@ -35,6 +33,10 @@
3533
StorageType,
3634
WEBUI_COMPONENT_NAME,
3735
)
36+
from clp_py_utils.clp_metadata_db_utils import (
37+
get_archives_table_name,
38+
get_files_table_name,
39+
)
3840
from clp_py_utils.s3_utils import generate_container_auth_options
3941
from job_orchestration.scheduler.constants import QueueName
4042
from pydantic import BaseModel
@@ -868,13 +870,14 @@ def start_webui(
868870
# Read, update, and write back client's and server's settings.json
869871
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
870872
table_prefix = clp_db_connection_params["table_prefix"]
873+
dataset: Optional[str] = None
871874
if StorageEngine.CLP_S == clp_config.package.storage_engine:
872-
table_prefix = f"{table_prefix}{CLP_DEFAULT_DATASET_NAME}_"
875+
dataset = CLP_DEFAULT_DATASET_NAME
873876
client_settings_json_updates = {
874877
"ClpStorageEngine": clp_config.package.storage_engine,
875878
"MongoDbSearchResultsMetadataCollectionName": clp_config.webui.results_metadata_collection_name,
876-
"SqlDbClpArchivesTableName": f"{table_prefix}{ARCHIVES_TABLE_SUFFIX}",
877-
"SqlDbClpFilesTableName": f"{table_prefix}{FILES_TABLE_SUFFIX}",
879+
"SqlDbClpArchivesTableName": get_archives_table_name(table_prefix, dataset),
880+
"SqlDbClpFilesTableName": get_files_table_name(table_prefix, dataset),
878881
"SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME,
879882
}
880883
client_settings_json = read_and_update_settings_json(
@@ -884,6 +887,7 @@ def start_webui(
884887
client_settings_json_file.write(json.dumps(client_settings_json))
885888

886889
server_settings_json_updates = {
890+
"ClpStorageEngine": clp_config.package.storage_engine,
887891
"SqlDbHost": clp_config.database.host,
888892
"SqlDbPort": clp_config.database.port,
889893
"SqlDbName": clp_config.database.name,

0 commit comments

Comments (0)