diff --git a/.github/workflows/clp-uv-checks.yaml b/.github/workflows/clp-uv-checks.yaml new file mode 100644 index 0000000000..6f60233b00 --- /dev/null +++ b/.github/workflows/clp-uv-checks.yaml @@ -0,0 +1,52 @@ +name: "clp-uv-checks" + +on: + pull_request: + paths: &monitored_paths + - ".github/workflows/clp-uv-checks.yaml" + - "components/clp-mcp-server/pyproject.toml" + - "components/clp-mcp-server/uv.lock" + - "components/clp-package-utils/pyproject.toml" + - "components/clp-package-utils/uv.lock" + - "components/clp-py-utils/pyproject.toml" + - "components/clp-py-utils/uv.lock" + - "components/job-orchestration/pyproject.toml" + - "components/job-orchestration/uv.lock" + - "integration-tests/pyproject.toml" + - "integration-tests/uv.lock" + push: + paths: *monitored_paths + schedule: + # Run daily at 00:15 UTC (the 15 is to avoid periods of high load) + - cron: "15 0 * * *" + workflow_dispatch: + +concurrency: + group: "${{github.workflow}}-${{github.ref}}" + # Cancel in-progress jobs for efficiency + cancel-in-progress: true + +jobs: + uv-checks: + strategy: + matrix: + os: + - "macos-15" + - "ubuntu-22.04" + - "ubuntu-24.04" + runs-on: "${{matrix.os}}" + steps: + - uses: "actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8" # v5.0.0 + with: + submodules: "recursive" + + - name: "Install task" + shell: "bash" + run: "npm install -g @go-task/cli@3.44.0" + + - name: "Install the latest version of uv" + uses: "astral-sh/setup-uv@85856786d1ce8acfbcc2f13a5f3fbd6b938f9f41" # v7.1.2 + + - name: "Validate lock files" + shell: "bash" + run: "task deps:lock:check-uv" diff --git a/components/clp-mcp-server/clp_mcp_server/server/utils.py b/components/clp-mcp-server/clp_mcp_server/server/utils.py index 9360f9c4e1..0cab2d6ee5 100644 --- a/components/clp-mcp-server/clp_mcp_server/server/utils.py +++ b/components/clp-mcp-server/clp_mcp_server/server/utils.py @@ -83,8 +83,9 @@ def format_query_results(query_results: list[dict[str, Any]]) -> list[str]: kv-pairs: - 
"timestamp": An integer representing the epoch timestamp in milliseconds. - "message": A string representing the log message. + - "link": A string representing the link to open the log viewer displaying the message. - The message will be formatted as `timestamp: , message: `: + The message will be formatted as `timestamp: , message: , link: `. :param query_results: A list of dictionaries representing kv-pair log events. :return: A list of strings representing formatted log events. @@ -105,7 +106,9 @@ def format_query_results(query_results: list[dict[str, Any]]) -> list[str]: logger.warning("Empty message attached to a log event: %s.", obj) continue - formatted_log_events.append(f"timestamp: {timestamp_str}, message: {message}") + link = obj["link"] + + formatted_log_events.append(f"timestamp: {timestamp_str}, message: {message}, link: {link}") return formatted_log_events diff --git a/components/clp-mcp-server/tests/server/test_utils.py b/components/clp-mcp-server/tests/server/test_utils.py index b7936a210c..f6a2fb16dd 100644 --- a/components/clp-mcp-server/tests/server/test_utils.py +++ b/components/clp-mcp-server/tests/server/test_utils.py @@ -13,6 +13,8 @@ class TestUtils: """Test suite for utility functions.""" + LINK = "http://localhost:4000/" + # Error Messages: INVALID_DATE_STRING_ERROR = "Invalid date string" INVALID_DATE_STRING_FORMAT_ERROR = "Timestamp must end with 'Z' to indicate UTC." 
@@ -24,20 +26,23 @@ class TestUtils: { "timestamp": None, "message": '{"message":"Log with None timestamp"}\n', + "link": LINK }, { "timestamp": "1729267200000", # str instead of int "message": '{"message":"Log with str timestamp"}\n', + "link": LINK }, { "timestamp": 1729267200000.0, # float instead of int "message": '{"message":"Log with float timestamp"}\n', + "link": LINK }, ] EXPECTED_INVALID_TYPE = [ - 'timestamp: N/A, message: {"message":"Log with None timestamp"}\n', - 'timestamp: N/A, message: {"message":"Log with str timestamp"}\n', - 'timestamp: N/A, message: {"message":"Log with float timestamp"}\n', + f'timestamp: N/A, message: {{"message":"Log with None timestamp"}}\n, link: {LINK}', + f'timestamp: N/A, message: {{"message":"Log with str timestamp"}}\n, link: {LINK}', + f'timestamp: N/A, message: {{"message":"Log with float timestamp"}}\n, link: {LINK}' ] # Test case: invalid timestamp values. @@ -45,33 +50,44 @@ class TestUtils: { "timestamp": 9999999999999999, "message": '{"message":"Log with overflow timestamp"}\n', + "link": LINK }, { "timestamp": -9999999999999999, "message": '{"message":"Log with negative overflow timestamp"}\n', + "link": LINK }, ] EXPECTED_INVALID_VALUE = [ - 'timestamp: N/A, message: {"message":"Log with overflow timestamp"}\n', - 'timestamp: N/A, message: {"message":"Log with negative overflow timestamp"}\n', + ( + f'timestamp: N/A, message: {{"message":"Log with overflow timestamp"}}\n,' + f' link: {LINK}' + ), + ( + f'timestamp: N/A, message: {{"message":"Log with negative overflow timestamp"}}\n,' + f' link: {LINK}' + ) ] # Test case: missing timestamp and message fields. 
MISSING_TIMESTAMP_AND_MESSAGE_ENTRY = [ { "_id": "test001", + "link": LINK }, { "_id": "test002", "message": '{"message":"Log with no timestamp"}\n', + "link": LINK }, { "_id": "test003", "timestamp": 0, + "link": LINK } ] EXPECTED_MISSING_TIMESTAMP_AND_MESSAGE = [ - 'timestamp: N/A, message: {"message":"Log with no timestamp"}\n', + f'timestamp: N/A, message: {{"message":"Log with no timestamp"}}\n, link: {LINK}', ] # Testing basic functionality. @@ -83,6 +99,7 @@ class TestUtils: "orig_file_path": "/var/log/app.log", "archive_id": "abc123", "log_event_ix": 99, + "link": LINK }, { "_id": "test001", @@ -91,6 +108,7 @@ class TestUtils: "orig_file_path": "/var/log/app.log", "archive_id": "abc123", "log_event_ix": 100, + "link": LINK }, { "_id": "test002", @@ -102,6 +120,7 @@ class TestUtils: "orig_file_path": "/var/log/app.log", "archive_id": "abc124", "log_event_ix": 101, + "link": LINK }, { "_id": "test003", @@ -113,6 +132,13 @@ class TestUtils: "orig_file_path": "/var/log/app.log", "archive_id": "abc125", "log_event_ix": 102, + "link": ( + "http://localhost:4000/streamFile" + "?dataset=default" + '&type=json' + "&streamId=abc125" + "&logEventIdx=102" + ), }, ] @@ -120,22 +146,28 @@ class TestUtils: ( 'timestamp: 2024-10-18T16:00:00.123Z, message: ' '{"ts":1729267200123,"pid":1234,"tid":5678,' - '"message":"Log with millisecond precision"}\n' + '"message":"Log with millisecond precision"}\n, ' + "link: http://localhost:4000/streamFile" + "?dataset=default" + '&type=json' + "&streamId=abc125" + "&logEventIdx=102" + ), ( - 'timestamp: 2024-10-18T16:00:00.000Z, message: ' - '{"ts":1729267200000,"pid":1234,"tid":5678,' - '"message":"Log with zero milliseconds"}\n' + f'timestamp: 2024-10-18T16:00:00.000Z, message: ' + f'{{"ts":1729267200000,"pid":1234,"tid":5678,' + f'"message":"Log with zero milliseconds"}}\n, link: {LINK}' ), ( - 'timestamp: 1970-01-01T00:00:00.000Z, message: ' - '{"ts":0,"pid":null,"tid":null,' - '"message":"Log at epoch zero"}\n' + f'timestamp: 
1970-01-01T00:00:00.000Z, message: ' + f'{{"ts":0,"pid":null,"tid":null,' + f'"message":"Log at epoch zero"}}\n, link: {LINK}' ), ( - 'timestamp: N/A, message: ' - '{"pid":null,"tid":null,' - '"message":"Log at epoch none"}\n' + f'timestamp: N/A, message: ' + f'{{"pid":null,"tid":null,' + f'"message":"Log at epoch none"}}\n, link: {LINK}' ), ] diff --git a/components/clp-package-utils/clp_package_utils/controller.py b/components/clp-package-utils/clp_package_utils/controller.py index 740767b26f..6566fab12f 100644 --- a/components/clp-package-utils/clp_package_utils/controller.py +++ b/components/clp-package-utils/clp_package_utils/controller.py @@ -87,22 +87,22 @@ def __init__(self, clp_config: CLPConfig) -> None: self._conf_dir = self._clp_home / "etc" @abstractmethod - def start(self) -> None: + def set_up_env(self) -> None: """ - Starts the components. + Sets up all components to run by preparing environment variables, directories, and + configuration files. """ @abstractmethod - def stop(self) -> None: + def start(self) -> None: """ - Stops the components. + Starts the components. """ @abstractmethod - def _set_up_env(self) -> None: + def stop(self) -> None: """ - Sets up all components to run by preparing environment variables, directories, and - configuration files. + Stops the components. 
""" def _set_up_env_for_database(self) -> EnvVarsDict: @@ -503,8 +503,8 @@ def _set_up_env_for_webui(self, container_clp_config: CLPConfig) -> EnvVarsDict: query_engine = self._clp_config.package.query_engine if QueryEngine.PRESTO == query_engine: - server_settings_json_updates["PrestoHost"] = self._clp_config.presto.host - server_settings_json_updates["PrestoPort"] = self._clp_config.presto.port + server_settings_json_updates["PrestoHost"] = container_clp_config.presto.host + server_settings_json_updates["PrestoPort"] = container_clp_config.presto.port else: server_settings_json_updates["PrestoHost"] = None server_settings_json_updates["PrestoPort"] = None @@ -642,75 +642,7 @@ def __init__(self, clp_config: CLPConfig, instance_id: str) -> None: self._project_name = f"clp-package-{instance_id}" super().__init__(clp_config) - def start(self) -> None: - """ - Starts CLP's components using Docker Compose. - - :raise: Propagates `check_docker_dependencies`'s exceptions. - :raise: Propagates `subprocess.run`'s exceptions. - """ - check_docker_dependencies( - should_compose_project_be_running=False, project_name=self._project_name - ) - self._set_up_env() - - deployment_type = self._clp_config.get_deployment_type() - logger.info(f"Starting CLP using Docker Compose ({deployment_type} deployment)...") - - cmd = ["docker", "compose", "--project-name", self._project_name] - if deployment_type == DeploymentType.BASE: - cmd += ["--file", "docker-compose.base.yaml"] - if self._clp_config.mcp_server is not None: - cmd += ["--profile", "mcp"] - cmd += ["up", "--detach", "--wait"] - subprocess.run( - cmd, - cwd=self._clp_home, - check=True, - ) - logger.info("Started CLP.") - - def stop(self) -> None: - """ - Stops CLP components deployed via Docker Compose. - - :raise: Propagates `subprocess.run`'s exceptions. 
- """ - try: - check_docker_dependencies( - should_compose_project_be_running=True, project_name=self._project_name - ) - except DockerComposeProjectNotRunningError: - logger.info( - "Docker Compose project '%s' is not running. Nothing to stop.", - self._project_name, - ) - return - except DockerDependencyError as e: - logger.warning( - 'Docker dependencies check failed: "%s". Attempting to stop CLP containers ' - "anyway...", - e, - ) - else: - logger.info("Stopping all CLP containers using Docker Compose...") - - subprocess.run( - ["docker", "compose", "--project-name", self._project_name, "down"], - cwd=self._clp_home, - check=True, - ) - logger.info("Stopped CLP.") - - @staticmethod - def _get_num_workers() -> int: - """ - :return: Number of worker processes to run. - """ - # This will change when we move from single to multi-container workers. See y-scope/clp#1424 - return multiprocessing.cpu_count() // 2 - - def _set_up_env(self) -> None: + def set_up_env(self) -> None: # Generate container-specific config. container_clp_config = generate_docker_compose_container_config(self._clp_config) num_workers = self._get_num_workers() @@ -796,6 +728,73 @@ def _set_up_env(self) -> None: continue env_file.write(f"{key}={value}\n") + def start(self) -> None: + """ + Starts CLP's components using Docker Compose. + + :raise: Propagates `check_docker_dependencies`'s exceptions. + :raise: Propagates `subprocess.run`'s exceptions. 
+ """ + check_docker_dependencies( + should_compose_project_be_running=False, project_name=self._project_name + ) + + deployment_type = self._clp_config.get_deployment_type() + logger.info(f"Starting CLP using Docker Compose ({deployment_type} deployment)...") + + cmd = ["docker", "compose", "--project-name", self._project_name] + if deployment_type == DeploymentType.BASE: + cmd += ["--file", "docker-compose.base.yaml"] + if self._clp_config.mcp_server is not None: + cmd += ["--profile", "mcp"] + cmd += ["up", "--detach", "--wait"] + subprocess.run( + cmd, + cwd=self._clp_home, + check=True, + ) + logger.info("Started CLP.") + + def stop(self) -> None: + """ + Stops CLP components deployed via Docker Compose. + + :raise: Propagates `subprocess.run`'s exceptions. + """ + try: + check_docker_dependencies( + should_compose_project_be_running=True, project_name=self._project_name + ) + except DockerComposeProjectNotRunningError: + logger.info( + "Docker Compose project '%s' is not running. Nothing to stop.", + self._project_name, + ) + return + except DockerDependencyError as e: + logger.warning( + 'Docker dependencies check failed: "%s". Attempting to stop CLP containers ' + "anyway...", + e, + ) + else: + logger.info("Stopping all CLP containers using Docker Compose...") + + subprocess.run( + ["docker", "compose", "--project-name", self._project_name, "down"], + cwd=self._clp_home, + check=True, + ) + logger.info("Stopped CLP.") + + @staticmethod + def _get_num_workers() -> int: + """ + :return: Number of worker processes to run. + """ + # This will change when we move from single to multi-container workers. 
See y-scope/clp#1424 + return multiprocessing.cpu_count() // 2 + def get_or_create_instance_id(clp_config: CLPConfig) -> str: """ diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index cfd4372761..a1a35dcd9b 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -49,6 +49,9 @@ DOCKER_MOUNT_TYPE_STRINGS = ["bind"] +S3_KEY_PREFIX_COMPRESSION = "s3-key-prefix" +S3_OBJECT_COMPRESSION = "s3-object" + class DockerDependencyError(OSError): """Base class for errors related to Docker dependencies.""" diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress.py b/components/clp-package-utils/clp_package_utils/scripts/compress.py index b8c1275c59..cd6f591a29 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/compress.py @@ -13,8 +13,8 @@ CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLP_DEFAULT_DATASET_NAME, StorageEngine, + StorageType, ) -from job_orchestration.scheduler.job_config import InputType from clp_package_utils.general import ( CONTAINER_INPUT_LOGS_ROOT_DIR, @@ -30,43 +30,45 @@ validate_dataset_name, ) -logger = logging.getLogger(__file__) +logger = logging.getLogger(__name__) def _generate_logs_list( - input_type: InputType, container_logs_list_path: pathlib.Path, parsed_args: argparse.Namespace, -) -> None: - if InputType.FS == input_type: - host_logs_list_path = parsed_args.path_list - with open(container_logs_list_path, "w") as container_logs_list_file: - if host_logs_list_path is not None: - with open(host_logs_list_path, "r") as host_logs_list_file: - for line in host_logs_list_file: - stripped_path_str = line.rstrip() - if "" == stripped_path_str: - # Skip empty paths - continue - resolved_path = pathlib.Path(stripped_path_str).resolve() - mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / 
resolved_path.relative_to( - resolved_path.anchor - ) - container_logs_list_file.write(f"{mounted_path}\n") - +) -> bool: + """ + Generates logs list file for the native compression script. + + :param container_logs_list_path: Path to write logs list. + :param parsed_args: Parsed command-line arguments. + :return: Whether any paths were written to the logs list. + """ + host_logs_list_path = parsed_args.path_list + with open(container_logs_list_path, "w") as container_logs_list_file: + if host_logs_list_path is None: for path in parsed_args.paths: resolved_path = pathlib.Path(path).resolve() mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to( resolved_path.anchor ) container_logs_list_file.write(f"{mounted_path}\n") - - elif InputType.S3 == input_type: - with open(container_logs_list_path, "w") as container_logs_list_file: - container_logs_list_file.write(f"{parsed_args.paths[0]}\n") - - else: - raise ValueError(f"Unsupported input type: {input_type}.") + return len(parsed_args.paths) != 0 + + no_path_found = True + with open(host_logs_list_path, "r") as host_logs_list_file: + for line in host_logs_list_file: + stripped_path_str = line.rstrip() + if "" == stripped_path_str: + # Skip empty paths + continue + no_path_found = False + resolved_path = pathlib.Path(stripped_path_str).resolve() + mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to( + resolved_path.anchor + ) + container_logs_list_file.write(f"{mounted_path}\n") + return not no_path_found def _generate_compress_cmd( @@ -81,6 +83,7 @@ def _generate_compress_cmd( "python3", "-m", "clp_package_utils.scripts.native.compress", "--config", str(config_path), + "--input-type", "fs", ] # fmt: on if parsed_args.verbose: @@ -116,22 +119,6 @@ def _validate_fs_input_args( args_parser.error("Paths cannot be specified on the command line AND through a file.") -def _validate_s3_input_args( - parsed_args: argparse.Namespace, - args_parser: argparse.ArgumentParser, - 
storage_engine: StorageEngine, -) -> None: - if StorageEngine.CLP_S != storage_engine: - args_parser.error( - f"Input type {InputType.S3} is only supported for the storage engine" - f" {StorageEngine.CLP_S}." - ) - if len(parsed_args.paths) != 1: - args_parser.error(f"Only one URL can be specified for input type {InputType.S3}.") - if parsed_args.path_list is not None: - args_parser.error(f"Path list file is unsupported for input type {InputType.S3}.") - - def main(argv): clp_home = get_clp_home() default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH @@ -167,9 +154,7 @@ def main(argv): args_parser.add_argument( "--no-progress-reporting", action="store_true", help="Disables progress reporting." ) - args_parser.add_argument( - "paths", metavar="PATH", nargs="*", help="Paths or an S3 URL to compress." - ) + args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.") args_parser.add_argument( "-f", "--path-list", dest="path_list", help="A file listing all paths to compress." ) @@ -188,10 +173,20 @@ def main(argv): # Validate and load necessary credentials validate_and_load_db_credentials_file(clp_config, clp_home, False) - except: + except Exception: logger.exception("Failed to load config.") return -1 + # Validate logs_input type is FS + if clp_config.logs_input.type != StorageType.FS: + logger.error( + "Filesystem compression expects `logs_input.type` to be `%s`, but `%s` is found. 
For S3" + " compression, use `compress-from-s3.sh` instead.", + StorageType.FS, + clp_config.logs_input.type, + ) + return -1 + storage_engine: StorageEngine = clp_config.package.storage_engine dataset = parsed_args.dataset if StorageEngine.CLP_S == storage_engine: @@ -212,13 +207,8 @@ def main(argv): logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.") return -1 - input_type = clp_config.logs_input.type - if InputType.FS == input_type: - _validate_fs_input_args(parsed_args, args_parser) - elif InputType.S3 == input_type: - _validate_s3_input_args(parsed_args, args_parser, storage_engine) - else: - raise ValueError(f"Unsupported input type: {input_type}.") + # Validate filesystem input arguments + _validate_fs_input_args(parsed_args, args_parser) container_name = generate_container_name(str(JobType.COMPRESSION)) @@ -227,9 +217,7 @@ def main(argv): container_clp_config, clp_config, get_container_config_filename(container_name) ) - necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir] - if InputType.FS == input_type: - necessary_mounts.append(mounts.input_logs_dir) + necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir, mounts.input_logs_dir] # Write compression logs to a file while True: @@ -242,7 +230,9 @@ def main(argv): if not container_logs_list_path.exists(): break - _generate_logs_list(clp_config.logs_input.type, container_logs_list_path, parsed_args) + if not _generate_logs_list(container_logs_list_path, parsed_args): + logger.error("No filesystem paths given for compression.") + return -1 extra_env_vars = { CLP_DB_USER_ENV_VAR_NAME: clp_config.database.username, @@ -259,13 +249,13 @@ def main(argv): proc = subprocess.run(cmd) ret_code = proc.returncode - if 0 != ret_code: + if ret_code != 0: logger.error("Compression failed.") logger.debug(f"Docker command failed: {shlex.join(cmd)}") + else: + container_logs_list_path.unlink() - # Remove generated files 
generated_config_path_on_host.unlink() - return ret_code diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py b/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py new file mode 100644 index 0000000000..a2435f93fd --- /dev/null +++ b/components/clp-package-utils/clp_package_utils/scripts/compress_from_s3.py @@ -0,0 +1,317 @@ +import argparse +import logging +import pathlib +import shlex +import subprocess +import sys +import uuid + +from clp_py_utils.clp_config import ( + CLP_DB_PASS_ENV_VAR_NAME, + CLP_DB_USER_ENV_VAR_NAME, + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + CLP_DEFAULT_DATASET_NAME, + StorageEngine, + StorageType, +) + +from clp_package_utils.general import ( + dump_container_config, + generate_container_config, + generate_container_name, + generate_container_start_cmd, + get_clp_home, + get_container_config_filename, + JobType, + load_config_file, + S3_KEY_PREFIX_COMPRESSION, + S3_OBJECT_COMPRESSION, + validate_and_load_db_credentials_file, + validate_dataset_name, +) + +logger = logging.getLogger(__name__) + + +def _generate_url_list( + subcommand: str, + container_url_list_path: pathlib.Path, + parsed_args: argparse.Namespace, +) -> bool: + """ + Generates URL list file for the native compression script. + + :param subcommand: S3 compression subcommand. Must be `S3_OBJECT_COMPRESSION` or + `S3_KEY_PREFIX_COMPRESSION`. + :param container_url_list_path: Path to write URL list. + :param parsed_args: Parsed command-line arguments. + :return: Whether any URLs were written to the file. 
+ """ + with open(container_url_list_path, "w") as url_list_file: + url_list_file.write(f"{subcommand}\n") + + if parsed_args.inputs_from is None: + for url in parsed_args.inputs: + url_list_file.write(f"{url}\n") + return len(parsed_args.inputs) != 0 + + no_url_found = True + with open(parsed_args.inputs_from, "r") as input_file: + for line in input_file: + stripped_url = line.strip() + if "" == stripped_url: + continue + no_url_found = False + url_list_file.write(f"{stripped_url}\n") + return not no_url_found + + +def _generate_compress_cmd( + parsed_args: argparse.Namespace, + dataset: str | None, + config_path: pathlib.Path, + url_list_path: pathlib.Path, +) -> list[str]: + """ + Generates command to run the native compression script. + + :param parsed_args: + :param dataset: + :param config_path: Path to the config file (in the container). + :param url_list_path: Path to the URL list file (in the container). + :return: The generated command. + """ + # fmt: off + compress_cmd = [ + "python3", + "-m", "clp_package_utils.scripts.native.compress", + "--config", str(config_path), + "--input-type", "s3", + ] + # fmt: on + if parsed_args.verbose: + compress_cmd.append("--verbose") + if dataset is not None: + compress_cmd.append("--dataset") + compress_cmd.append(dataset) + if parsed_args.timestamp_key is not None: + compress_cmd.append("--timestamp-key") + compress_cmd.append(parsed_args.timestamp_key) + if parsed_args.tags is not None: + compress_cmd.append("--tags") + compress_cmd.append(parsed_args.tags) + if parsed_args.no_progress_reporting is True: + compress_cmd.append("--no-progress-reporting") + + compress_cmd.append("--logs-list") + compress_cmd.append(str(url_list_path)) + + return compress_cmd + + +def _validate_s3_object_args( + parsed_args: argparse.Namespace, + args_parser: argparse.ArgumentParser, +) -> None: + """ + Validates `S3_OBJECT_COMPRESSION` subcommand arguments. 
+ + :param parsed_args: + :param args_parser: + """ + if len(parsed_args.inputs) == 0 and parsed_args.inputs_from is None: + args_parser.error("No URLs specified.") + + # Validate URLs were specified using only one method + if len(parsed_args.inputs) > 0 and parsed_args.inputs_from is not None: + args_parser.error("URLs cannot be specified on the command line AND through a file.") + + +def _validate_s3_key_prefix_args( + parsed_args: argparse.Namespace, + args_parser: argparse.ArgumentParser, +) -> None: + """ + Validates `S3_KEY_PREFIX_COMPRESSION` subcommand arguments. + + :param parsed_args: + :param args_parser: + """ + + if parsed_args.inputs_from is None: + if len(parsed_args.inputs) == 0: + args_parser.error("No URL specified.") + if len(parsed_args.inputs) != 1: + args_parser.error( + f"{S3_KEY_PREFIX_COMPRESSION} accepts exactly one URL, got {len(parsed_args.inputs)}." + ) + + if len(parsed_args.inputs) > 0 and parsed_args.inputs_from is not None: + args_parser.error("URL cannot be specified on the command line AND through a file.") + + +def main(argv): + clp_home = get_clp_home() + default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + + args_parser = argparse.ArgumentParser(description="Compresses logs from S3") + + args_parser.add_argument( + "--config", + "-c", + default=str(default_config_file_path), + help="CLP package configuration file.", + ) + args_parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable debug logging.", + ) + args_parser.add_argument( + "--dataset", + type=str, + default=None, + help="The dataset that the archives belong to.", + ) + args_parser.add_argument( + "--timestamp-key", + help="The path (e.g. x.y) for the field containing the log event's timestamp.", + ) + args_parser.add_argument( + "-t", "--tags", help="A comma-separated list of tags to apply to the compressed archives." 
+ ) + args_parser.add_argument( + "--no-progress-reporting", action="store_true", help="Disables progress reporting." + ) + + subparsers = args_parser.add_subparsers(dest="subcommand", required=True) + + object_compression_option_parser = subparsers.add_parser( + S3_OBJECT_COMPRESSION, help="Compress specific S3 objects identified by their full URLs." + ) + object_compression_option_parser.add_argument( + "inputs", metavar="URL", nargs="*", help="S3 object URLs." + ) + object_compression_option_parser.add_argument( + "--inputs-from", type=str, help="A file containing all S3 object URLs to compress." + ) + + prefix_compression_option_parser = subparsers.add_parser( + S3_KEY_PREFIX_COMPRESSION, help="Compress all S3 objects under the key prefix." + ) + prefix_compression_option_parser.add_argument( + "inputs", metavar="URL", nargs="*", help="S3 prefix URL." + ) + prefix_compression_option_parser.add_argument( + "--inputs-from", type=str, help="A file containing S3 key prefix to compress." + ) + + parsed_args = args_parser.parse_args(argv[1:]) + if parsed_args.verbose: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + try: + config_file_path = pathlib.Path(parsed_args.config) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + clp_config.validate_logs_dir() + + validate_and_load_db_credentials_file(clp_config, clp_home, False) + except Exception: + logger.exception("Failed to load config.") + return -1 + + # Validate logs_input type is S3 + if clp_config.logs_input.type != StorageType.S3: + logger.error( + "S3 compression expects `logs_input.type` to be `%s`, but `%s` is found. 
Please update" + " `clp-config.yml`.", + StorageType.S3, + clp_config.logs_input.type, + ) + return -1 + + storage_engine: StorageEngine = clp_config.package.storage_engine + dataset = parsed_args.dataset + + if StorageEngine.CLP_S != storage_engine: + logger.error( + f"S3 compression requires storage engine {StorageEngine.CLP_S}, but configured engine" + f" is {storage_engine}." + ) + return -1 + + # TODO: The following dataset validation is duplicated in `compress.py`. We should extract it + # into a common utility function. + dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset + try: + clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True) + validate_dataset_name(clp_db_connection_params["table_prefix"], dataset) + except Exception as e: + logger.error(e) + return -1 + + if parsed_args.timestamp_key is None: + logger.warning( + "`--timestamp-key` not specified. Events will not have assigned timestamps and can" + " only be searched from the command line without a timestamp filter." + ) + + if parsed_args.subcommand == S3_OBJECT_COMPRESSION: + _validate_s3_object_args(parsed_args, args_parser) + else: + _validate_s3_key_prefix_args(parsed_args, args_parser) + + # TODO: The following container setup code is duplicated in `compress.py`. We should extract it + # into a common utility function. 
+ container_name = generate_container_name(str(JobType.COMPRESSION)) + + container_clp_config, mounts = generate_container_config(clp_config, clp_home) + generated_config_path_on_container, generated_config_path_on_host = dump_container_config( + container_clp_config, clp_config, get_container_config_filename(container_name) + ) + + necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir] + + while True: + container_url_list_filename = f"{uuid.uuid4()}.txt" + container_url_list_path = clp_config.logs_directory / container_url_list_filename + url_list_path_on_container = ( + container_clp_config.logs_directory / container_url_list_filename + ) + if not container_url_list_path.exists(): + break + + if not _generate_url_list(parsed_args.subcommand, container_url_list_path, parsed_args): + logger.error("No S3 URLs given for compression.") + return -1 + + extra_env_vars = { + CLP_DB_USER_ENV_VAR_NAME: clp_config.database.username, + CLP_DB_PASS_ENV_VAR_NAME: clp_config.database.password, + } + container_start_cmd = generate_container_start_cmd( + container_name, necessary_mounts, clp_config.container_image_ref, extra_env_vars + ) + compress_cmd = _generate_compress_cmd( + parsed_args, dataset, generated_config_path_on_container, url_list_path_on_container + ) + + cmd = container_start_cmd + compress_cmd + + proc = subprocess.run(cmd) + ret_code = proc.returncode + if ret_code != 0: + logger.error("Compression failed.") + logger.debug(f"Docker command failed: {shlex.join(cmd)}") + else: + container_url_list_path.unlink() + + generated_config_path_on_host.unlink() + return ret_code + + +if "__main__" == __name__: + sys.exit(main(sys.argv)) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index ac4df2aa18..08c3fe7bb8 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ 
b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -1,18 +1,21 @@ import argparse import datetime import logging +import os import pathlib import sys import time from contextlib import closing -from typing import List, Union +from typing import Union import brotli import msgpack from clp_py_utils.clp_config import ( + AwsAuthentication, CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, CLPConfig, COMPRESSION_JOBS_TABLE_NAME, + StorageType, ) from clp_py_utils.pretty_size import pretty_size from clp_py_utils.s3_utils import parse_s3_url @@ -24,7 +27,6 @@ from job_orchestration.scheduler.job_config import ( ClpIoConfig, FsInputConfig, - InputType, OutputConfig, S3InputConfig, ) @@ -33,6 +35,8 @@ CONTAINER_INPUT_LOGS_ROOT_DIR, get_clp_home, load_config_file, + S3_KEY_PREFIX_COMPRESSION, + S3_OBJECT_COMPRESSION, ) logger = logging.getLogger(__file__) @@ -140,11 +144,12 @@ def handle_job(sql_adapter: SQL_Adapter, clp_io_config: ClpIoConfig, no_progress def _generate_clp_io_config( clp_config: CLPConfig, - logs_to_compress: List[str], + logs_to_compress: list[str], parsed_args: argparse.Namespace, ) -> Union[S3InputConfig, FsInputConfig]: - input_type = clp_config.logs_input.type - if InputType.FS == input_type: + input_type = parsed_args.input_type + + if input_type == "fs": if len(logs_to_compress) == 0: raise ValueError("No input paths given.") return FsInputConfig( @@ -153,41 +158,139 @@ def _generate_clp_io_config( timestamp_key=parsed_args.timestamp_key, path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR), ) - elif InputType.S3 == input_type: - if len(logs_to_compress) == 0: - raise ValueError("No URLs given.") - elif len(logs_to_compress) != 1: - raise ValueError(f"Too many URLs: {len(logs_to_compress)} > 1") + elif input_type != "s3": + raise ValueError(f"Unsupported input type: `{input_type}`.") + + # Handle S3 inputs + if len(logs_to_compress) < 2: + raise ValueError("No URLs given.") + + aws_authentication = 
_get_aws_authentication_from_config(clp_config) - s3_url = logs_to_compress[0] - region_code, bucket_name, key_prefix = parse_s3_url(s3_url) - aws_authentication = clp_config.logs_input.aws_authentication + s3_compress_subcommand = logs_to_compress[0] + urls = logs_to_compress[1:] + + if s3_compress_subcommand == S3_OBJECT_COMPRESSION: + region_code, bucket, key_prefix, keys = _parse_and_validate_s3_object_urls(urls) + return S3InputConfig( + dataset=parsed_args.dataset, + region_code=region_code, + bucket=bucket, + key_prefix=key_prefix, + keys=keys, + aws_authentication=aws_authentication, + timestamp_key=parsed_args.timestamp_key, + ) + elif s3_compress_subcommand == S3_KEY_PREFIX_COMPRESSION: + if len(urls) != 1: + raise ValueError( + f"`{S3_KEY_PREFIX_COMPRESSION}` requires exactly one URL, got {len(urls)}" + ) + region_code, bucket, key_prefix = parse_s3_url(urls[0]) return S3InputConfig( dataset=parsed_args.dataset, region_code=region_code, - bucket=bucket_name, + bucket=bucket, key_prefix=key_prefix, + keys=None, aws_authentication=aws_authentication, timestamp_key=parsed_args.timestamp_key, ) else: - raise ValueError(f"Unsupported input type: {input_type}") + raise ValueError(f"Unsupported S3 compress subcommand: `{s3_compress_subcommand}`.") + +def _get_logs_to_compress(logs_list_path: pathlib.Path) -> list[str]: + """ + Reads logs or URLs from the input file. -def _get_logs_to_compress(logs_list_path: pathlib.Path) -> List[str]: - # Read logs from the input file + :param logs_list_path: + :return: List of paths/URLs. 
+ """ logs_to_compress = [] with open(logs_list_path, "r") as f: - for path in f: - stripped_path_str = path.strip() - if "" == stripped_path_str: - # Skip empty paths + for line in f: + stripped_line = line.strip() + if "" == stripped_line: + # Skip empty lines continue - logs_to_compress.append(stripped_path_str) + logs_to_compress.append(stripped_line) return logs_to_compress +def _parse_and_validate_s3_object_urls( + urls: list[str], +) -> tuple[str, str, str, list[str]]: + """ + Parses and validates S3 object URLs. + + The validation will ensure: + - All URLs have the same region and bucket. + - No duplicate keys among the URLs. + - The URLs share a non-empty common prefix. + + :param urls: + :return: A tuple containing: + - The region code. + - The bucket. + - The common key prefix. + - The list of keys. + :raises ValueError: If the validation fails. + """ + if len(urls) == 0: + raise ValueError("No URLs provided.") + + region_code: str | None = None + bucket_name: str | None = None + keys = set() + + for url in urls: + parsed_region_code, parsed_bucket_name, key = parse_s3_url(url) + + if region_code is None: + region_code = parsed_region_code + elif region_code != parsed_region_code: + raise ValueError( + "All S3 URLs must be in the same region." + f" Found {region_code} and {parsed_region_code}." + ) + + if bucket_name is None: + bucket_name = parsed_bucket_name + elif bucket_name != parsed_bucket_name: + raise ValueError( + "All S3 URLs must be in the same bucket." + f" Found {bucket_name} and {parsed_bucket_name}." 
+ ) + + if key in keys: + raise ValueError(f"Duplicate S3 key found: {key}.") + keys.add(key) + + key_list: list[str] = list(keys) + key_prefix = os.path.commonprefix(key_list) + + if len(key_prefix) == 0: + raise ValueError("The given S3 URLs have no common prefix.") + + return region_code, bucket_name, key_prefix, key_list + + +def _get_aws_authentication_from_config(clp_config: CLPConfig) -> AwsAuthentication: + """ + Gets AWS authentication configuration. + + :param clp_config: + :return: The AWS authentication configuration extracted from the CLP config. + :raise ValueError: If no authentication provided in `clp_config`. + """ + if StorageType.S3 == clp_config.logs_input.type: + return clp_config.logs_input.aws_authentication + + raise ValueError("No AWS authentication provided in `logs_input`.") + + def main(argv): clp_home = get_clp_home() default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH @@ -212,6 +315,14 @@ def main(argv): default=None, help="The dataset that the archives belong to.", ) + args_parser.add_argument( + "--input-type", + dest="input_type", + type=str, + choices=["fs", "s3"], + default="fs", + help="Input type: 'fs' for filesystem paths, 's3' for S3 URLs.", + ) args_parser.add_argument( "-f", "--logs-list", @@ -249,9 +360,13 @@ def main(argv): comp_jobs_dir = clp_config.logs_directory / "comp-jobs" comp_jobs_dir.mkdir(parents=True, exist_ok=True) - logs_to_compress = _get_logs_to_compress(pathlib.Path(parsed_args.logs_list).resolve()) + try: + logs_to_compress = _get_logs_to_compress(pathlib.Path(parsed_args.logs_list).resolve()) + clp_input_config = _generate_clp_io_config(clp_config, logs_to_compress, parsed_args) + except Exception: + logger.exception(f"Failed to process input.") + return -1 - clp_input_config = _generate_clp_io_config(clp_config, logs_to_compress, parsed_args) clp_output_config = OutputConfig.model_validate(clp_config.archive_output.model_dump()) if parsed_args.tags: tag_list = [tag.strip().lower() 
for tag in parsed_args.tags.split(",") if tag] diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 89ddf2f56b..15a44b7458 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -37,6 +37,11 @@ def main(argv): action="store_true", help="Enable debug logging.", ) + args_parser.add_argument( + "--setup-only", + action="store_true", + help="Validate configuration and prepare directories without starting services.", + ) parsed_args = args_parser.parse_args(argv[1:]) @@ -79,6 +84,12 @@ def main(argv): try: instance_id = get_or_create_instance_id(clp_config) controller = DockerComposeController(clp_config, instance_id) + controller.set_up_env() + if parsed_args.setup_only: + logger.info( + "Completed setup. Services not started because `--setup-only` was specified." + ) + return 0 controller.start() except Exception as ex: if type(ex) == ValueError: diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index 8c07cc98a1..0da114232b 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -32,6 +32,7 @@ RESULTS_CACHE_COMPONENT_NAME = "results_cache" COMPRESSION_SCHEDULER_COMPONENT_NAME = "compression_scheduler" QUERY_SCHEDULER_COMPONENT_NAME = "query_scheduler" +PRESTO_COORDINATOR_COMPONENT_NAME = "presto-coordinator" COMPRESSION_WORKER_COMPONENT_NAME = "compression_worker" QUERY_WORKER_COMPONENT_NAME = "query_worker" WEBUI_COMPONENT_NAME = "webui" @@ -591,9 +592,15 @@ class GarbageCollector(BaseModel): class Presto(BaseModel): + DEFAULT_PORT: ClassVar[int] = 8080 + host: DomainStr port: Port + def transform_for_container(self): + self.host = PRESTO_COORDINATOR_COMPONENT_NAME + self.port = self.DEFAULT_PORT + def _get_env_var(name: str) -> 
str: value = os.getenv(name) @@ -815,6 +822,8 @@ def transform_for_container(self): self.results_cache.transform_for_container() self.query_scheduler.transform_for_container() self.reducer.transform_for_container() + if self.package.query_engine == QueryEngine.PRESTO and self.presto is not None: + self.presto.transform_for_container() class WorkerConfig(BaseModel): diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index bb45ea8f4f..1d79516bdb 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(indexer) +add_subdirectory(log_converter) add_subdirectory(search) set( diff --git a/components/core/src/clp_s/FileWriter.hpp b/components/core/src/clp_s/FileWriter.hpp index edcd3a2c45..6e4c19cdbc 100644 --- a/components/core/src/clp_s/FileWriter.hpp +++ b/components/core/src/clp_s/FileWriter.hpp @@ -5,6 +5,7 @@ #include #include +#include #include "ErrorCode.hpp" #include "TraceableException.hpp" @@ -29,6 +30,24 @@ class FileWriter { // Constructors FileWriter() : m_file(nullptr), m_fd(-1) {} + // Delete copy constructor and assignment operator + FileWriter(FileWriter const&) = delete; + auto operator=(FileWriter const&) -> FileWriter& = delete; + + // Define custom move constructor and assignment operator + FileWriter(FileWriter&& writer) + : m_file{std::exchange(writer.m_file, nullptr)}, + m_fd{std::exchange(writer.m_fd, -1)} {} + + auto operator=(FileWriter&& writer) -> FileWriter& { + if (this == &writer) { + return *this; + } + m_file = std::exchange(writer.m_file, nullptr); + m_fd = std::exchange(writer.m_fd, -1); + return *this; + } + // Destructor ~FileWriter(); diff --git a/components/core/src/clp_s/log_converter/CMakeLists.txt b/components/core/src/clp_s/log_converter/CMakeLists.txt new file mode 100644 index 0000000000..3d78443412 --- /dev/null +++ b/components/core/src/clp_s/log_converter/CMakeLists.txt @@ -0,0 +1,37 @@ +set( + 
CLP_S_LOG_CONVERTER_SOURCES + CommandLineArguments.cpp + CommandLineArguments.hpp + LogConverter.cpp + LogConverter.hpp + LogSerializer.cpp + LogSerializer.hpp +) + +if(CLP_BUILD_EXECUTABLES) + add_executable( + log-converter + log_converter.cpp + ${CLP_S_LOG_CONVERTER_SOURCES} + ) + target_compile_features(log-converter PRIVATE cxx_std_20) + target_link_libraries( + log-converter + PRIVATE + Boost::program_options + clp_s::clp_dependencies + clp_s::io + fmt::fmt + log_surgeon::log_surgeon + msgpack-cxx + nlohmann_json::nlohmann_json + spdlog::spdlog + ystdlib::containers + ystdlib::error_handling + ) + set_target_properties( + log-converter + PROPERTIES + RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}" + ) +endif() diff --git a/components/core/src/clp_s/log_converter/CommandLineArguments.cpp b/components/core/src/clp_s/log_converter/CommandLineArguments.cpp new file mode 100644 index 0000000000..f9e7a57aab --- /dev/null +++ b/components/core/src/clp_s/log_converter/CommandLineArguments.cpp @@ -0,0 +1,204 @@ +#include "CommandLineArguments.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../ErrorCode.hpp" +#include "../FileReader.hpp" +#include "../InputConfig.hpp" + +namespace po = boost::program_options; + +namespace clp_s::log_converter { +namespace { +// Authorization method constants +constexpr std::string_view cNoAuth{"none"}; +constexpr std::string_view cS3Auth{"s3"}; + +/** + * Reads and returns a list of paths from a file containing newline-delimited paths. + * @param input_path_list_file_path Path to the file containing the list of paths. + * @param path_destination The vector that the paths are pushed into. + * @return Whether paths were read successfully or not. 
+ */ +[[nodiscard]] auto read_paths_from_file( + std::string const& input_path_list_file_path, + std::vector& path_destination +) -> bool; + +/** + * Validates and populates network authorization options. + * @param auth_method + * @param auth + * @throws std::invalid_argument if the authorization option is invalid + */ +void validate_network_auth(std::string_view auth_method, NetworkAuthOption& auth); + +auto read_paths_from_file( + std::string const& input_path_list_file_path, + std::vector& path_destination +) -> bool { + FileReader reader; + auto error_code = reader.try_open(input_path_list_file_path); + if (ErrorCodeFileNotFound == error_code) { + SPDLOG_ERROR( + "Failed to open input path list file {} - file not found", + input_path_list_file_path + ); + return false; + } + if (ErrorCodeSuccess != error_code) { + SPDLOG_ERROR("Error opening input path list file {}", input_path_list_file_path); + return false; + } + + std::string line; + while (true) { + error_code = reader.try_read_to_delimiter('\n', false, false, line); + if (ErrorCodeSuccess != error_code) { + break; + } + if (false == line.empty()) { + path_destination.push_back(line); + } + } + + if (ErrorCodeEndOfFile != error_code) { + return false; + } + return true; +} + +void validate_network_auth(std::string_view auth_method, NetworkAuthOption& auth) { + if (cS3Auth == auth_method) { + auth.method = AuthMethod::S3PresignedUrlV4; + } else if (cNoAuth != auth_method) { + throw std::invalid_argument(fmt::format("Invalid authentication type \"{}\"", auth_method)); + } +} +} // namespace + +auto CommandLineArguments::parse_arguments(int argc, char const** argv) + -> CommandLineArguments::ParsingResult { + if (1 == argc) { + print_basic_usage(); + return ParsingResult::Failure; + } + + try { + po::variables_map parsed_command_line_options; + + po::options_description general_options("General options"); + general_options.add_options()("help,h", "Print help"); + + po::options_description 
conversion_positional_options; + std::vector input_paths; + // clang-format off + conversion_positional_options.add_options()( + "input-paths", + po::value>(&input_paths)->value_name("PATHS"), + "input paths" + ); + // clang-format on + + po::options_description conversion_options("Conversion options"); + std::string input_path_list_file_path; + std::string auth{cNoAuth}; + // clang-format off + conversion_options.add_options()( + "inputs-from,f", + po::value(&input_path_list_file_path) + ->value_name("INPUTS_FILE") + ->default_value(input_path_list_file_path), + "Convert inputs specified in INPUTS_FILE." + )( + "output-dir", + po::value(&m_output_dir) + ->value_name("OUTPUT_DIR") + ->default_value(m_output_dir), + "Output directory for converted inputs." + )( + "auth", + po::value(&auth) + ->value_name("AUTH_METHOD") + ->default_value(auth), + "Type of authentication required for network requests (s3 | none). Authentication" + " with s3 requires the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment" + " variables, and optionally the AWS_SESSION_TOKEN environment variable." 
+ ); + // clang-format on + + po::positional_options_description positional_options; + positional_options.add("input-paths", -1); + + po::options_description all_conversion_options; + all_conversion_options.add(general_options); + all_conversion_options.add(conversion_options); + all_conversion_options.add(conversion_positional_options); + + po::store( + po::command_line_parser(argc, argv) + .options(all_conversion_options) + .positional(positional_options) + .run(), + parsed_command_line_options + ); + po::notify(parsed_command_line_options); + + if (parsed_command_line_options.contains("help")) { + if (argc > 2) { + SPDLOG_WARN("Ignoring all options besides --help."); + } + + print_basic_usage(); + po::options_description visible_options; + visible_options.add(general_options); + visible_options.add(conversion_options); + std::cerr << visible_options << '\n'; + return ParsingResult::InfoCommand; + } + + if (false == input_path_list_file_path.empty()) { + if (false == read_paths_from_file(input_path_list_file_path, input_paths)) { + SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path); + return ParsingResult::Failure; + } + } + + for (auto const& path : input_paths) { + if (false == get_input_files_for_raw_path(path, m_input_paths)) { + throw std::invalid_argument(fmt::format("Invalid input path \"{}\".", path)); + } + } + + if (m_input_paths.empty()) { + throw std::invalid_argument("No input paths specified."); + } + + validate_network_auth(auth, m_network_auth); + } catch (std::exception& e) { + SPDLOG_ERROR("{}", e.what()); + print_basic_usage(); + std::cerr << "Try " << get_program_name() << " --help for detailed usage instructions\n"; + return ParsingResult::Failure; + } + + return ParsingResult::Success; +} + +void CommandLineArguments::print_basic_usage() const { + std::cerr << "Usage: " << get_program_name() << " [INPUT_PATHS] [OPTIONS]\n"; +} +} // namespace clp_s::log_converter diff --git 
a/components/core/src/clp_s/log_converter/CommandLineArguments.hpp b/components/core/src/clp_s/log_converter/CommandLineArguments.hpp new file mode 100644 index 0000000000..51ce0daf0e --- /dev/null +++ b/components/core/src/clp_s/log_converter/CommandLineArguments.hpp @@ -0,0 +1,49 @@ +#ifndef CLP_S_COMMANDLINEARGUMENTS_HPP +#define CLP_S_COMMANDLINEARGUMENTS_HPP + +#include +#include +#include +#include + +#include "../InputConfig.hpp" + +namespace clp_s::log_converter { +class CommandLineArguments { +public: + // Types + enum class ParsingResult : uint8_t { + Success = 0, + InfoCommand, + Failure + }; + + // Constructors + explicit CommandLineArguments(std::string_view program_name) : m_program_name{program_name} {} + + // Methods + [[nodiscard]] auto parse_arguments(int argc, char const** argv) -> ParsingResult; + + [[nodiscard]] auto get_program_name() const -> std::string const& { return m_program_name; } + + [[nodiscard]] auto get_input_paths() const -> std::vector const& { return m_input_paths; } + + [[nodiscard]] auto get_network_auth() const -> NetworkAuthOption const& { + return m_network_auth; + } + + [[nodiscard]] auto get_output_dir() const -> std::string const& { return m_output_dir; } + +private: + // Methods + void print_basic_usage() const; + + // Variables + std::string m_program_name; + std::vector m_input_paths; + NetworkAuthOption m_network_auth{}; + std::string m_output_dir{"./"}; +}; +} // namespace clp_s::log_converter + +#endif // CLP_S_COMMANDLINEARGUMENTS_HPP diff --git a/components/core/src/clp_s/log_converter/LogConverter.cpp b/components/core/src/clp_s/log_converter/LogConverter.cpp new file mode 100644 index 0000000000..50ce5cd2c6 --- /dev/null +++ b/components/core/src/clp_s/log_converter/LogConverter.cpp @@ -0,0 +1,147 @@ +#include "LogConverter.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "../../clp/ErrorCode.hpp" +#include "../../clp/ReaderInterface.hpp" 
+#include "../InputConfig.hpp" +#include "LogSerializer.hpp" + +namespace clp_s::log_converter { +namespace { +/** + * Non-exhaustive timestamp schema which covers many common patterns. + * + * Once log-surgeon has better unicode support, we should also allow \u2202 as an alternative + * minus sign for timezone offsets. + */ +constexpr std::string_view cTimestampSchema{ + R"(timestamp:(\d{2,4}[ /\-]{0,1}[ 0-9]{2}[ /\-][ 0-9]{2})|([ 0-9]{2}[ /\-])" + R"(((Jan(uary){0,1})|(Feb(ruary){0,1})|(Mar(ch){0,1})|(Apr(il){0,1})|(May)|(Jun(e){0,1})|)" + R"((Jul(y){0,1})|(Aug(ust){0,1})|(Sep(tember){0,1})|(Oct(ober){0,1})|(Nov(ember){0,1})|)" + R"((Dec(ember){0,1}))[ /\-]\d{2,4})[ T:][ 0-9]{2}:[ 0-9]{2}:[ 0-9]{2})" + R"(([,\.:]\d{1,9}){0,1}([ ]{0,1}(UTC){0,1}[\+\-]\d{2}(:{0,1}\d{2}){0,1}Z{0,1}){0,1})" +}; +constexpr std::string_view cDelimiters{R"(delimiters: \t\r\n\[\(:)"}; +} // namespace + +auto LogConverter::convert_file( + clp_s::Path const& path, + clp::ReaderInterface* reader, + std::string_view output_dir +) -> ystdlib::error_handling::Result { + log_surgeon::Schema schema; + schema.add_delimiters(cDelimiters); + schema.add_variable(cTimestampSchema, -1); + log_surgeon::BufferParser parser{std::move(schema.release_schema_ast_ptr())}; + parser.reset(); + + // Reset internal buffer state. 
+ m_parser_offset = 0ULL; + m_num_bytes_buffered = 0ULL; + + auto serializer{YSTDLIB_ERROR_HANDLING_TRYX(LogSerializer::create(output_dir, path.path))}; + + bool reached_end_of_stream{false}; + while (false == reached_end_of_stream) { + auto const num_bytes_read{YSTDLIB_ERROR_HANDLING_TRYX(refill_buffer(reader))}; + reached_end_of_stream = 0ULL == num_bytes_read; + + while (m_parser_offset < m_num_bytes_buffered) { + auto const err{parser.parse_next_event( + m_buffer.data(), + m_num_bytes_buffered, + m_parser_offset, + reached_end_of_stream + )}; + if (log_surgeon::ErrorCode::BufferOutOfBounds == err) { + break; + } + if (log_surgeon::ErrorCode::Success != err) { + return std::errc::no_message; + } + + auto const& event{parser.get_log_parser().get_log_event_view()}; + auto const message{event.to_string()}; + if (nullptr != event.get_timestamp()) { + auto const timestamp{event.get_timestamp()->to_string_view()}; + auto const message_without_timestamp{ + std::string_view{message}.substr(timestamp.length()) + }; + YSTDLIB_ERROR_HANDLING_TRYV( + serializer.add_message(timestamp, message_without_timestamp) + ); + } else { + YSTDLIB_ERROR_HANDLING_TRYV(serializer.add_message(message)); + } + } + } + serializer.close(); + return ystdlib::error_handling::success(); +} + +auto LogConverter::refill_buffer(clp::ReaderInterface* reader) + -> ystdlib::error_handling::Result { + compact_buffer(); + YSTDLIB_ERROR_HANDLING_TRYV(grow_buffer_if_full()); + + size_t num_bytes_read{}; + // NOLINTBEGIN(cppcoreguidelines-pro-bounds-pointer-arithmetic) + auto const rc{reader->try_read( + m_buffer.data() + m_num_bytes_buffered, + m_buffer.size() - m_num_bytes_buffered, + num_bytes_read + )}; + // NOLINTEND(cppcoreguidelines-pro-bounds-pointer-arithmetic) + m_num_bytes_buffered += num_bytes_read; + if (clp::ErrorCode_EndOfFile == rc) { + return num_bytes_read; + } + if (clp::ErrorCode_Success != rc) { + return std::errc::not_enough_memory; + } + + return num_bytes_read; +} + +void 
LogConverter::compact_buffer() { + if (0 == m_parser_offset) { + return; + } + + // NOLINTBEGIN(cppcoreguidelines-pro-bounds-pointer-arithmetic) + std::memmove( + m_buffer.data(), + m_buffer.data() + m_parser_offset, + m_num_bytes_buffered - m_parser_offset + ); + // NOLINTEND(cppcoreguidelines-pro-bounds-pointer-arithmetic) + m_num_bytes_buffered -= m_parser_offset; + m_parser_offset = 0; +} + +auto LogConverter::grow_buffer_if_full() -> ystdlib::error_handling::Result { + if (m_buffer.size() != m_num_bytes_buffered) { + return ystdlib::error_handling::success(); + } + + size_t const new_size{2 * m_buffer.size()}; + if (new_size > cMaxBufferSize) { + return std::errc::result_out_of_range; + } + ystdlib::containers::Array new_buffer(new_size); + std::memcpy(new_buffer.data(), m_buffer.data(), m_num_bytes_buffered); + m_buffer = std::move(new_buffer); + return ystdlib::error_handling::success(); +} +} // namespace clp_s::log_converter diff --git a/components/core/src/clp_s/log_converter/LogConverter.hpp b/components/core/src/clp_s/log_converter/LogConverter.hpp new file mode 100644 index 0000000000..d149fa907f --- /dev/null +++ b/components/core/src/clp_s/log_converter/LogConverter.hpp @@ -0,0 +1,73 @@ +#ifndef CLP_S_LOG_CONVERTER_LOGCONVERTER_HPP +#define CLP_S_LOG_CONVERTER_LOGCONVERTER_HPP + +#include +#include + +#include +#include + +#include "../../clp/ReaderInterface.hpp" +#include "../InputConfig.hpp" + +namespace clp_s::log_converter { +/** + * Utility class that converts unstructured text logs into KV-IR streams. + */ +class LogConverter { +public: + // Constructors + LogConverter() : m_buffer(cDefaultBufferSize) {} + + // Methods + /** + * Converts a file into KV-IR and outputs the generated file to a given directory. + * @param path The input path for the unstructured text file. + * @param reader A reader positioned at the start of the input stream. + * @param output_dir The output directory for generated KV-IR files. 
+ * @return A void result on success, or an error code indicating the failure: + * - std::errc::no_message if `log_surgeon::BufferParser::parse_next_event` returns an error. + * - Forwards `LogSerializer::create()`'s return values. + * - Forwards `refill_buffer()`'s return values. + * - Forwards `LogSerializer::add_message()`'s return values. + */ + [[nodiscard]] auto + convert_file(clp_s::Path const& path, clp::ReaderInterface* reader, std::string_view output_dir) + -> ystdlib::error_handling::Result; + +private: + // Constants + static constexpr size_t cDefaultBufferSize{64ULL * 1024ULL}; // 64 KiB + static constexpr size_t cMaxBufferSize{64ULL * 1024ULL * 1024ULL}; // 64 MiB + + // Methods + /** + * Refills the internal buffer by consuming bytes from a reader, growing the buffer if it is + * already full. + * @param reader + * @return A result containing the number of new bytes consumed from `reader`, or an error code + * indicating the failure: + * - std::errc::not_enough_memory if `clp::ReaderInterface::try_read()` returns an error. + * - Forwards `grow_buffer_if_full()`'s return values. + */ + [[nodiscard]] auto refill_buffer(clp::ReaderInterface* reader) + -> ystdlib::error_handling::Result; + + /** + * Compacts unconsumed content to the start of the buffer. + */ + void compact_buffer(); + + /** + * Grows the buffer if it is full. + * @return A void result on success, or an error code indicating the failure: + * - std::errc::result_out_of_range if the grown buffer size exceeds the maximum allowed size. 
+ */ + [[nodiscard]] auto grow_buffer_if_full() -> ystdlib::error_handling::Result; + + ystdlib::containers::Array m_buffer; + size_t m_num_bytes_buffered{}; + size_t m_parser_offset{}; +}; +} // namespace clp_s::log_converter +#endif // CLP_S_LOG_CONVERTER_LOGCONVERTER_HPP diff --git a/components/core/src/clp_s/log_converter/LogSerializer.cpp b/components/core/src/clp_s/log_converter/LogSerializer.cpp new file mode 100644 index 0000000000..ea44c0f170 --- /dev/null +++ b/components/core/src/clp_s/log_converter/LogSerializer.cpp @@ -0,0 +1,87 @@ +#include "LogSerializer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "../../clp/ffi/ir_stream/Serializer.hpp" +#include "../../clp/ir/types.hpp" +#include "../FileWriter.hpp" + +namespace clp_s::log_converter { +namespace { +constexpr msgpack::object_map cEmptyMap{.size = 0U, .ptr = nullptr}; +} // namespace + +auto LogSerializer::create(std::string_view output_dir, std::string_view original_file_path) + -> ystdlib::error_handling::Result { + nlohmann::json metadata; + metadata.emplace(cOriginalFileMetadataKey, original_file_path); + auto serializer{YSTDLIB_ERROR_HANDLING_TRYX( + clp::ffi::ir_stream::Serializer::create( + metadata + ) + )}; + + boost::uuids::random_generator uuid_generator; + std::string const file_name{boost::uuids::to_string(uuid_generator()) + ".clp"}; + auto const converted_path{std::filesystem::path{output_dir} / file_name}; + clp_s::FileWriter writer; + try { + writer.open(converted_path, clp_s::FileWriter::OpenMode::CreateForWriting); + } catch (std::exception const&) { + return std::errc::no_such_file_or_directory; + } + + return LogSerializer{std::move(serializer), std::move(writer)}; +} + +auto LogSerializer::add_message(std::string_view timestamp, std::string_view message) + -> ystdlib::error_handling::Result { + std::array fields{ + msgpack::object_kv{ + .key = 
msgpack::object{cTimestampKey}, + .val = msgpack::object{timestamp} + }, + msgpack::object_kv{.key = msgpack::object{cMessageKey}, .val = msgpack::object{message}} + }; + msgpack::object_map const record{ + .size = static_cast(fields.size()), + .ptr = fields.data() + }; + if (false == m_serializer.serialize_msgpack_map(cEmptyMap, record)) { + return std::errc::invalid_argument; + } + if (m_serializer.get_ir_buf_view().size() > cMaxIrBufSize) { + flush_buffer(); + } + return ystdlib::error_handling::success(); +} + +auto LogSerializer::add_message(std::string_view message) -> ystdlib::error_handling::Result { + msgpack::object_kv message_field{ + .key = msgpack::object{cMessageKey}, + .val = msgpack::object{message} + }; + msgpack::object_map const record{.size = 1U, .ptr = &message_field}; + if (false == m_serializer.serialize_msgpack_map(cEmptyMap, record)) { + return std::errc::invalid_argument; + } + if (m_serializer.get_ir_buf_view().size() > cMaxIrBufSize) { + flush_buffer(); + } + return ystdlib::error_handling::success(); +} +} // namespace clp_s::log_converter diff --git a/components/core/src/clp_s/log_converter/LogSerializer.hpp b/components/core/src/clp_s/log_converter/LogSerializer.hpp new file mode 100644 index 0000000000..dbae501f6b --- /dev/null +++ b/components/core/src/clp_s/log_converter/LogSerializer.hpp @@ -0,0 +1,116 @@ +#ifndef CLP_S_LOG_CONVERTER_LOGSERIALIZER_HPP +#define CLP_S_LOG_CONVERTER_LOGSERIALIZER_HPP + +#include +#include +#include + +#include + +#include "../../clp/ffi/ir_stream/protocol_constants.hpp" +#include "../../clp/ffi/ir_stream/Serializer.hpp" +#include "../../clp/ir/types.hpp" +#include "../../clp/type_utils.hpp" +#include "../FileWriter.hpp" + +namespace clp_s::log_converter { +/** + * Utility class that generates KV-IR corresponding to a converted input file. + */ +class LogSerializer { +public: + // Factory function + /** + * Creates an instance of `LogSerializer`. 
+ * @param output_dir The destination directory for generated KV-IR. + * @param original_file_path The original path for the file being converted to KV-IR. + * @return A result containing a `LogSerializer` on success, or an error code indicating the + * failure: + * - std::errc::no_such_file_or_directory if a `clp_s::FileWriter` fails to open an output file. + * - Forwards `clp::ffi::ir_stream::Serializer<>::create()`'s return values. + */ + [[nodiscard]] static auto + create(std::string_view output_dir, std::string_view original_file_path) + -> ystdlib::error_handling::Result; + + // Constructors + // Delete copy constructor and assignment operator + LogSerializer(LogSerializer const&) = delete; + [[nodiscard]] auto operator=(LogSerializer const&) -> LogSerializer& = delete; + + // Default move constructor and assignment operator + LogSerializer(LogSerializer&&) noexcept = default; + [[nodiscard]] auto operator=(LogSerializer&&) -> LogSerializer& = default; + + // Destructor + ~LogSerializer() = default; + + // Methods + /** + * Adds a message with a timestamp to the serialized output. + * + * The timestamp is serialized as a string so that the original timestamp format can be + * preserved during clp-s ingestion. + * + * @param timestamp + * @param message + * @return A void result on success, or an error code indicating the failure: + * - std::errc::invalid_argument if `clp::ffi::ir_stream::Serializer<>::serialize_msgpack_map` + * returns on failure. + */ + [[nodiscard]] auto add_message(std::string_view timestamp, std::string_view message) + -> ystdlib::error_handling::Result; + + /** + * Adds a message without a timestamp to the serialized output. + * @param message + * @return A void result on success, or an error code indicating the failure: + * - std::errc::invalid_argument if `clp::ffi::ir_stream::Serializer<>::serialize_msgpack_map` + * returns on failure. 
+ */ + [[nodiscard]] auto add_message(std::string_view message) + -> ystdlib::error_handling::Result; + + /** + * Closes and flushes the serialized output. + */ + void close() { + flush_buffer(); + m_writer.write_numeric_value(clp::ffi::ir_stream::cProtocol::Eof); + m_writer.close(); + } + +private: + // Constants + static constexpr std::string_view cOriginalFileMetadataKey{"original_file"}; + static constexpr std::string_view cTimestampKey{"timestamp"}; + static constexpr std::string_view cMessageKey{"message"}; + static constexpr size_t cMaxIrBufSize{64ULL * 1024ULL}; // 64 KiB + + // Constructors + explicit LogSerializer( + clp::ffi::ir_stream::Serializer&& serializer, + clp_s::FileWriter&& writer + ) + : m_serializer{std::move(serializer)}, + m_writer{std::move(writer)} {} + + // Methods + /** + * Flushes the buffer from the serializer to the output file. + */ + void flush_buffer() { + auto const buffer{m_serializer.get_ir_buf_view()}; + m_writer.write( + clp::size_checked_pointer_cast(buffer.data()), + buffer.size_bytes() + ); + m_serializer.clear_ir_buf(); + } + + clp::ffi::ir_stream::Serializer m_serializer; + clp_s::FileWriter m_writer; +}; +} // namespace clp_s::log_converter + +#endif // CLP_S_LOG_CONVERTER_LOGSERIALIZER_HPP diff --git a/components/core/src/clp_s/log_converter/log_converter.cpp b/components/core/src/clp_s/log_converter/log_converter.cpp new file mode 100644 index 0000000000..108383b287 --- /dev/null +++ b/components/core/src/clp_s/log_converter/log_converter.cpp @@ -0,0 +1,143 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "../../clp/NetworkReader.hpp" +#include "../../clp/ReaderInterface.hpp" +#include "../InputConfig.hpp" +#include "CommandLineArguments.hpp" +#include "LogConverter.hpp" + +using clp_s::log_converter::CommandLineArguments; +using clp_s::log_converter::LogConverter; + +namespace { +/** + * Checks for and logs CURL errors on a reader. 
+ * @param path The path that the reader has opened. + * @param reader The open reader which may have experienced a CURL error. + * @return Whether a CURL error has occurred on the reader. + */ +[[nodiscard]] auto +check_and_log_curl_error(clp_s::Path const& path, clp::ReaderInterface const* reader) -> bool; + +/** + * Converts all files according to the command line arguments. + * @param command_line_arguments + * @return Whether conversion was successful. + */ +[[nodiscard]] auto convert_files(CommandLineArguments const& command_line_arguments) -> bool; + +auto check_and_log_curl_error(clp_s::Path const& path, clp::ReaderInterface const* reader) -> bool { + auto const* network_reader{dynamic_cast(reader)}; + if (nullptr == network_reader) { + return false; + } + if (auto const rc = network_reader->get_curl_ret_code(); + rc.has_value() && CURLcode::CURLE_OK != rc.value()) + { + auto const curl_error_message = network_reader->get_curl_error_msg(); + SPDLOG_ERROR( + "Encountered curl error while converting {} - Code: {} - Message: {}", + path.path, + static_cast(rc.value()), + curl_error_message.value_or("Unknown error.") + ); + return true; + } + return false; +} + +auto convert_files(CommandLineArguments const& command_line_arguments) -> bool { + LogConverter log_converter; + + std::error_code ec{}; + if (false == std::filesystem::create_directory(command_line_arguments.get_output_dir(), ec) + && ec) + { + SPDLOG_ERROR( + "Can not create output directory {} - {}", + command_line_arguments.get_output_dir(), + ec.message() + ); + return false; + } + + for (auto const& path : command_line_arguments.get_input_paths()) { + auto reader{clp_s::try_create_reader(path, command_line_arguments.get_network_auth())}; + if (nullptr == reader) { + SPDLOG_ERROR("Failed to open input {} for reading.", path.path); + return false; + } + + auto [nested_readers, file_type] = clp_s::try_deduce_reader_type(reader); + switch (file_type) { + case clp_s::FileType::LogText: + break; + case 
clp_s::FileType::Json: + case clp_s::FileType::KeyValueIr: + case clp_s::FileType::Zstd: + case clp_s::FileType::Unknown: + default: { + std::ignore = check_and_log_curl_error(path, reader.get()); + SPDLOG_ERROR("Received input that was not unstructured logtext: {}.", path.path); + return false; + } + } + + auto const convert_result{log_converter.convert_file( + path, + nested_readers.back().get(), + command_line_arguments.get_output_dir() + )}; + if (convert_result.has_error()) { + auto const& error{convert_result.error()}; + SPDLOG_ERROR( + "Failed to convert input {} to structured representation: {} - {}", + path.path, + error.category().name(), + error.message() + ); + return false; + } + } + + return true; +} +} // namespace + +auto main(int argc, char const** argv) -> int { + try { + auto stderr_logger = spdlog::stderr_logger_st("stderr"); + spdlog::set_default_logger(stderr_logger); + spdlog::set_pattern("%Y-%m-%dT%H:%M:%S.%e%z [%l] %v"); + } catch (std::exception& e) { + // NOTE: We can't log an exception if the logger couldn't be constructed + return 1; + } + + CommandLineArguments command_line_arguments{"log-converter"}; + + auto const parsing_result{command_line_arguments.parse_arguments(argc, argv)}; + switch (parsing_result) { + case CommandLineArguments::ParsingResult::Success: + break; + case CommandLineArguments::ParsingResult::InfoCommand: + return 0; + case CommandLineArguments::ParsingResult::Failure: + default: + return 1; + } + + if (false == convert_files(command_line_arguments)) { + return 1; + } + return 0; +} diff --git a/components/core/tests/test-ParserWithUserSchema.cpp b/components/core/tests/test-ParserWithUserSchema.cpp index 8232fd16f7..3ab9bfad72 100644 --- a/components/core/tests/test-ParserWithUserSchema.cpp +++ b/components/core/tests/test-ParserWithUserSchema.cpp @@ -121,7 +121,7 @@ TEST_CASE("Test error for colon missing schema file", "[LALR1Parser][SchemaParse auto const file_path = get_test_schema_files_dir() / 
"colon_missing_schema.txt"; REQUIRE_THROWS_WITH( generate_schema_ast(file_path.string()), - "Schema:3:4: error: expected '>',':','AlphaNumeric' before ' ' token\n" + "Schema:3:4: error: expected '>',':','IdentifierCharacters' before ' ' token\n" " int [0-9]+\n" " ^\n" ); diff --git a/components/package-template/src/sbin/compress-from-s3.sh b/components/package-template/src/sbin/compress-from-s3.sh new file mode 100755 index 0000000000..7d3bc8e455 --- /dev/null +++ b/components/package-template/src/sbin/compress-from-s3.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +package_root="$script_dir/.." + +PYTHONPATH=$(readlink -f "$package_root/lib/python3/site-packages") \ + python3 \ + -m clp_package_utils.scripts.compress_from_s3 \ + "$@" diff --git a/docs/src/dev-docs/components-core/index.md b/docs/src/dev-docs/components-core/index.md index be2b00c85d..d7f28708b9 100644 --- a/docs/src/dev-docs/components-core/index.md +++ b/docs/src/dev-docs/components-core/index.md @@ -45,7 +45,7 @@ The task will download, build, and install (within the build directory) the foll | [date](https://github.com/HowardHinnant/date) | v3.0.1 | | [fmt](https://github.com/fmtlib/fmt) | v11.2.0 | | [liblzma](https://github.com/tukaani-project/xz) | v5.8.1 | -| [log-surgeon](https://github.com/y-scope/log-surgeon) | a722d07 | +| [log-surgeon](https://github.com/y-scope/log-surgeon) | 840f262 | | [lz4](https://github.com/lz4/lz4) | v1.10.0 | | [microsoft.gsl](https://github.com/microsoft/GSL) | v4.0.0 | | [mongo-cxx-driver](https://github.com/mongodb/mongo-cxx-driver) | r4.1.1 | diff --git a/docs/src/dev-docs/tooling-gh-workflows.md b/docs/src/dev-docs/tooling-gh-workflows.md index 4d52694b3e..25491c9073 100644 --- a/docs/src/dev-docs/tooling-gh-workflows.md +++ b/docs/src/dev-docs/tooling-gh-workflows.md @@ -91,3 +91,7 @@ This workflow builds CLP-core on macOS and runs its unit tests. 
## clp-lint This workflow runs linting checks on the codebase. + +## clp-uv-checks + +This workflow checks whether each UV Python project's lockfile matches the project metadata. diff --git a/docs/src/user-docs/guides-using-presto.md b/docs/src/user-docs/guides-using-presto.md index 87df924dca..1e73e40e3d 100644 --- a/docs/src/user-docs/guides-using-presto.md +++ b/docs/src/user-docs/guides-using-presto.md @@ -43,23 +43,6 @@ Using Presto with CLP requires: query_engine: "presto" ``` - * Set the `database.host` key to a non-localhost hostname/IP. - - ```yaml - database: - # type: "mariadb" - host: "" - # port: 3306 - # name: "clp-db" - ``` - - :::{note} - This change is necessary because the Presto containers run on a Docker network, and CLP's - database runs on the host network. `localhost` will refer to a different entity in each of - those contexts. This limitation will be addressed in the future when we unify Presto and CLP's - deployment infrastructure. - ::: - * Set the `results_cache.retention_period` key to `null` since the CLP + Presto integration doesn't yet support garbage collection. diff --git a/docs/src/user-docs/quick-start/clp-json.md b/docs/src/user-docs/quick-start/clp-json.md index 7f080a3d87..a2f0f67567 100644 --- a/docs/src/user-docs/quick-start/clp-json.md +++ b/docs/src/user-docs/quick-start/clp-json.md @@ -17,6 +17,11 @@ To start CLP, run: sbin/start-clp.sh ``` +:::{tip} +To validate configuration and prepare directories without launching services, add the +`--setup-only` flag (e.g., `sbin/start-clp.sh --setup-only`). +::: + :::{note} If CLP fails to start (e.g., due to a port conflict), try adjusting the settings in `etc/clp-config.yml` and then run the start command again. 
diff --git a/docs/src/user-docs/quick-start/clp-text.md b/docs/src/user-docs/quick-start/clp-text.md index 89af511003..bc5d51719e 100644 --- a/docs/src/user-docs/quick-start/clp-text.md +++ b/docs/src/user-docs/quick-start/clp-text.md @@ -19,6 +19,11 @@ To start CLP, run: sbin/start-clp.sh ``` +:::{tip} +To validate configuration and prepare directories without launching services, add the +`--setup-only` flag (e.g., `sbin/start-clp.sh --setup-only`). +::: + :::{note} If CLP fails to start (e.g., due to a port conflict), try adjusting the settings in `etc/clp-config.yml` and then run the start command again. diff --git a/integration-tests/tests/utils/config.py b/integration-tests/tests/utils/config.py index f663e4ca33..9ef6c101e5 100644 --- a/integration-tests/tests/utils/config.py +++ b/integration-tests/tests/utils/config.py @@ -27,7 +27,15 @@ def __post_init__(self) -> None: validate_dir_exists(clp_core_bins_dir) # Check for required CLP core binaries - required_binaries = ["clg", "clo", "clp", "clp-s", "indexer", "reducer-server"] + required_binaries = [ + "clg", + "clo", + "clp", + "clp-s", + "indexer", + "log-converter", + "reducer-server", + ] missing_binaries = [b for b in required_binaries if not (clp_core_bins_dir / b).is_file()] if len(missing_binaries) > 0: err_msg = ( diff --git a/taskfile.yaml b/taskfile.yaml index ef0ad2aa9b..e1a43e77a5 100644 --- a/taskfile.yaml +++ b/taskfile.yaml @@ -114,6 +114,7 @@ tasks: - "{{.G_CORE_COMPONENT_BUILD_DIR}}/clp" - "{{.G_CORE_COMPONENT_BUILD_DIR}}/clp-s" - "{{.G_CORE_COMPONENT_BUILD_DIR}}/indexer" + - "{{.G_CORE_COMPONENT_BUILD_DIR}}/log-converter" - "{{.G_CORE_COMPONENT_BUILD_DIR}}/reducer-server" - "{{.G_SPIDER_BUILD_DIR}}/spider-build/src/spider/spider_scheduler" - "{{.G_SPIDER_BUILD_DIR}}/spider-build/src/spider/spider_worker" @@ -159,6 +160,7 @@ tasks: "{{.G_CORE_COMPONENT_BUILD_DIR}}/clp" "{{.G_CORE_COMPONENT_BUILD_DIR}}/clp-s" "{{.G_CORE_COMPONENT_BUILD_DIR}}/indexer" + 
"{{.G_CORE_COMPONENT_BUILD_DIR}}/log-converter" "{{.G_CORE_COMPONENT_BUILD_DIR}}/reducer-server" "{{.G_SPIDER_BUILD_DIR}}/spider-build/src/spider/spider_scheduler" "{{.G_SPIDER_BUILD_DIR}}/spider-build/src/spider/spider_worker" @@ -218,13 +220,14 @@ tasks: - "{{.G_CORE_COMPONENT_BUILD_DIR}}/clp" - "{{.G_CORE_COMPONENT_BUILD_DIR}}/clp-s" - "{{.G_CORE_COMPONENT_BUILD_DIR}}/indexer" + - "{{.G_CORE_COMPONENT_BUILD_DIR}}/log-converter" - "{{.G_CORE_COMPONENT_BUILD_DIR}}/reducer-server" cmds: - task: "utils:cmake:build" vars: BUILD_DIR: "{{.G_CORE_COMPONENT_BUILD_DIR}}" JOBS: "{{.G_CPP_MAX_PARALLELISM_PER_BUILD_TASK}}" - TARGETS: ["clg", "clo", "clp", "clp-s", "indexer", "reducer-server"] + TARGETS: ["clg", "clo", "clp", "clp-s", "indexer", "log-converter", "reducer-server"] clp-mcp-server: - task: "uv-component" diff --git a/taskfiles/deps/lock.yaml b/taskfiles/deps/lock.yaml index bf656e9c11..651bfd87e6 100644 --- a/taskfiles/deps/lock.yaml +++ b/taskfiles/deps/lock.yaml @@ -10,12 +10,33 @@ tasks: vars: CARGO_FETCH_FLAGS: "--locked" + check-uv: + cmds: + - for: &uv-projects + - "{{.G_COMPONENTS_DIR}}/clp-mcp-server" + - "{{.G_COMPONENTS_DIR}}/clp-package-utils" + - "{{.G_COMPONENTS_DIR}}/clp-py-utils" + - "{{.G_COMPONENTS_DIR}}/job-orchestration" + - "{{.G_INTEGRATION_TESTS_DIR}}" + task: "uv-lock" + vars: + PROJECT_DIR: "{{.ITEM}}" + UV_LOCK_FLAGS: "--check" + update-rust: cmds: - task: "cargo-workspace-update" vars: CARGO_UPDATE_FLAGS: "" + update-uv: + cmds: + - for: *uv-projects + task: "uv-lock" + vars: + PROJECT_DIR: "{{.ITEM}}" + UV_LOCK_FLAGS: "" + # Runs `cargo fetch` in the root Rust workspace directory with the specified flags. # # @param {string} CARGO_FETCH_FLAGS The flags to pass to the `cargo fetch` command. @@ -41,3 +62,15 @@ tasks: cmd: |- . "$HOME/.cargo/env" cargo update {{.CARGO_UPDATE_FLAGS}} + + # Runs `uv lock` in the UV project directory with the specified flags. 
+ # + # @param {string} PROJECT_DIR + # @param {string} UV_LOCK_FLAGS The flags to pass to the `uv lock` command. + uv-lock: + internal: true + requires: + vars: ["PROJECT_DIR", "UV_LOCK_FLAGS"] + dir: "{{.PROJECT_DIR}}" + cmd: |- + uv lock {{.UV_LOCK_FLAGS}} diff --git a/taskfiles/deps/main.yaml b/taskfiles/deps/main.yaml index bbfbc71007..e1b5d762c9 100644 --- a/taskfiles/deps/main.yaml +++ b/taskfiles/deps/main.yaml @@ -371,8 +371,8 @@ tasks: - "-DCMAKE_INSTALL_MESSAGE=LAZY" - "-Dlog_surgeon_BUILD_TESTING=OFF" LIB_NAME: "log_surgeon" - TARBALL_SHA256: "69a99e0804a52c6b6397c5e7eabecc9bb4915d0145632c66fc63ad13678ff56a" - TARBALL_URL: "https://github.com/y-scope/log-surgeon/archive/a722d07.tar.gz" + TARBALL_SHA256: "4551ea50cd22e8423770fd66a167e1c86053b1f4957f72c582a2da93e7820210" + TARBALL_URL: "https://github.com/y-scope/log-surgeon/archive/840f262.tar.gz" lz4: internal: true @@ -406,6 +406,7 @@ tasks: - task: "utils:install-remote-cmake-lib" vars: CMAKE_GEN_ARGS: + - "-DCMAKE_BUILD_TYPE=Release" - "-DUSE_SYSTEM_INSTALLED_LIB=ON" - "-DINSTALL_LAYOUT=DEB" LIB_NAME: "mariadb-connector-cpp" diff --git a/tools/deployment/presto-clp/.gitignore b/tools/deployment/presto-clp/.gitignore index 50e6da86f2..64f531d7ca 100644 --- a/tools/deployment/presto-clp/.gitignore +++ b/tools/deployment/presto-clp/.gitignore @@ -1 +1,3 @@ +# Generated config files +/.env /worker/config-template/clp.properties diff --git a/tools/deployment/presto-clp/docker-compose.yaml b/tools/deployment/presto-clp/docker-compose.yaml index dd21696c55..9948f8d0aa 100644 --- a/tools/deployment/presto-clp/docker-compose.yaml +++ b/tools/deployment/presto-clp/docker-compose.yaml @@ -11,7 +11,7 @@ services: - "./coordinator/scripts:/scripts:ro" - "coordinator-config:/opt/presto-server/etc" networks: - - "presto" + - "clp-package" ports: - "8889:8080" healthcheck: @@ -34,17 +34,22 @@ services: - "coordinator-common.env" - "worker.env" volumes: + - "${CLP_ARCHIVES_DIR:-empty}:/var/data/archives:ro" + - 
"${CLP_STAGED_ARCHIVES_DIR:-empty}:/var/data/staged-archives:ro" - "./worker/config-template:/configs:ro" - "./worker/scripts:/scripts:ro" - - "${CLP_ARCHIVES_DIR}:${CLP_ARCHIVES_DIR}" - "worker-config:/opt/presto-server/etc" networks: - - "presto" + - "clp-package" volumes: + # Dummy volume to use when a bind mount is not desired. + empty: + coordinator-config: worker-config: networks: - presto: - driver: "bridge" + clp-package: + name: "${CLP_PACKAGE_NETWORK_NAME:-clp-package_default}" + external: true diff --git a/tools/deployment/presto-clp/scripts/init.py b/tools/deployment/presto-clp/scripts/init.py index c27d5f140b..53238ee095 100644 --- a/tools/deployment/presto-clp/scripts/init.py +++ b/tools/deployment/presto-clp/scripts/init.py @@ -7,6 +7,11 @@ import yaml from dotenv import dotenv_values +# Database endpoint inside the CLP Package Docker network. Must match the constants defined in +# `components/clp-py-utils/clp_py_utils/clp_config.py`. +DATABASE_COMPONENT_NAME = "database" +DATABASE_DEFAULT_PORT = 3306 + # Set up console logging logging_console_handler = logging.StreamHandler() logging_formatter = logging.Formatter( @@ -101,11 +106,9 @@ def _add_clp_env_vars( ) return False - database_host = _get_config_value(clp_config, "database.host", "localhost") - database_port = _get_config_value(clp_config, "database.port", 3306) database_name = _get_config_value(clp_config, "database.name", "clp-db") env_vars["PRESTO_COORDINATOR_CLPPROPERTIES_METADATA_DATABASE_URL"] = ( - f"jdbc:mysql://{database_host}:{database_port}" + f"jdbc:mysql://{DATABASE_COMPONENT_NAME}:{DATABASE_DEFAULT_PORT}" ) env_vars["PRESTO_COORDINATOR_CLPPROPERTIES_METADATA_DATABASE_NAME"] = database_name @@ -124,7 +127,7 @@ def _add_clp_env_vars( ) ) elif "s3" == clp_archive_output_storage_type: - env_vars["CLP_ARCHIVES_DIR"] = str( + env_vars["CLP_STAGED_ARCHIVES_DIR"] = str( _get_path_clp_config_value( clp_config, f"{archive_output_storage_key}.staging_directory", @@ -163,6 +166,11 @@ def 
def _get_clp_package_instance_id(
    clp_config: Dict[str, Any], clp_package_dir: Path
) -> Optional[str]:
    """
    Retrieves the CLP package instance ID from the logs directory.

    The ID is the stripped content of the `instance-id` file inside the directory returned by
    `_get_path_clp_config_value` for the `logs_directory` key (with `var/log` passed as the
    default). Every failure is logged before `None` is returned.

    :param clp_config:
    :param clp_package_dir:
    :return: The instance ID if it could be read, otherwise `None`.
    """

    logs_directory = _get_path_clp_config_value(
        clp_config, "logs_directory", Path("var") / "log", clp_package_dir
    )
    instance_id_path = logs_directory / "instance-id"

    # Attempt the read directly instead of checking `exists()` first: the file could disappear
    # between the check and the read, and the read reports a missing file anyway.
    try:
        instance_id = instance_id_path.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        logger.error(
            "Cannot determine the CLP package Docker network because '%s' does not exist."
            " Start the CLP package at least once before configuring Presto.",
            instance_id_path,
        )
        return None
    except (OSError, UnicodeDecodeError):
        # `UnicodeDecodeError` is a `ValueError`, not an `OSError`, so it has to be listed
        # explicitly for a non-UTF-8 file to be reported rather than crash the script.
        logger.exception("Failed to read the CLP package instance ID from '%s'.", instance_id_path)
        return None

    if not instance_id:
        logger.error(
            "Instance ID file '%s' is empty. Restart the CLP package to regenerate the instance"
            " ID.",
            instance_id_path,
        )
        return None

    return instance_id