Closed
Changes from 91 commits (96 commits total)
d18dbda
Remove all static references of CLP_METADATA_TABLE_PREFIX
Bill-hbrhbr Apr 29, 2025
e43a802
Misc changes
Bill-hbrhbr Apr 29, 2025
0336d57
Properly parse database object
Bill-hbrhbr Apr 29, 2025
0a66aa6
Merge branch 'main' into use-table-constants
Bill-hbrhbr Apr 29, 2025
ce8dfe0
Use datasets where applicable
Bill-hbrhbr Apr 29, 2025
00716b2
Merge branch 'main' into enable-dataset
Bill-hbrhbr Apr 30, 2025
b790aea
Create datasets cache and dataset specific tables
Bill-hbrhbr Apr 30, 2025
9c0ffcb
Remove extra change
Bill-hbrhbr Apr 30, 2025
c80b4b0
Grammar fix
Bill-hbrhbr Apr 30, 2025
07de9e9
Remove creation of the default dataset tables because compression sch…
Bill-hbrhbr Apr 30, 2025
605956d
To be reverted: interface changes
Bill-hbrhbr Apr 30, 2025
b7a432f
Merge branch 'main' into enable-dataset
Bill-hbrhbr May 2, 2025
bb19bf1
Add support for s3 output
Bill-hbrhbr May 2, 2025
1187ea8
fix tab
Bill-hbrhbr May 2, 2025
1d5d0e3
Remove non-existent function
Bill-hbrhbr May 3, 2025
c003634
Properly update the datasets table archive dir column
Bill-hbrhbr May 3, 2025
85f208a
Add missing import
Bill-hbrhbr May 3, 2025
dc6830f
Update datasets cache generation
Bill-hbrhbr May 3, 2025
04457b4
Merge branch 'main' into enable-dataset
Bill-hbrhbr May 3, 2025
1ef6852
Update all places where table prefix needs to be updated with the dat…
Bill-hbrhbr May 3, 2025
b57e1d8
declare order fix
Bill-hbrhbr May 3, 2025
c7fd28f
Revert interface changes
Bill-hbrhbr May 3, 2025
e23bb55
Add docstring
Bill-hbrhbr May 5, 2025
19bc9a2
Remove unnecessary commit for fetch
Bill-hbrhbr May 5, 2025
b4d0106
Add a default value for the local datasets cache
Bill-hbrhbr May 5, 2025
8246abf
remove unnecessary arg in fetching existing dataset
Bill-hbrhbr May 5, 2025
8739653
Add safety handle for dataset registration in metadata db
Bill-hbrhbr May 5, 2025
04fe27f
Merge branch 'main' into enable-dataset
Bill-hbrhbr May 5, 2025
9a09fde
address review concern
Bill-hbrhbr May 5, 2025
e97b0d0
Remove trailing slash from archive_storage_directory in datasets table.
kirkrodrigues May 8, 2025
9dbb594
Merge branch 'main' into enable-dataset
kirkrodrigues May 8, 2025
11b66e0
Add archive_storage_type column; Switch archive_storage_directory to …
kirkrodrigues May 9, 2025
2bf743a
Move FileMetadata into clp_py_utils.core so we don't need to use Leve…
kirkrodrigues May 9, 2025
5a568cb
Shrink archive_storage_type column size.
kirkrodrigues May 27, 2025
7c7fb62
Remove URI protocol from archive_storage_directory.
kirkrodrigues May 27, 2025
059d075
Revert "Remove URI protocol from archive_storage_directory."
kirkrodrigues Jun 8, 2025
268f2a7
Revert "Shrink archive_storage_type column size."
kirkrodrigues Jun 8, 2025
cab0439
Revert "Move FileMetadata into clp_py_utils.core so we don't need to …
kirkrodrigues Jun 8, 2025
6f9b150
Revert "Add archive_storage_type column; Switch archive_storage_direc…
kirkrodrigues Jun 8, 2025
4f3a7cc
Merge branch 'main' into enable-dataset
kirkrodrigues Jun 8, 2025
2e0ba43
Update to reflect changes from #923.
kirkrodrigues Jun 9, 2025
5e54353
Merge branch 'main' into enable-dataset
Bill-hbrhbr Jun 10, 2025
0f0ecd3
Address review concern
Bill-hbrhbr Jun 10, 2025
8ed5aff
Drop unnecessary exception handling
Bill-hbrhbr Jun 11, 2025
be82558
Add dataset functionality to extract json query jobs
Bill-hbrhbr Jun 12, 2025
313cabd
Fix typo
Bill-hbrhbr Jun 12, 2025
5edb090
archive_manager
Bill-hbrhbr Jun 13, 2025
4f3d61c
compress
Bill-hbrhbr Jun 13, 2025
9297cd3
decompress
Bill-hbrhbr Jun 13, 2025
4f0506e
search
Bill-hbrhbr Jun 13, 2025
9440a9d
Fix too early parsing of SearchJobConfig
Bill-hbrhbr Jun 13, 2025
0383ef8
Fix too early parsing of SearchJobConfig
Bill-hbrhbr Jun 13, 2025
86a8512
MOve dataset field into the gneeral QueryJobConfig class
Bill-hbrhbr Jun 13, 2025
c4895cd
MOve dataset field into the gneeral QueryJobConfig class
Bill-hbrhbr Jun 13, 2025
9f5070f
Add missing config
Bill-hbrhbr Jun 13, 2025
7ffbed5
log viewer connector
Jun 16, 2025
cf6c6a3
Merge branch 'main' into enable-dataset
davemarco Jun 16, 2025
f304bc6
Merge branch 'enable-dataset' of https://github.com/Bill-hbrhbr/clp i…
Jun 16, 2025
69ddc3e
fix lint
Jun 16, 2025
a132cd6
Add missing config
Bill-hbrhbr Jun 13, 2025
fd34617
log viewer connector
Jun 16, 2025
ff40e0f
fix lint
Jun 16, 2025
d79e075
Merge branch 'main' into dataset-interface
Bill-hbrhbr Jun 16, 2025
60fb75d
Update components/clp-package-utils/clp_package_utils/scripts/native/…
Bill-hbrhbr Jun 18, 2025
0f4fd74
Merge branch 'main' into dataset-interface
Bill-hbrhbr Jun 18, 2025
5e7c20b
Set job to failed if queried dataset doesn't exist
Bill-hbrhbr Jun 18, 2025
7e6447a
Merge branch 'main' into enable-dataset
Bill-hbrhbr Jun 18, 2025
84065dd
Revert changes that append dataset name to stream output directory
Bill-hbrhbr Jun 19, 2025
6ec5ff7
Apply suggestions from code review
Bill-hbrhbr Jun 19, 2025
42658d0
revert get_orig_file_Id
Bill-hbrhbr Jun 19, 2025
499292a
Append missing dataset component to archive dir path in archive_manager
Bill-hbrhbr Jun 19, 2025
75ba22c
remove extra import
Bill-hbrhbr Jun 19, 2025
a76ba59
simply dataset registration call layers
Bill-hbrhbr Jun 19, 2025
380f16e
Merge branch 'main' into enable-dataset
Bill-hbrhbr Jun 19, 2025
316663c
ui
Jun 19, 2025
e02b1c3
Make create_metadata_db_tables function public again
Bill-hbrhbr Jun 19, 2025
a655588
merge
Jun 19, 2025
944cad5
merge
Jun 19, 2025
da3cd23
merge
Jun 19, 2025
43b74d2
latest
Jun 19, 2025
8ee0f2e
Merge branch 'main' into dataset-interface
Bill-hbrhbr Jun 19, 2025
074f7ad
latest
Jun 19, 2025
344c0c6
remove extra line
Bill-hbrhbr Jun 19, 2025
c045d7b
revert query ts changes
Bill-hbrhbr Jun 19, 2025
c9c4bfc
latest
Jun 19, 2025
a7f4332
latest
Jun 19, 2025
ac28463
latest
Jun 19, 2025
d0aebd9
latest
Jun 20, 2025
1ae6006
latest
Jun 20, 2025
549b0ef
latest
Jun 20, 2025
02687bd
latest
Jun 23, 2025
786e539
Merge branch 'main' into dataset-interface
Bill-hbrhbr Jun 24, 2025
848af5b
Revert merge error in decompress.py
Bill-hbrhbr Jun 24, 2025
528dfe2
Extract dataset checker function into common utils
Bill-hbrhbr Jun 25, 2025
18ebff6
Refactor validate dataset
Bill-hbrhbr Jun 26, 2025
94a231a
Resolve base branch conflicts
Bill-hbrhbr Jun 29, 2025
@@ -5,7 +5,10 @@
import typing
from pathlib import Path

from clp_py_utils.clp_config import StorageType
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
StorageType,
)

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -61,6 +64,11 @@ def main(argv: typing.List[str]) -> int:
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--dataset",
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)

# Top-level commands
subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers(
@@ -196,6 +204,7 @@ def main(argv: typing.List[str]) -> int:
"python3",
"-m", "clp_package_utils.scripts.native.archive_manager",
"--config", str(generated_config_path_on_container),
"--dataset", str(parsed_args.dataset),
str(subcommand),
]
# fmt: on
@@ -6,7 +6,10 @@
import uuid
from typing import List

from clp_py_utils.clp_config import StorageEngine
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
StorageEngine,
)
from job_orchestration.scheduler.job_config import InputType

from clp_package_utils.general import (
@@ -74,6 +77,8 @@ def _generate_compress_cmd(
"--config", str(config_path),
]
# fmt: on
compress_cmd.append("--dataset")
compress_cmd.append(str(parsed_args.dataset))
if parsed_args.timestamp_key is not None:
compress_cmd.append("--timestamp-key")
compress_cmd.append(parsed_args.timestamp_key)
@@ -82,7 +87,6 @@
compress_cmd.append(parsed_args.tags)
if parsed_args.no_progress_reporting is True:
compress_cmd.append("--no-progress-reporting")

compress_cmd.append("--logs-list")
compress_cmd.append(str(logs_list_path))

@@ -131,6 +135,11 @@ def main(argv):
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--dataset",
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)
args_parser.add_argument(
"--timestamp-key",
help="The path (e.g. x.y) for the field containing the log event's timestamp.",
@@ -5,7 +5,12 @@
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig, StorageEngine, StorageType
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
StorageEngine,
StorageType,
)

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -207,6 +212,8 @@ def handle_extract_stream_cmd(
extract_cmd.append(str(parsed_args.target_uncompressed_size))
elif EXTRACT_JSON_CMD == job_command:
extract_cmd.append(str(parsed_args.archive_id))
extract_cmd.append("--dataset")
extract_cmd.append(str(parsed_args.dataset))
if parsed_args.target_chunk_size:
extract_cmd.append("--target-chunk-size")
extract_cmd.append(str(parsed_args.target_chunk_size))
@@ -267,6 +274,12 @@ def main(argv):
# JSON extraction command parser
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--dataset",
type=str,
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)
json_extraction_parser.add_argument(
"--target-chunk-size",
type=int,
@@ -97,6 +97,11 @@ def main(argv: typing.List[str]) -> int:
default=str(default_config_file_path),
help="CLP configuration file.",
)
args_parser.add_argument(
"--dataset",
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)

# Top-level commands
subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers(
@@ -196,7 +201,7 @@ def main(argv: typing.List[str]) -> int:
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
parsed_args.dataset,
parsed_args.begin_ts,
parsed_args.end_ts,
)
@@ -208,7 +213,7 @@ def main(argv: typing.List[str]) -> int:
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
parsed_args.dataset,
delete_handler,
parsed_args.dry_run,
)
@@ -220,7 +225,7 @@ def main(argv: typing.List[str]) -> int:
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
parsed_args.dataset,
delete_handler,
parsed_args.dry_run,
)
@@ -10,7 +10,11 @@

import brotli
import msgpack
from clp_py_utils.clp_config import CLPConfig, COMPRESSION_JOBS_TABLE_NAME
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
COMPRESSION_JOBS_TABLE_NAME,
)
from clp_py_utils.pretty_size import pretty_size
from clp_py_utils.s3_utils import parse_s3_url
from clp_py_utils.sql_adapter import SQL_Adapter
@@ -140,6 +144,7 @@ def _generate_clp_io_config(
if len(logs_to_compress) == 0:
raise ValueError(f"No input paths given.")
return FsInputConfig(
dataset=parsed_args.dataset,
paths_to_compress=logs_to_compress,
timestamp_key=parsed_args.timestamp_key,
path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
@@ -154,6 +159,7 @@ def _generate_clp_io_config(
region_code, bucket_name, key_prefix = parse_s3_url(s3_url)
aws_authentication = clp_config.logs_input.aws_authentication
return S3InputConfig(
dataset=parsed_args.dataset,
region_code=region_code,
bucket=bucket_name,
key_prefix=key_prefix,
@@ -190,6 +196,11 @@ def main(argv):
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--dataset",
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)
args_parser.add_argument(
"-f",
"--logs-list",
@@ -14,6 +14,7 @@
CLPConfig,
Database,
FILES_TABLE_SUFFIX,
StorageEngine,
)
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
@@ -40,9 +41,13 @@
logger = logging.getLogger(__file__)


def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
def get_orig_file_id(
db_config: Database, storage_engine: StorageEngine, dataset: str, path: str
) -> Optional[str]:
"""
:param db_config:
:param storage_engine:
:param dataset:
:param path: Path of the original file.
:return: The ID of an original file which has the given path, or None if no such file exists.
NOTE: Multiple original files may have the same path in which case this method returns the ID of
@@ -51,6 +56,9 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
sql_adapter = SQL_Adapter(db_config)
clp_db_connection_params = db_config.get_clp_connection_params_and_type(True)
table_prefix = clp_db_connection_params["table_prefix"]
if StorageEngine.CLP_S == storage_engine:
table_prefix = f"{table_prefix}{dataset}_"

with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
@@ -128,7 +136,12 @@ def handle_extract_stream_cmd(
orig_file_id = parsed_args.orig_file_id
else:
orig_file_path = parsed_args.orig_file_path
orig_file_id = get_orig_file_id(clp_config.database, orig_file_path)
orig_file_id = get_orig_file_id(
clp_config.database,
clp_config.package.storage_engine,
CLP_DEFAULT_DATASET_NAME,
orig_file_path,
)
if orig_file_id is None:
logger.error(f"Cannot find orig_file_id corresponding to '{orig_file_path}'.")
return -1
@@ -140,7 +153,7 @@ def handle_extract_stream_cmd(
elif EXTRACT_JSON_CMD == command:
job_type = QueryJobType.EXTRACT_JSON
job_config = ExtractJsonJobConfig(
dataset=CLP_DEFAULT_DATASET_NAME,
dataset=parsed_args.dataset,
archive_id=parsed_args.archive_id,
target_chunk_size=parsed_args.target_chunk_size,
)
@@ -299,6 +312,12 @@ def main(argv):
# JSON extraction command parser
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--dataset",
type=str,
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size (B)."
)
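The diff above namespaces metadata tables by dataset for the clp-s storage engine: `get_orig_file_id` now builds its table prefix as `f"{table_prefix}{dataset}_"` when the engine is `StorageEngine.CLP_S`. A minimal stand-alone sketch of that prefix logic (the `"clp-s"` string value for `StorageEngine.CLP_S` is an assumption; the PR compares enum members directly):

```python
def build_table_prefix(base_prefix: str, storage_engine: str, dataset: str) -> str:
    """Mirror of the prefix logic in get_orig_file_id above, for illustration.

    For the clp-s engine, per-dataset tables are named <prefix><dataset>_<suffix>;
    other engines keep the flat <prefix><suffix> naming.
    """
    if storage_engine == "clp-s":  # assumed string value of StorageEngine.CLP_S
        return f"{base_prefix}{dataset}_"
    return base_prefix
```

For example, with a base prefix of `clp_` and dataset `default`, the clp-s files table would be `clp_default_files` rather than `clp_files`.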
@@ -10,7 +10,11 @@

import msgpack
import pymongo
from clp_py_utils.clp_config import Database, ResultsCache
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
Database,
ResultsCache,
)

Review discussion:

Contributor: For archive manager and decompress, I feel we should validate the dataset argument provided by the user. If they give a dataset that doesn't exist in the database, we should return an error/warning instead of silently returning no results.

Contributor (author): In the query scheduler, if the user provides a dataset name that is not currently registered, the query scheduler logs an error. But I don't think that error shows up on the user interface yet.

Contributor: Yep, I am aware that the query scheduler will log an error, but archive_manager.sh and decompress.sh don't go through the query scheduler, IIRC. I think we should:

  1. For archive manager and decompress, return an error message to the user on the command line if the dataset doesn't exist.
  2. For search.sh, it would be great if we could forward the query scheduler's error to the user, showing them that the dataset doesn't exist.
  3. For the web UI, I assume we will only be able to choose from the available datasets (i.e., the user can't specify an arbitrary dataset)? If so, then I guess it's okay if we don't do anything special.

Contributor: For the web UI, you can only choose from available datasets; the user cannot submit a query if there is no dataset.
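The validation the reviewers ask for could be a small guard run before a job is dispatched, checking the requested dataset against the set of registered datasets. A minimal sketch under stated assumptions: the helper name and the idea of passing in a pre-fetched set of dataset names are illustrative, not the PR's actual API (the PR centralizes this later in a shared "validate dataset" utility):

```python
def validate_dataset(dataset: str, existing_datasets: set[str]) -> None:
    """Raise ValueError if `dataset` is not a registered dataset.

    `existing_datasets` is assumed to have been fetched from the metadata
    database's datasets table by the caller.
    """
    if dataset not in existing_datasets:
        raise ValueError(
            f"Dataset '{dataset}' doesn't exist."
            f" Available datasets: {sorted(existing_datasets)}"
        )
```

A command-line wrapper could call this right after argument parsing and print the exception message, satisfying points 1 and 2 above without involving the query scheduler.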
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig
@@ -32,6 +36,7 @@
def create_and_monitor_job_in_db(
db_config: Database,
results_cache: ResultsCache,
dataset: str,
wildcard_query: str,
tags: str | None,
begin_timestamp: int | None,
@@ -43,6 +48,7 @@ def create_and_monitor_job_in_db(
count_by_time_bucket_size: int | None,
):
search_config = SearchJobConfig(
dataset=dataset,
query_string=wildcard_query,
begin_timestamp=begin_timestamp,
end_timestamp=end_timestamp,
@@ -113,6 +119,7 @@ async def worker_connection_handler(reader: asyncio.StreamReader, writer: asynci
async def do_search_without_aggregation(
db_config: Database,
results_cache: ResultsCache,
dataset: str,
wildcard_query: str,
tags: str | None,
begin_timestamp: int | None,
@@ -147,6 +154,7 @@ async def do_search_without_aggregation(
create_and_monitor_job_in_db,
db_config,
results_cache,
dataset,
wildcard_query,
tags,
begin_timestamp,
@@ -184,6 +192,7 @@ async def do_search_without_aggregation(
async def do_search(
db_config: Database,
results_cache: ResultsCache,
dataset: str,
wildcard_query: str,
tags: str | None,
begin_timestamp: int | None,
@@ -198,6 +207,7 @@ async def do_search(
await do_search_without_aggregation(
db_config,
results_cache,
dataset,
wildcard_query,
tags,
begin_timestamp,
@@ -211,6 +221,7 @@ async def do_search(
create_and_monitor_job_in_db,
db_config,
results_cache,
dataset,
wildcard_query,
tags,
begin_timestamp,
@@ -229,6 +240,11 @@ def main(argv):

args_parser = argparse.ArgumentParser(description="Searches the compressed logs.")
args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.")
args_parser.add_argument(
"--dataset",
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)
args_parser.add_argument("wildcard_query", help="Wildcard query.")
args_parser.add_argument(
"-t", "--tags", help="Comma-separated list of tags of archives to search."
@@ -286,6 +302,7 @@ def main(argv):
do_search(
clp_config.database,
clp_config.results_cache,
parsed_args.dataset,
parsed_args.wildcard_query,
parsed_args.tags,
parsed_args.begin_time,
@@ -7,7 +7,11 @@
import uuid

import yaml
from clp_py_utils.clp_config import StorageEngine, StorageType
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
StorageEngine,
StorageType,
)

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -35,6 +39,11 @@ def main(argv):
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--dataset",
default=CLP_DEFAULT_DATASET_NAME,
help="The name of the log category.",
)
args_parser.add_argument("wildcard_query", help="Wildcard query.")
args_parser.add_argument(
"-t", "--tags", help="Comma-separated list of tags of archives to search."
@@ -104,6 +113,7 @@ def main(argv):
"python3",
"-m", "clp_package_utils.scripts.native.search",
"--config", str(generated_config_path_on_container),
"--dataset", str(parsed_args.dataset),
parsed_args.wildcard_query,
]
# fmt: on
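The wrapper scripts thread the new flag through to the native scripts by appending `--dataset` to the container command, as in the hunk above. A self-contained sketch of that command assembly (the function name is hypothetical; the argv layout mirrors the diff):

```python
def build_native_search_cmd(
    config_path: str, dataset: str, wildcard_query: str
) -> list[str]:
    """Assemble the native search invocation forwarded into the container,
    mirroring the command built in main() above. For illustration only."""
    # fmt: off
    return [
        "python3",
        "-m", "clp_package_utils.scripts.native.search",
        "--config", config_path,
        "--dataset", dataset,
        wildcard_query,
    ]
    # fmt: on
```

Because `--dataset` defaults to `CLP_DEFAULT_DATASET_NAME` in every wrapper's argument parser, existing invocations that omit the flag keep their old behavior.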