Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions components/clp-rust-utils/src/job_config/clp_io_config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use non_empty_string::NonEmptyString;
use serde::Serialize;

use crate::clp_config::S3Config;
use crate::{clp_config::S3Config, s3::S3ObjectMetadataId};

/// Represents CLP IO config.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
Expand All @@ -10,14 +10,17 @@ pub struct ClpIoConfig {
pub output: OutputConfig,
}

/// Represents the unique identifier for an ingestion job in CLP DB.
pub type IngestionJobId = u64;

/// An enum representing CLP input config.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum InputConfig {
#[serde(rename = "s3")]
S3InputConfig {
#[serde(rename = "metadata")]
S3ObjectMetadataInputConfig {
#[serde(flatten)]
config: S3InputConfig,
config: S3ObjectMetadataInputConfig,
},
}

Expand All @@ -33,6 +36,18 @@ pub struct S3InputConfig {
pub unstructured: bool,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct S3ObjectMetadataInputConfig {
#[serde(flatten)]
pub s3_config: S3Config,

pub ingestion_job_id: IngestionJobId,
pub metadata_ids: Option<Vec<S3ObjectMetadataId>>,
pub dataset: Option<NonEmptyString>,
pub timestamp_key: Option<NonEmptyString>,
pub unstructured: bool,
}

/// Represents CLP output config.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct OutputConfig {
Expand Down
1 change: 0 additions & 1 deletion components/clp-rust-utils/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ pub struct ObjectMetadata {
pub bucket: NonEmptyString,
pub key: NonEmptyString,
pub size: u64,
pub id: Option<S3ObjectMetadataId>,
}
28 changes: 16 additions & 12 deletions components/clp-rust-utils/tests/clp_config_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clp_rust_utils::{
clp_config::{AwsAuthentication, AwsCredentials, S3Config},
job_config::{ClpIoConfig, InputConfig, OutputConfig, S3InputConfig},
job_config::{ClpIoConfig, InputConfig, OutputConfig, S3ObjectMetadataInputConfig},
serde::BrotliMsgpack,
types::non_empty_string::ExpectedNonEmpty,
};
Expand All @@ -22,10 +22,11 @@ fn test_clp_io_config_serialization() {
},
};
let config = ClpIoConfig {
input: InputConfig::S3InputConfig {
config: S3InputConfig {
input: InputConfig::S3ObjectMetadataInputConfig {
config: S3ObjectMetadataInputConfig {
s3_config,
keys: None,
ingestion_job_id: 1,
metadata_ids: None,
dataset: Some(NonEmptyString::from_static_str("test-dataset")),
timestamp_key: Some(NonEmptyString::from_static_str("timestamp")),
unstructured: false,
Expand All @@ -42,20 +43,22 @@ fn test_clp_io_config_serialization() {

let brotli_compressed_msgpack = BrotliMsgpack::serialize(&config)
.expect("Brotli-compressed MessagePack serialized config.");
let expected = "1ba80100e4ffdf9f43284b650e496850ba5f98ef7b53044b04d074faa66eb23ebef25c0d1a13ac\
4b17669ac5cbe254cffc4e749bb4c455d2f61c988e679697db09ba171ce575912c9a14986d0e05f87e4dd1babca\
9d5b5d1c5267067deea8cacf9928cf673f1a75a627945e06c7cbe6c6b21d98945e16df94ba0d5fce7c2d61158ca\
6541545e74dd2e9e6e790fd24191a86c1efb597b4f9e82aadfaab0f7f87c81440262811c44fc671677003b39270\
2be7820095035db7be96769d799a348764444621df0fb4382c2fdba353b5a24ae147e0ba5f3def513120b64bf01\
aa2df16f31180cafc98d35a4544544c0267abd69708e6ce25962baa581e1cc970a7a2301b070ca0c";
let expected = "1bc80100e4ffbf9f0b909134588545d1fdd9f7bd15164db4d3eaf8a66eb\
23ebef25ab5694cb0bf6c63a6593c294eb77947a7a4a6d3e85ac5c3b9623c9d515ef48aa981a92dc\
30547795d248b3e1066eb498155bba6180eb21f4e6c74b10f089a778d9cb061cb183d1d7f3c9be51\
5818bf086131920d98996f0c2fc2950cff85cd82e024b11cd88ca975e0f4388b73c1369c7222a6b\
a09c0cf7102ad8f55b15f6c9743a5bad4239db8662e68ce506d40eced56cba92ad816a7952adf7fbd\
f6c0b349b0355f0efb2754015f0b394a628921d11918487bf3f44c8b1bf9ed58e5ac0e6c26fd109e\
9e4fa498985623c01aa03f11ff2f97cafc51575245745d48003f4fad3c02c59d933c7b4e581eeccdb6\
63fa0b470615510";
assert_eq!(expected, hex::encode(brotli_compressed_msgpack));

let json_serialized_result = serde_json::to_string_pretty(&config);
assert!(json_serialized_result.is_ok());
let json_serialized = json_serialized_result.unwrap();
let expected = serde_json::json!({
"input": {
"type": "s3",
"type": "metadata",
"bucket": "yscope",
"region_code": "us-east-2",
"key_prefix": "sample-logs/cockroachdb.clp.zst",
Expand All @@ -67,7 +70,8 @@ fn test_clp_io_config_serialization() {
"secret_access_key": "SECRET_ACCESS_KEY"
}
},
"keys": null,
"ingestion_job_id": 1,
"metadata_ids": null,
"dataset": "test-dataset",
"timestamp_key": "timestamp",
"unstructured": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
InputType,
PathsToCompress,
S3InputConfig,
S3ObjectMetadataInputConfig,
)
from job_orchestration.scheduler.task_result import CompressionTaskResult
from job_orchestration.scheduler.utils import _is_s3_based_input


def update_compression_task_metadata(db_cursor, task_id, kv):
Expand Down Expand Up @@ -137,7 +139,7 @@ def _generate_fs_logs_list(
def _generate_s3_logs_list(
output_file_path: pathlib.Path,
paths_to_compress: PathsToCompress,
s3_input_config: S3InputConfig,
s3_input_config: S3InputConfig | S3ObjectMetadataInputConfig,
) -> None:
# S3 object keys are stored as file_paths in `PathsToCompress`
object_keys = paths_to_compress.file_paths
Expand Down Expand Up @@ -271,7 +273,7 @@ def _make_clp_s_command_and_env(
# fmt: on

compression_env_vars = dict(os.environ)
if InputType.S3 == clp_config.input.type and not clp_config.input.unstructured:
if _is_s3_based_input(clp_config.input.type) and not clp_config.input.unstructured:
compression_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
compression_cmd.append("--auth")
compression_cmd.append("s3")
Expand Down Expand Up @@ -313,7 +315,7 @@ def _make_log_converter_command_and_env(
# fmt: on

conversion_env_vars = dict(os.environ)
if InputType.S3 == clp_config.input.type:
if _is_s3_based_input(clp_config.input.type):
conversion_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
conversion_cmd.append("--auth")
conversion_cmd.append("s3")
Expand Down Expand Up @@ -392,7 +394,7 @@ def run_clp(
logs_list_path = tmp_dir / f"{instance_id_str}-log-paths.txt"
if InputType.FS == input_type:
_generate_fs_logs_list(logs_list_path, paths_to_compress)
elif InputType.S3 == input_type:
elif _is_s3_based_input(input_type):
_generate_s3_logs_list(logs_list_path, paths_to_compress, clp_config.input)
else:
error_msg = f"Unsupported input type: {input_type}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
fetch_existing_datasets,
)
from clp_py_utils.compression import validate_path_and_get_info
from clp_py_utils.core import read_yaml_config_file
from clp_py_utils.core import (
FileMetadata,
read_yaml_config_file,
)
from clp_py_utils.s3_utils import s3_get_object_metadata
from clp_py_utils.sql_adapter import SqlAdapter
from pydantic import ValidationError
Expand All @@ -38,13 +41,15 @@
from job_orchestration.scheduler.constants import (
CompressionJobStatus,
CompressionTaskStatus,
INGESTED_S3_OBJECT_METADATA_TABLE_NAME,
SchedulerType,
)
from job_orchestration.scheduler.job_config import (
ClpIoConfig,
FsInputConfig,
InputType,
S3InputConfig,
S3ObjectMetadataInputConfig,
)
from job_orchestration.scheduler.scheduler_data import (
CompressionJob,
Expand Down Expand Up @@ -183,6 +188,52 @@ def _process_s3_input(
paths_to_compress_buffer.add_file(object_metadata)


def _process_s3_object_metadata_input(
s3_object_metadata_input_config: S3ObjectMetadataInputConfig,
paths_to_compress_buffer: PathsToCompressBuffer,
db_context: DbContext,
) -> None:
"""
Fetches S3 object metadata rows from the INGESTED_S3_OBJECT_METADATA_TABLE_NAME table for the
given metadata_ids and ingestion_job_id, and adds the metadata to paths_to_compress_buffer.

:param s3_object_metadata_input_config:
:param paths_to_compress_buffer:
:param db_context:
:raises RuntimeError: If no rows are found, or if any requested metadata_id is missing.
"""
metadata_ids = s3_object_metadata_input_config.metadata_ids
ingestion_job_id = s3_object_metadata_input_config.ingestion_job_id

placeholders = ", ".join(["%s"] * len(metadata_ids))
query = (
f"SELECT id, `key`, size FROM {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} "
f"WHERE id IN ({placeholders}) AND ingestion_job_id = %s"
)
params = (*metadata_ids, ingestion_job_id)
db_context.cursor.execute(query, params)
metadata_list = db_context.cursor.fetchall()
if not metadata_list:
raise RuntimeError(
f"No rows found in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for the given "
f"metadata_ids and ingestion_job_id {ingestion_job_id}."
)

# Validate that all requested IDs are present.
returned_ids = {row["id"] for row in metadata_list}
requested_ids = set(metadata_ids)
missing_ids = requested_ids - returned_ids
if missing_ids:
raise RuntimeError(
f"Missing metadata rows in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for "
f"ingestion_job_id {ingestion_job_id}: {sorted(missing_ids)}."
)

for metadata in metadata_list:
file_metadata = FileMetadata(path=Path(metadata["key"]), size=int(metadata["size"]))
paths_to_compress_buffer.add_file(file_metadata)


def _write_user_failure_log(
title: str,
content: list[str],
Expand Down Expand Up @@ -321,6 +372,22 @@ def search_and_schedule_new_tasks(
},
)
return
elif input_type == InputType.METADATA.value:
try:
_process_s3_object_metadata_input(
input_config, paths_to_compress_buffer, db_context
)
except Exception as err:
logger.exception("Failed to process S3 object metadata input for job %s", job_id)
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"S3 object metadata input failure: {err}",
},
)
return
else:
logger.error(f"Unsupported input type {input_type}")
update_compression_job_metadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from enum import auto, IntEnum

INGESTED_S3_OBJECT_METADATA_TABLE_NAME = "ingested_s3_object_metadata"

TASK_QUEUE_LOWEST_PRIORITY = 1
TASK_QUEUE_HIGHEST_PRIORITY = 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
class InputType(LowercaseStrEnum):
FS = auto()
S3 = auto()
METADATA = auto()


class PathsToCompress(BaseModel):
Expand Down Expand Up @@ -44,6 +45,22 @@ def validate_keys(cls, value):
return value


class S3ObjectMetadataInputConfig(S3Config):
type: Literal[InputType.METADATA.value] = InputType.METADATA.value
ingestion_job_id: int
dataset: str | None = None
timestamp_key: str | None = None
unstructured: bool = False
metadata_ids: list[int]

@field_validator("metadata_ids")
@classmethod
def validate_metadata_ids_non_empty(cls, value: list[int]) -> list[int]:
if not value:
raise ValueError("metadata_ids cannot be an empty list")
return value


class OutputConfig(BaseModel):
target_archive_size: int
target_dictionaries_size: int
Expand All @@ -53,7 +70,7 @@ class OutputConfig(BaseModel):


class ClpIoConfig(BaseModel):
input: FsInputConfig | S3InputConfig
input: FsInputConfig | S3InputConfig | S3ObjectMetadataInputConfig
output: OutputConfig


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
QueryTaskStatus,
SchedulerType,
)
from job_orchestration.scheduler.job_config import InputType


def _is_s3_based_input(input_type: InputType) -> bool:
return input_type in {InputType.S3, InputType.METADATA}


def kill_hanging_jobs(sql_adapter: SqlAdapter, scheduler_type: str) -> list[int] | None:
Expand Down
31 changes: 22 additions & 9 deletions components/log-ingestor/src/compression/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use anyhow::Result;
use async_trait::async_trait;
use clp_rust_utils::s3::ObjectMetadata;
use clp_rust_utils::s3::S3ObjectMetadataId;
use sqlx::FromRow;

/// One entry of data fed into the compression buffer
#[derive(Debug, Clone, PartialEq, Eq, FromRow)]
pub struct CompressionBufferEntry {
pub id: S3ObjectMetadataId,
pub size: u64,
}

#[async_trait]
/// A trait for submitting buffered object metadata for processing.
Expand All @@ -14,18 +22,19 @@ pub trait BufferSubmitter {
/// # Errors:
///
/// Returns an [`anyhow::Error`] on failure.
async fn submit(&self, buffer: &[ObjectMetadata]) -> Result<()>;
async fn submit(&self, buffer: &[S3ObjectMetadataId]) -> Result<()>;
}

/// A buffer that accumulates object metadata and submits it when a size threshold is reached.
/// A buffer that accumulates object metadata IDs and a running total size, and submits when the
/// size threshold is reached.
///
/// # Type Parameters:
///
/// * [`Submitter`]: A type that implements the [`BufferSubmitter`] trait for submitting buffered
/// data.
pub struct Buffer<Submitter: BufferSubmitter> {
submitter: Submitter,
buf: Vec<ObjectMetadata>,
buf: Vec<S3ObjectMetadataId>,
total_size: u64,
size_threshold: u64,
}
Expand All @@ -45,7 +54,8 @@ impl<Submitter: BufferSubmitter> Buffer<Submitter> {
}
}

/// Adds object metadata to the buffer and submits if the size threshold is reached.
/// Adds [`CompressionBufferEntry`] values to the buffer and submits if the size threshold is
/// reached.
///
/// # Returns
///
Expand All @@ -56,10 +66,13 @@ impl<Submitter: BufferSubmitter> Buffer<Submitter> {
/// Returns an error if:
///
/// * Forwards [`Self::submit`]'s return values on failure.
pub async fn add(&mut self, object_metadata_to_ingest: Vec<ObjectMetadata>) -> Result<()> {
for object_metadata in object_metadata_to_ingest {
self.total_size += object_metadata.size;
self.buf.push(object_metadata);
pub async fn add(
&mut self,
object_metadata_to_ingest: Vec<CompressionBufferEntry>,
) -> Result<()> {
for ref_ in object_metadata_to_ingest {
self.total_size += ref_.size;
self.buf.push(ref_.id);

if self.total_size >= self.size_threshold {
self.submit().await?;
Expand Down
Loading
Loading