diff --git a/components/clp-rust-utils/src/job_config/clp_io_config.rs b/components/clp-rust-utils/src/job_config/clp_io_config.rs index 2e297713de..d032392996 100644 --- a/components/clp-rust-utils/src/job_config/clp_io_config.rs +++ b/components/clp-rust-utils/src/job_config/clp_io_config.rs @@ -1,7 +1,7 @@ use non_empty_string::NonEmptyString; use serde::Serialize; -use crate::clp_config::S3Config; +use crate::{clp_config::S3Config, s3::S3ObjectMetadataId}; /// Represents CLP IO config. #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -10,14 +10,17 @@ pub struct ClpIoConfig { pub output: OutputConfig, } +/// Represents the unique identifier for an ingestion job in CLP DB. +pub type IngestionJobId = u64; + /// An enum representing CLP input config. #[derive(Debug, Clone, PartialEq, Eq, Serialize)] #[serde(tag = "type")] pub enum InputConfig { - #[serde(rename = "s3")] - S3InputConfig { + #[serde(rename = "s3_object_metadata")] + S3ObjectMetadataInputConfig { #[serde(flatten)] - config: S3InputConfig, + config: S3ObjectMetadataInputConfig, }, } @@ -33,6 +36,18 @@ pub struct S3InputConfig { pub unstructured: bool, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct S3ObjectMetadataInputConfig { + #[serde(flatten)] + pub s3_config: S3Config, + + pub ingestion_job_id: IngestionJobId, + pub s3_object_metadata_ids: Option<Vec<S3ObjectMetadataId>>, + pub dataset: Option<NonEmptyString>, + pub timestamp_key: Option<NonEmptyString>, + pub unstructured: bool, +} + /// Represents CLP output config. 
#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct OutputConfig { diff --git a/components/clp-rust-utils/src/s3.rs b/components/clp-rust-utils/src/s3.rs index 8036ee3db0..6e9be865ec 100644 --- a/components/clp-rust-utils/src/s3.rs +++ b/components/clp-rust-utils/src/s3.rs @@ -11,5 +11,4 @@ pub struct ObjectMetadata { pub bucket: NonEmptyString, pub key: NonEmptyString, pub size: u64, - pub id: Option, } diff --git a/components/clp-rust-utils/tests/clp_config_test.rs b/components/clp-rust-utils/tests/clp_config_test.rs index 681b8ac88f..57f6e76186 100644 --- a/components/clp-rust-utils/tests/clp_config_test.rs +++ b/components/clp-rust-utils/tests/clp_config_test.rs @@ -1,6 +1,6 @@ use clp_rust_utils::{ clp_config::{AwsAuthentication, AwsCredentials, S3Config}, - job_config::{ClpIoConfig, InputConfig, OutputConfig, S3InputConfig}, + job_config::{ClpIoConfig, InputConfig, OutputConfig, S3ObjectMetadataInputConfig}, serde::BrotliMsgpack, types::non_empty_string::ExpectedNonEmpty, }; @@ -22,10 +22,11 @@ fn test_clp_io_config_serialization() { }, }; let config = ClpIoConfig { - input: InputConfig::S3InputConfig { - config: S3InputConfig { + input: InputConfig::S3ObjectMetadataInputConfig { + config: S3ObjectMetadataInputConfig { s3_config, - keys: None, + ingestion_job_id: 1, + s3_object_metadata_ids: None, dataset: Some(NonEmptyString::from_static_str("test-dataset")), timestamp_key: Some(NonEmptyString::from_static_str("timestamp")), unstructured: false, @@ -42,12 +43,17 @@ fn test_clp_io_config_serialization() { let brotli_compressed_msgpack = BrotliMsgpack::serialize(&config) .expect("Brotli-compressed MessagePack serialized config."); - let expected = "1ba80100e4ffdf9f43284b650e496850ba5f98ef7b53044b04d074faa66eb23ebef25c0d1a13ac\ - 4b17669ac5cbe254cffc4e749bb4c455d2f61c988e679697db09ba171ce575912c9a14986d0e05f87e4dd1babca\ - 9d5b5d1c5267067deea8cacf9928cf673f1a75a627945e06c7cbe6c6b21d98945e16df94ba0d5fce7c2d61158ca\ - 
6541545e74dd2e9e6e790fd24191a86c1efb597b4f9e82aadfaab0f7f87c81440262811c44fc671677003b39270\ - 2be7820095035db7be96769d799a348764444621df0fb4382c2fdba353b5a24ae147e0ba5f3def513120b64bf01\ - aa2df16f31180cafc98d35a4544544c0267abd69708e6ce25962baa581e1cc970a7a2301b070ca0c"; + + let expected = "1bdc0100c4aa350b081365c242113820d5873cbb21492ac6023d91a3f\ + 23f7593f5f195c76e4c5a03e42fdb9869162f8b4349b77947a7a45a53e95a85ab8362b8db53\ + 5eba172ae2a7000dce0626b90c8f12976d951d179058379f0af6f585537abbfd7\ + 4e16aab0b60697d67cb483bbe4ed93ef912f9a26c2852467682cc92d89dea296f\ + cd9f8247cecf8b3823881cdb84997211e8d566c29bcc49daf198293a2447ab7b\ + 3015a2be9bca7da952a9f57ae8d42668559f51dd40ede0b957ab746b7da8e64f\ + a6f792ff1fbe07cb97602af18f731b3015fa39495352d91d5389a5877bdf5828\ + 847b99c51df7c04e95bf6375c4dbd78f4b6c24e809642ed5ff542c16fb5a5cd\ + 1495253250d3841af440db6c9ca9e39a66d0034cfb2efd6083a204eb64a02"; + assert_eq!(expected, hex::encode(brotli_compressed_msgpack)); let json_serialized_result = serde_json::to_string_pretty(&config); @@ -55,7 +61,7 @@ fn test_clp_io_config_serialization() { let json_serialized = json_serialized_result.unwrap(); let expected = serde_json::json!({ "input": { - "type": "s3", + "type": "s3_object_metadata", "bucket": "yscope", "region_code": "us-east-2", "key_prefix": "sample-logs/cockroachdb.clp.zst", @@ -67,7 +73,8 @@ fn test_clp_io_config_serialization() { "secret_access_key": "SECRET_ACCESS_KEY" } }, - "keys": null, + "ingestion_job_id": 1, + "s3_object_metadata_ids": null, "dataset": "test-dataset", "timestamp_key": "timestamp", "unstructured": false diff --git a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py index 0d03542472..dcdca505cb 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py +++ 
b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py @@ -38,8 +38,10 @@ InputType, PathsToCompress, S3InputConfig, + S3ObjectMetadataInputConfig, ) from job_orchestration.scheduler.task_result import CompressionTaskResult +from job_orchestration.scheduler.utils import is_s3_based_input def update_compression_task_metadata(db_cursor, task_id, kv): @@ -137,7 +139,7 @@ def _generate_fs_logs_list( def _generate_s3_logs_list( output_file_path: pathlib.Path, paths_to_compress: PathsToCompress, - s3_input_config: S3InputConfig, + s3_input_config: S3InputConfig | S3ObjectMetadataInputConfig, ) -> None: # S3 object keys are stored as file_paths in `PathsToCompress` object_keys = paths_to_compress.file_paths @@ -271,7 +273,7 @@ def _make_clp_s_command_and_env( # fmt: on compression_env_vars = dict(os.environ) - if InputType.S3 == clp_config.input.type and not clp_config.input.unstructured: + if is_s3_based_input(clp_config.input.type) and not clp_config.input.unstructured: compression_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication)) compression_cmd.append("--auth") compression_cmd.append("s3") @@ -313,7 +315,7 @@ def _make_log_converter_command_and_env( # fmt: on conversion_env_vars = dict(os.environ) - if InputType.S3 == clp_config.input.type: + if is_s3_based_input(clp_config.input.type): conversion_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication)) conversion_cmd.append("--auth") conversion_cmd.append("s3") @@ -392,7 +394,7 @@ def run_clp( logs_list_path = tmp_dir / f"{instance_id_str}-log-paths.txt" if InputType.FS == input_type: _generate_fs_logs_list(logs_list_path, paths_to_compress) - elif InputType.S3 == input_type: + elif is_s3_based_input(input_type): _generate_s3_logs_list(logs_list_path, paths_to_compress, clp_config.input) else: error_msg = f"Unsupported input type: {input_type}." 
diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index 4975fe5f0d..009aca7f6d 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -26,7 +26,10 @@ fetch_existing_datasets, ) from clp_py_utils.compression import validate_path_and_get_info -from clp_py_utils.core import read_yaml_config_file +from clp_py_utils.core import ( + FileMetadata, + read_yaml_config_file, +) from clp_py_utils.s3_utils import s3_get_object_metadata from clp_py_utils.sql_adapter import SqlAdapter from pydantic import ValidationError @@ -38,6 +41,7 @@ from job_orchestration.scheduler.constants import ( CompressionJobStatus, CompressionTaskStatus, + INGESTED_S3_OBJECT_METADATA_TABLE_NAME, SchedulerType, ) from job_orchestration.scheduler.job_config import ( @@ -45,6 +49,7 @@ FsInputConfig, InputType, S3InputConfig, + S3ObjectMetadataInputConfig, ) from job_orchestration.scheduler.scheduler_data import ( CompressionJob, @@ -183,6 +188,58 @@ def _process_s3_input( paths_to_compress_buffer.add_file(object_metadata) +def _process_s3_object_metadata_input( + s3_object_metadata_input_config: S3ObjectMetadataInputConfig, + paths_to_compress_buffer: PathsToCompressBuffer, + db_context: DbContext, +) -> None: + """ + Fetches S3 object metadata rows from the `INGESTED_S3_OBJECT_METADATA_TABLE_NAME` table for the + given `s3_object_metadata_ids` and `ingestion_job_id`, and adds the metadata to + `paths_to_compress_buffer`. + + :param s3_object_metadata_input_config: + :param paths_to_compress_buffer: + :param db_context: + :raises RuntimeError: If no rows are found, or if any requested metadata_id is missing. 
+ """ + s3_object_metadata_ids = s3_object_metadata_input_config.s3_object_metadata_ids + ingestion_job_id = s3_object_metadata_input_config.ingestion_job_id + + placeholders = ", ".join(["%s"] * len(s3_object_metadata_ids)) + query = ( + f"SELECT `id`, `key`, `size` FROM {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} " + f"WHERE `id` IN ({placeholders}) AND `ingestion_job_id` = %s" + ) + params = (*s3_object_metadata_ids, ingestion_job_id) + db_context.cursor.execute(query, params) + metadata_list = db_context.cursor.fetchall() + if len(metadata_list) == 0: + raise RuntimeError( + f"No rows found in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for the given " + f"s3_object_metadata_ids and ingestion_job_id {ingestion_job_id}." + ) + + # Validate that all requested IDs are present. + returned_ids = {row["id"] for row in metadata_list} + requested_ids = set(s3_object_metadata_ids) + missing_ids = requested_ids - returned_ids + if len(missing_ids) > 0: + raise RuntimeError( + f"Missing metadata rows in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for " + f"ingestion_job_id {ingestion_job_id}: {sorted(missing_ids)}." + ) + + for metadata in metadata_list: + if not metadata["key"].startswith(s3_object_metadata_input_config.key_prefix): + raise RuntimeError( + f"Metadata key {metadata['key']} does not start with the key prefix " + f"{s3_object_metadata_input_config.key_prefix}." 
+ ) + file_metadata = FileMetadata(path=Path(metadata["key"]), size=int(metadata["size"])) + paths_to_compress_buffer.add_file(file_metadata) + + def _write_user_failure_log( title: str, content: list[str], @@ -321,6 +378,22 @@ def search_and_schedule_new_tasks( }, ) return + elif input_type == InputType.S3_OBJECT_METADATA.value: + try: + _process_s3_object_metadata_input( + input_config, paths_to_compress_buffer, db_context + ) + except Exception as err: + logger.exception("Failed to process S3 object metadata input for job %s", job_id) + update_compression_job_metadata( + db_context, + job_id, + { + "status": CompressionJobStatus.FAILED, + "status_msg": f"S3 object metadata input failure: {err}", + }, + ) + return else: logger.error(f"Unsupported input type {input_type}") update_compression_job_metadata( diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py index 4a66ab462e..c5bd6fdcb8 100644 --- a/components/job-orchestration/job_orchestration/scheduler/constants.py +++ b/components/job-orchestration/job_orchestration/scheduler/constants.py @@ -2,6 +2,8 @@ from enum import auto, IntEnum +INGESTED_S3_OBJECT_METADATA_TABLE_NAME = "ingested_s3_object_metadata" + TASK_QUEUE_LOWEST_PRIORITY = 1 TASK_QUEUE_HIGHEST_PRIORITY = 3 diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py index d7ce48296f..535a6a7e5a 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -11,6 +11,7 @@ class InputType(LowercaseStrEnum): FS = auto() S3 = auto() + S3_OBJECT_METADATA = auto() class PathsToCompress(BaseModel): @@ -44,6 +45,24 @@ def validate_keys(cls, value): return value +class S3ObjectMetadataInputConfig(S3Config): + type: Literal[InputType.S3_OBJECT_METADATA.value] = 
InputType.S3_OBJECT_METADATA.value + ingestion_job_id: int + dataset: str | None = None + timestamp_key: str | None = None + unstructured: bool = False + s3_object_metadata_ids: list[int] + + @field_validator("s3_object_metadata_ids") + @classmethod + def validate_s3_object_metadata_ids(cls, value: list[int]) -> list[int]: + if len(value) == 0: + raise ValueError("s3_object_metadata_ids cannot be an empty list") + if len(value) != len(set(value)): + raise ValueError("s3_object_metadata_ids must be a list of unique IDs") + return value + + class OutputConfig(BaseModel): target_archive_size: int target_dictionaries_size: int @@ -53,7 +72,7 @@ class OutputConfig(BaseModel): class ClpIoConfig(BaseModel): - input: FsInputConfig | S3InputConfig + input: FsInputConfig | S3InputConfig | S3ObjectMetadataInputConfig output: OutputConfig diff --git a/components/job-orchestration/job_orchestration/scheduler/utils.py b/components/job-orchestration/job_orchestration/scheduler/utils.py index 3735113a33..4b526d798f 100644 --- a/components/job-orchestration/job_orchestration/scheduler/utils.py +++ b/components/job-orchestration/job_orchestration/scheduler/utils.py @@ -17,6 +17,11 @@ QueryTaskStatus, SchedulerType, ) +from job_orchestration.scheduler.job_config import InputType + + +def is_s3_based_input(input_type: InputType) -> bool: + return InputType.S3 == input_type or InputType.S3_OBJECT_METADATA == input_type def kill_hanging_jobs(sql_adapter: SqlAdapter, scheduler_type: str) -> list[int] | None: diff --git a/components/log-ingestor/src/compression/buffer.rs b/components/log-ingestor/src/compression/buffer.rs index 7786c38f94..74e20bb7e7 100644 --- a/components/log-ingestor/src/compression/buffer.rs +++ b/components/log-ingestor/src/compression/buffer.rs @@ -1,6 +1,14 @@ use anyhow::Result; use async_trait::async_trait; -use clp_rust_utils::s3::ObjectMetadata; +use clp_rust_utils::s3::S3ObjectMetadataId; +use sqlx::FromRow; + +/// One entry of data fed into the compression 
buffer +#[derive(Debug, Clone, PartialEq, Eq, FromRow)] +pub struct CompressionBufferEntry { + pub id: S3ObjectMetadataId, + pub size: u64, +} #[async_trait] /// A trait for submitting buffered object metadata for processing. @@ -14,10 +22,11 @@ pub trait BufferSubmitter { /// # Errors: /// /// Returns an [`anyhow::Error`] on failure. - async fn submit(&self, buffer: &[ObjectMetadata]) -> Result<()>; + async fn submit(&self, buffer: &[S3ObjectMetadataId]) -> Result<()>; } -/// A buffer that accumulates object metadata and submits it when a size threshold is reached. +/// A buffer that accumulates object metadata IDs and a running total size, and submits when the +/// size threshold is reached. /// /// # Type Parameters: /// @@ -25,7 +34,7 @@ pub trait BufferSubmitter { /// data. pub struct Buffer { submitter: Submitter, - buf: Vec, + buf: Vec, total_size: u64, size_threshold: u64, } @@ -45,7 +54,8 @@ impl Buffer { } } - /// Adds object metadata to the buffer and submits if the size threshold is reached. + /// Adds [`CompressionBufferEntry`] values to the buffer and submits if the size threshold is + /// reached. /// /// # Returns /// @@ -56,10 +66,13 @@ impl Buffer { /// Returns an error if: /// /// * Forwards [`Self::submit`]'s return values on failure. 
- pub async fn add(&mut self, object_metadata_to_ingest: Vec) -> Result<()> { - for object_metadata in object_metadata_to_ingest { - self.total_size += object_metadata.size; - self.buf.push(object_metadata); + pub async fn add( + &mut self, + object_metadata_to_ingest: Vec, + ) -> Result<()> { + for ref_ in object_metadata_to_ingest { + self.total_size += ref_.size; + self.buf.push(ref_.id); if self.total_size >= self.size_threshold { self.submit().await?; diff --git a/components/log-ingestor/src/compression/compression_job_submitter.rs b/components/log-ingestor/src/compression/compression_job_submitter.rs index fd87ae715c..831e0f0b08 100644 --- a/components/log-ingestor/src/compression/compression_job_submitter.rs +++ b/components/log-ingestor/src/compression/compression_job_submitter.rs @@ -10,14 +10,14 @@ use clp_rust_utils::{ ClpIoConfig, CompressionJobId, CompressionJobStatus, + IngestionJobId, InputConfig, OutputConfig, - S3InputConfig, + S3ObjectMetadataInputConfig, ingestion::s3::BaseConfig, }, - s3::{ObjectMetadata, S3ObjectMetadataId}, + s3::S3ObjectMetadataId, }; -use non_empty_string::NonEmptyString; use crate::{compression::BufferSubmitter, ingestion_job_manager::ClpCompressionState}; @@ -32,22 +32,13 @@ pub struct CompressionJobSubmitter { #[async_trait] impl BufferSubmitter for CompressionJobSubmitter { - async fn submit(&self, buffer: &[ObjectMetadata]) -> Result<()> { - let id_and_key_pairs = buffer - .iter() - .map(|object_metadata| { - ( - object_metadata.id.expect("the ingestion ID must be set"), - object_metadata.key.clone(), - ) - }) - .collect(); + async fn submit(&self, buffer: &[S3ObjectMetadataId]) -> Result<()> { let io_config_template = self.io_config_template.clone(); let state = self.state.clone(); tokio::spawn(submit_clp_compression_job_and_wait_for_completion( state, io_config_template, - id_and_key_pairs, + buffer.to_vec(), )); Ok(()) } @@ -66,8 +57,9 @@ impl CompressionJobSubmitter { aws_authentication: AwsAuthentication, 
archive_output_config: &ArchiveOutput, ingestion_job_config: &BaseConfig, + ingestion_job_id: IngestionJobId, ) -> Self { - let s3_input_config = S3InputConfig { + let s3_object_metadata_input_config = S3ObjectMetadataInputConfig { s3_config: S3Config { bucket: ingestion_job_config.bucket_name.clone(), region_code: ingestion_job_config.region.clone(), @@ -75,7 +67,8 @@ impl CompressionJobSubmitter { endpoint_url: ingestion_job_config.endpoint_url.clone(), aws_authentication, }, - keys: None, + ingestion_job_id, + s3_object_metadata_ids: None, // NOTE: Workaround for #1735 dataset: Some( ingestion_job_config @@ -94,8 +87,8 @@ impl CompressionJobSubmitter { compression_level: archive_output_config.compression_level, }; let io_config_template = ClpIoConfig { - input: InputConfig::S3InputConfig { - config: s3_input_config, + input: InputConfig::S3ObjectMetadataInputConfig { + config: s3_object_metadata_input_config, }, output: output_config, }; @@ -164,14 +157,14 @@ pub async fn wait_for_compression_job_completion_and_update_metadata( async fn submit_clp_compression_job_and_wait_for_completion( state: ClpCompressionState, io_config_template: ClpIoConfig, - id_and_key_pairs: Vec<(S3ObjectMetadataId, NonEmptyString)>, + object_metadata_ids: Vec, ) { let ingestion_job_id = state.get_ingestion_job_id(); - let num_objects_submitted = id_and_key_pairs.len(); + let num_objects_submitted = object_metadata_ids.len(); tracing::info!(ingestion_job_id = ? 
ingestion_job_id, "Submitting CLP compression job."); let compression_job_id = match state - .submit_for_compression(io_config_template, id_and_key_pairs) + .submit_for_compression(io_config_template, &object_metadata_ids) .await { Ok(id) => id, diff --git a/components/log-ingestor/src/compression/listener.rs b/components/log-ingestor/src/compression/listener.rs index d2c0efbf91..1cd09c0a76 100644 --- a/components/log-ingestor/src/compression/listener.rs +++ b/components/log-ingestor/src/compression/listener.rs @@ -1,7 +1,6 @@ use std::{pin::Pin, time::Duration}; use anyhow::Result; -use clp_rust_utils::s3::ObjectMetadata; use tokio::{ select, sync::mpsc, @@ -9,24 +8,25 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; -use crate::compression::{Buffer, BufferSubmitter}; +use crate::compression::{Buffer, BufferSubmitter, CompressionBufferEntry}; -/// Represents a listener task that buffers and submits S3 object metadata. +/// Represents a listener task that buffers incoming [`CompressionBufferEntry`] values and submits +/// when full or on timeout. /// /// # Type Parameters /// * `Submitter`: A type that implements the [`BufferSubmitter`] trait. struct ListenerTask { buffer: Buffer, timeout: Duration, - receiver: mpsc::Receiver>, + receiver: mpsc::Receiver>, } impl ListenerTask { - /// Runs the listener task to buffer and submit S3 object metadata. Submission can be triggered - /// in three ways: + /// Runs the listener task to buffer and submit [`CompressionBufferEntry`] values. Submission + /// can be triggered in three ways: /// * Receiving a cancellation signal via the provided [`cancel_token`]. - /// * The buffer capacity is reached after receiving a new object metadata. - /// * A timeout occurs without receiving new object metadata. + /// * The buffer's size threshold is reached after receiving new entries. + /// * A timeout occurs without receiving new entries. 
/// /// # Returns /// @@ -50,9 +50,9 @@ impl ListenerTask { return Ok(()); } - // New object metadata received. - optional_object_metadata = self.receiver.recv() => { - match optional_object_metadata { + // New object metadata refs received. + optional_refs = self.receiver.recv() => { + match optional_refs { None => { self.buffer.submit().await?; tracing::info!( @@ -60,8 +60,8 @@ impl ListenerTask { ); return Ok(()); } - Some(object_metadata_to_ingest) => { - self.buffer.add(object_metadata_to_ingest).await?; + Some(refs) => { + self.buffer.add(refs).await?; } } } @@ -77,10 +77,10 @@ impl ListenerTask { } } -/// Represents a listener that accepts S3 object metadata from multiple senders and buffers them -/// for submission. +/// Represents a listener that accepts [`CompressionBufferEntry`] values from multiple senders and +/// buffers them for submission. pub struct Listener { - sender: mpsc::Sender>, + sender: mpsc::Sender>, cancel_token: CancellationToken, handle: tokio::task::JoinHandle>, } @@ -89,8 +89,8 @@ impl Listener { /// Creates and spawns a new [`Listener`] backed by a [`ListenerTask`]. /// /// This function spawns a [`ListenerTask`]. The spawned task will buffer incoming - /// [`ObjectMetadata`] values and call the supplied `Submitter` when either the buffer's - /// threshold is reached or the configured `timeout` fires. + /// [`CompressionBufferEntry`] values and call the supplied `Submitter` when either the + /// buffer's threshold is reached or the configured `timeout` fires. /// /// # Type parameters /// @@ -144,15 +144,15 @@ impl Listener { } /// # Returns - /// A new `mpsc::Sender>` that can be used to send metadata to this - /// listener. + /// A new `mpsc::Sender>` that can be used to send + /// [`CompressionBufferEntry`] values to this listener. /// /// The returned sender is a cheap clone of the listener's internal channel sender. 
It can be /// freely cloned and moved to other tasks; multiple senders may concurrently send to the same /// listener. Messages sent by a single sender preserve order; messages from different senders /// are interleaved in the order they are received by the runtime. #[must_use] - pub fn get_new_sender(&self) -> mpsc::Sender> { + pub fn get_new_sender(&self) -> mpsc::Sender> { self.sender.clone() } } diff --git a/components/log-ingestor/src/ingestion_job.rs b/components/log-ingestor/src/ingestion_job.rs index ee61a91582..c586b277ac 100644 --- a/components/log-ingestor/src/ingestion_job.rs +++ b/components/log-ingestor/src/ingestion_job.rs @@ -2,12 +2,11 @@ mod s3_scanner; mod sqs_listener; mod state; +pub use clp_rust_utils::job_config::IngestionJobId; pub use s3_scanner::*; pub use sqs_listener::*; pub use state::*; -pub type IngestionJobId = u64; - /// Enum for different types of ingestion jobs. /// /// # Type Parameters: diff --git a/components/log-ingestor/src/ingestion_job/s3_scanner.rs b/components/log-ingestor/src/ingestion_job/s3_scanner.rs index aafbc05126..61232beedd 100644 --- a/components/log-ingestor/src/ingestion_job/s3_scanner.rs +++ b/components/log-ingestor/src/ingestion_job/s3_scanner.rs @@ -129,7 +129,6 @@ impl, State: IngestionJobState + S key: NonEmptyString::new(key.clone()) .map_err(|_| anyhow::anyhow!("An empty key is received."))?, size: size.try_into()?, - id: None, }; object_metadata_to_ingest.push(object_metadata); } diff --git a/components/log-ingestor/src/ingestion_job/sqs_listener.rs b/components/log-ingestor/src/ingestion_job/sqs_listener.rs index bede233d8d..1a8749236c 100644 --- a/components/log-ingestor/src/ingestion_job/sqs_listener.rs +++ b/components/log-ingestor/src/ingestion_job/sqs_listener.rs @@ -201,7 +201,6 @@ impl, State: IngestionJobState + bucket: record.s3.bucket.name, key: record.s3.object.key, size: record.s3.object.size, - id: None, }) } diff --git a/components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs 
b/components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs index 5e381a3f82..74cb516c39 100644 --- a/components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs +++ b/components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs @@ -30,6 +30,7 @@ use crate::{ compression::{ Buffer, CLP_COMPRESSION_JOB_TABLE_NAME, + CompressionBufferEntry, CompressionJobSubmitter, Listener, wait_for_compression_job_completion_and_update_metadata, @@ -115,7 +116,7 @@ impl ClpIngestionJobContext { /// /// A new sender for buffer ingestion. #[must_use] - pub fn get_ingestion_buffer_sender(&self) -> mpsc::Sender> { + pub fn get_ingestion_buffer_sender(&self) -> mpsc::Sender> { self.listener.get_new_sender() } @@ -430,6 +431,7 @@ impl ClpDbIngestionConnector { self.aws_authentication.clone(), &self.archive_output_config, config.as_base_config(), + job_id, ); let listener = Listener::spawn( @@ -665,7 +667,7 @@ impl ClpDbIngestionConnector { pub struct ClpIngestionState { job_id: IngestionJobId, db_pool: MySqlPool, - sender: mpsc::Sender>, + sender: mpsc::Sender>, } impl ClpIngestionState { @@ -699,50 +701,34 @@ impl ClpIngestionState { Ok(status) } - /// Gets all buffered S3 object metadata ingested for the underlying ingestion job from CLP DB. + /// Gets all buffered S3 object metadata for the underlying ingestion job from CLP DB as + /// [`CompressionBufferEntry`] values. /// /// # Returns /// - /// A vector of [`ObjectMetadata`] representing all ingested S3 object metadata in + /// A vector of [`CompressionBufferEntry`] for each row in /// [`IngestedS3ObjectMetadataStatus::Buffered`] for the underlying ingestion job on success. /// /// # Errors /// /// Returns an error if: /// - /// * [`anyhow::Error`] if any fetched bucket name or key is an empty string. /// * Forwards [`sqlx::query::Query::fetch_all`]'s return values on failure. 
- pub async fn get_buffered_object_metadata(&self) -> anyhow::Result> { + pub async fn get_buffered_object_metadata( + &self, + ) -> anyhow::Result> { const QUERY: &str = formatcp!( - "SELECT `id`, `bucket`, `key`, `size` FROM `{table}` WHERE `ingestion_job_id` = ? AND \ - `status` = ?;", + "SELECT `id`, `size` FROM `{table}` WHERE `ingestion_job_id` = ? AND `status` = ?;", table = INGESTED_S3_OBJECT_METADATA_TABLE_NAME, ); - let metadata_records = - sqlx::query_as::<_, (S3ObjectMetadataId, String, String, u64)>(QUERY) - .bind(self.job_id) - .bind(IngestedS3ObjectMetadataStatus::Buffered) - .fetch_all(&self.db_pool) - .await?; - - let mut object_metadata_vec = Vec::with_capacity(metadata_records.len()); - for (id, bucket, key, size) in metadata_records { - let bucket = NonEmptyString::new(bucket).map_err(|_| { - anyhow::anyhow!("Invalid bucket name stored in CLP DB: empty string") - })?; - let key = NonEmptyString::new(key).map_err(|_| { - anyhow::anyhow!("Invalid object key stored in CLP DB: empty string") - })?; - object_metadata_vec.push(ObjectMetadata { - id: Some(id), - bucket, - key, - size, - }); - } + let rows = sqlx::query_as::<_, CompressionBufferEntry>(QUERY) + .bind(self.job_id) + .bind(IngestedS3ObjectMetadataStatus::Buffered) + .fetch_all(&self.db_pool) + .await?; - Ok(object_metadata_vec) + Ok(rows) } /// Ingests the given S3 object metadata into CLP DB. @@ -902,15 +888,19 @@ impl ClpIngestionState { ); })?; + let mut refs = Vec::with_capacity(objects.len()); for (chunk_id, chunk) in objects.chunks_mut(chunk_size).enumerate() { for (next_metadata_id, object) in (*last_inserted_ids.get(chunk_id).expect("invalid chunk ID")..) 
.zip(chunk.iter_mut()) { - object.id = Some(next_metadata_id); + refs.push(CompressionBufferEntry { + id: next_metadata_id, + size: object.size, + }); } } - self.sender.send(objects).await?; + self.sender.send(refs).await?; Ok(()) } } @@ -1083,31 +1073,24 @@ impl ClpCompressionState { pub async fn submit_for_compression( &self, io_config_template: ClpIoConfig, - id_and_key_pairs: Vec<(S3ObjectMetadataId, NonEmptyString)>, + object_metadata_ids: &[S3ObjectMetadataId], ) -> anyhow::Result { - // TODO: As tracked in #2018, once we support submitting compression jobs using IDs, there - // is no need for passing keys into this method. const COMPRESSION_JOB_SUBMISSION_QUERY: &str = formatcp!( r"INSERT INTO {table} (`clp_config`) VALUES (?)", table = CLP_COMPRESSION_JOB_TABLE_NAME ); - if id_and_key_pairs.is_empty() { + if object_metadata_ids.is_empty() { const ERROR_MSG: &str = "No objects to compress."; tracing::error!(job_id = ? self.ingestion_job_id, ERROR_MSG); panic!("{}", ERROR_MSG); } let mut io_config = io_config_template; - let s3_input_config = match &mut io_config.input { - InputConfig::S3InputConfig { config } => config, + let s3_object_metadata_input_config = match &mut io_config.input { + InputConfig::S3ObjectMetadataInputConfig { config } => config, }; - s3_input_config.keys = Some( - id_and_key_pairs - .iter() - .map(|(_, key)| key.clone()) - .collect(), - ); + s3_object_metadata_input_config.s3_object_metadata_ids = Some(object_metadata_ids.to_vec()); let mut tx = self.db_pool.begin().await?; @@ -1126,7 +1109,7 @@ impl ClpCompressionState { // batch size is chosen to be 10000, which is conservative enough to avoid hitting the limit // while also minimizing the number of batches for typical use cases. If the number of // placeholders per update changes, we may need to adjust the batch size accordingly. 
- for chunk in id_and_key_pairs.chunks(10000) { + for chunk in object_metadata_ids.chunks(10000) { let mut query_builder = sqlx::QueryBuilder::::new(formatcp!( r"UPDATE `{table}` ", table = INGESTED_S3_OBJECT_METADATA_TABLE_NAME, @@ -1139,7 +1122,7 @@ impl ClpCompressionState { .push_bind(IngestedS3ObjectMetadataStatus::Submitted); query_builder.push(" WHERE `id` IN ("); let mut separated_ids = query_builder.separated(", "); - for (id, _) in chunk { + for id in chunk { separated_ids.push_bind(id); } query_builder.push(")"); diff --git a/components/log-ingestor/tests/test_compression_listener.rs b/components/log-ingestor/tests/test_compression_listener.rs index d27750ecb8..40ae3dbb97 100644 --- a/components/log-ingestor/tests/test_compression_listener.rs +++ b/components/log-ingestor/tests/test_compression_listener.rs @@ -2,16 +2,21 @@ use std::{sync::Arc, time::Duration}; use anyhow::Result; use async_trait::async_trait; -use clp_rust_utils::{s3::ObjectMetadata, types::non_empty_string::ExpectedNonEmpty}; -use log_ingestor::compression::{Buffer, BufferSubmitter, DEFAULT_LISTENER_CAPACITY, Listener}; -use non_empty_string::NonEmptyString; +use clp_rust_utils::s3::S3ObjectMetadataId; +use log_ingestor::compression::{ + Buffer, + BufferSubmitter, + CompressionBufferEntry, + DEFAULT_LISTENER_CAPACITY, + Listener, +}; use tokio::sync::{Mutex, mpsc}; const TEST_OBJECT_SIZE: u64 = 1024; /// A test submitter that stores submitted buffers in memory for inspection. 
 struct TestBufferSubmitter {
-    store: Arc<Mutex<Vec<Vec<ObjectMetadata>>>>,
+    store: Arc<Mutex<Vec<Vec<S3ObjectMetadataId>>>>,
 }
 
 impl TestBufferSubmitter {
@@ -21,41 +26,41 @@
         }
     }
 
-    fn shared_store(&self) -> Arc<Mutex<Vec<Vec<ObjectMetadata>>>> {
+    fn shared_store(&self) -> Arc<Mutex<Vec<Vec<S3ObjectMetadataId>>>> {
         self.store.clone()
     }
 }
 
 #[async_trait]
 impl BufferSubmitter for TestBufferSubmitter {
-    async fn submit(&self, buffer: &[ObjectMetadata]) -> Result<()> {
-        let store = self.store.clone();
-        let _submitted_results = store.lock().await.push(buffer.to_vec());
+    async fn submit(&self, buffer: &[S3ObjectMetadataId]) -> Result<()> {
+        self.store.lock().await.push(buffer.to_vec());
         Ok(())
     }
 }
 
-/// Sends a list of objects to the listener via the provided sender.
-async fn send_to_listener(objects: Vec<ObjectMetadata>, sender: mpsc::Sender<Vec<ObjectMetadata>>) {
+/// Sends [`CompressionBufferEntry`] values to the listener via the provided sender.
+async fn send_to_listener(
+    refs: Vec<CompressionBufferEntry>,
+    sender: mpsc::Sender<Vec<CompressionBufferEntry>>,
+) {
     sender
-        .send(objects)
+        .send(refs)
         .await
-        .expect("Failed to send objects to listener");
+        .expect("Failed to send refs to listener");
 }
 
-/// Creates a vector of [`ObjectMetadata`] objects for a given bucket. Each object will have a
-/// unique key and a fixed size of [`TEST_OBJECT_SIZE`].
+/// Creates [`CompressionBufferEntry`] values for testing. IDs start at `id_start`.
+/// Each entry has a fixed size of [`TEST_OBJECT_SIZE`].
 ///
 /// # Returns
 ///
-/// A vector of created objects for testing.
+/// A vector of [`CompressionBufferEntry`] for testing.
+fn create_test_refs(id_start: S3ObjectMetadataId, count: usize) -> Vec<CompressionBufferEntry> {
+    (id_start..id_start + count as S3ObjectMetadataId)
+        .map(|id| CompressionBufferEntry {
+            id,
             size: TEST_OBJECT_SIZE,
-            id: None,
+        })
         .collect()
 }
@@ -63,8 +68,6 @@ fn create_test_objects(bucket_name: &str, count: usize) -> Vec<ObjectMetadata> {
 #[tokio::test]
 async fn test_compression_listener() -> Result<()> {
     const DEFAULT_TIMEOUT_SECONDS: u64 = 4;
-    const BUCKET_0: &str = "bucket-0";
-    const BUCKET_1: &str = "bucket-1";
 
     let submitter = TestBufferSubmitter::new();
     let shared = submitter.shared_store();
@@ -76,24 +79,18 @@
         DEFAULT_LISTENER_CAPACITY,
     );
 
-    let bucket_0_objects = create_test_objects(BUCKET_0, 100);
-    let bucket_1_objects = create_test_objects(BUCKET_1, 200);
+    let refs1 = create_test_refs(1, 100);
+    let refs2 = create_test_refs(101, 100);
+    let refs3 = create_test_refs(201, 100);
 
-    // Spawn three tasks that send into the listener concurrently:
-    // 1) send all of `bucket_0_objects`
-    // 2) send the first 100 objects of `bucket_1_objects`
-    // 3) send the last 100 objects of `bucket_1_objects`
+    // Spawn three tasks that send into the listener concurrently
     let sender1 = listener.get_new_sender();
     let sender2 = listener.get_new_sender();
     let sender3 = listener.get_new_sender();
 
-    let objs1 = bucket_0_objects.clone();
-    let objs2 = bucket_1_objects[..100].to_vec();
-    let objs3 = bucket_1_objects[100..].to_vec();
-
-    let h1 = tokio::spawn(async move { send_to_listener(objs1, sender1).await });
-    let h2 = tokio::spawn(async move { send_to_listener(objs2, sender2).await });
-    let h3 = tokio::spawn(async move { send_to_listener(objs3, sender3).await });
+    let h1 = tokio::spawn(async move { send_to_listener(refs1, sender1).await });
+    let h2 = tokio::spawn(async move { send_to_listener(refs2, sender2).await });
+    let h3 = tokio::spawn(async move { send_to_listener(refs3, sender3).await });
 
     // Wait for all sender tasks to finish
     h1.await.unwrap();
@@ -110,16 +107,13 @@ async fn test_compression_listener() -> Result<()> {
     // The first two triggered by buffer size limit, the last one by timeout.
     assert_eq!(submitted_buffers.len(), 3);
 
-    let mut actual_total_submitted: Vec<ObjectMetadata> =
-        submitted_buffers.iter().flatten().cloned().collect();
-    actual_total_submitted.sort();
+    let mut actual_ids: Vec<S3ObjectMetadataId> =
+        submitted_buffers.iter().flatten().copied().collect();
+    actual_ids.sort_unstable();
 
-    let mut expected_total_submitted = Vec::new();
-    expected_total_submitted.extend_from_slice(&bucket_0_objects);
-    expected_total_submitted.extend_from_slice(&bucket_1_objects);
-    expected_total_submitted.sort();
+    let expected_ids: Vec<S3ObjectMetadataId> = (1..=300).collect();
+    assert_eq!(expected_ids, actual_ids);
 
-    assert_eq!(expected_total_submitted, actual_total_submitted);
     submitted_buffers.clear();
     drop(submitted_buffers);
 
diff --git a/components/log-ingestor/tests/test_ingestion_job.rs b/components/log-ingestor/tests/test_ingestion_job.rs
index fd8a614519..83cd083ef4 100644
--- a/components/log-ingestor/tests/test_ingestion_job.rs
+++ b/components/log-ingestor/tests/test_ingestion_job.rs
@@ -104,7 +104,6 @@ async fn upload_and_receive(
             bucket: bucket.clone(),
             key: NonEmptyString::from_string(format!("{prefix}/{idx:05}.log")),
             size: 16,
-            id: None,
         })
         .collect();
 
@@ -140,7 +139,6 @@ async fn upload_noise_objects(
             bucket: bucket.clone(),
             key: NonEmptyString::from_string(format!("{}.log", Uuid::new_v4())),
             size: 16,
-            id: None,
         })
         .collect();