
[log-ingestor] Improve compression job submission flow. #2018

@LinZhihao-723

Description


Background

#2017 introduced DB operations to submit ingested S3 object metadata into a compression job, as a part of the implementation for #1978. With this PR, the ingested_s3_object_metadata table is created and populated by the log-ingestor during ingestion.

For reference, this table contains:

| Column | Type |
| --- | --- |
| id | BIGINT UNSIGNED |
| bucket | VARCHAR(1024) |
| key | VARCHAR(1024) |
| size | BIGINT UNSIGNED |
| status | ENUM(...) |
| ingestion_job_id | BIGINT UNSIGNED |
| compression_job_id | INT |
| creation_ts | DATETIME(3) |
| last_update_ts | DATETIME(3) |

Current Compression Job Submission & Processing Flow

Log Ingestor

  1. Once its buffer of object metadata is full, the log ingestor calls submit_for_compression.
  2. This method takes a list of (S3ObjectMetadataId, key) pairs. It uses only the S3 keys to build a clp_config, which it inserts into compression_jobs; the IDs are used only to update columns in ingested_s3_object_metadata.
  3. It updates ingested_s3_object_metadata.compression_job_id and sets ingested_s3_object_metadata.status to submitted for those rows.
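To make the current flow concrete, here is a minimal sketch (not the actual implementation) of this submission path, using sqlite3 as a stand-in for the CLP DB. Table and column names follow the issue; the function body and config shape are illustrative assumptions.

```python
import json
import sqlite3


def submit_for_compression(conn, entries):
    """Current behaviour sketch. entries: list of (metadata_id, key) pairs."""
    keys = [key for _, key in entries]
    # Only the keys end up in the job config -- the metadata row IDs are lost.
    clp_config = json.dumps({"input": {"type": "s3", "keys": keys}})
    cur = conn.execute(
        "INSERT INTO compression_jobs (clp_config) VALUES (?)", (clp_config,)
    )
    job_id = cur.lastrowid
    # The row IDs are used only to update the ingestion-side table.
    conn.executemany(
        "UPDATE ingested_s3_object_metadata"
        " SET compression_job_id = ?, status = 'submitted' WHERE id = ?",
        [(job_id, mid) for mid, _ in entries],
    )
    return job_id
```

Note that after this insert, compression_jobs holds keys but no reference back to the ingested_s3_object_metadata rows.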

Compression Scheduler

  1. Polls compression_jobs for pending jobs.
  2. For each pending job, it reads the clp_config. Since this config contains only keys, the scheduler cannot associate a key with its metadata entry, so it calls out to S3 to fetch object sizes before it can partition the work into tasks.

Problem

Object size is already stored in ingested_s3_object_metadata, yet the compression scheduler redundantly fetches it again from S3. This happens because submit_for_compression stores only keys in clp_config, so the compression_jobs table has no reference to the rows in ingested_s3_object_metadata.

A symptom of this issue appeared in #2017: we had to change ObjectMetadata to include the row's DB ID for compression job submission. Setting the ID this way is a hack, since it mutates ObjectMetadata and duplicates information that is already persisted in the CLP DB.

Proposal

We will eliminate the redundant trip to S3. When the log-ingestor submits a compression job, the job config will contain ingested_s3_object_metadata row IDs rather than S3 keys. This addresses the problem because:

  • The compression scheduler can read the required metadata from the DB without contacting S3, eliminating the round trip.
  • The log-ingestor no longer needs to hold ObjectMetadata in its compression-submission buffer, since the IDs can be used to look up the metadata.
    • This means we can remove the ID field that was added to ObjectMetadata.

Compression job config schema

Scheduler

We will add a new input type IngestorInputConfig as a third variant of ClpIoConfig.input. It will resemble the following:

from typing import Literal

from pydantic import BaseModel, field_validator


class IngestorInputConfig(BaseModel):
    type: Literal[InputType.INGESTOR.value] = InputType.INGESTOR.value
    
    dataset: str | None = None
    timestamp_key: str | None = None
    unstructured: bool = False
    metadata_ids: list[int]

    @field_validator("metadata_ids")
    @classmethod
    def validate_metadata_ids_non_empty(cls, value: list[int]) -> list[int]:
        if len(value) == 0:
            raise ValueError("metadata_ids cannot be an empty list")
        return value

Ingestor

We will remove the keys param in clp_io_config.rs's S3InputConfig and add an optional metadata_ids field:

/// Represents S3 input config.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct S3InputConfig {
    #[serde(flatten)]
    pub s3_config: S3Config,

    pub metadata_ids: Option<Vec<S3ObjectMetadataId>>,
    pub dataset: Option<NonEmptyString>,
    pub timestamp_key: Option<NonEmptyString>,
    pub unstructured: bool,
}

Updated Flow

Log-ingestor: Submitting a Compression Job

We will introduce a named type CompressionBufferEntry (pls suggest a better name; maybe BufferedObjectMetadataEntry?) in buffer.rs for the data fed into the buffer. The buffer itself holds a vector of S3ObjectMetadataId, but it flushes when the total byte size of the accumulated objects reaches a threshold. Each entry added to the buffer must therefore carry both the metadata row ID and that row's size, so we can maintain a running total and decide when to flush. We opt for a named struct rather than a tuple for readability, and so we can derive FromRow for loading from the DB when recovering buffered metadata.

It will resemble:

/// A reference to a persisted S3 object metadata row
#[derive(Debug, Clone, PartialEq, Eq, FromRow)]
pub struct CompressionBufferEntry {
    pub id: S3ObjectMetadataId,
    pub size: u64,
}

With this type in place, the updated submission flow is:

  1. Once its buffer of object metadata is full, the log ingestor calls submit_for_compression with a list of S3ObjectMetadataId values. Each item in this list has already been persisted in ingested_s3_object_metadata.
  2. It builds an S3InputConfig with those IDs.
  3. It serializes and inserts that config into compression_jobs.
  4. It updates ingested_s3_object_metadata (unchanged).
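For contrast with the current flow, here is a sketch (again using sqlite3 as a stand-in for the CLP DB; names and the config shape are illustrative) of the updated submission path, where the job config carries row IDs instead of S3 keys:

```python
import json
import sqlite3


def submit_for_compression_by_ids(conn, metadata_ids):
    """Updated behaviour sketch: the job config references metadata rows."""
    clp_config = json.dumps(
        {"input": {"type": "ingestor", "metadata_ids": metadata_ids}}
    )
    cur = conn.execute(
        "INSERT INTO compression_jobs (clp_config) VALUES (?)", (clp_config,)
    )
    job_id = cur.lastrowid
    conn.executemany(
        "UPDATE ingested_s3_object_metadata"
        " SET compression_job_id = ?, status = 'submitted' WHERE id = ?",
        [(job_id, mid) for mid in metadata_ids],
    )
    return job_id
```

With this shape, compression_jobs now points back at the exact ingested_s3_object_metadata rows it covers.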

The log-ingestor no longer needs to carry S3 keys through the buffer or the compression job submitter.

The buffer accepts a named type CompressionBufferEntry (id and size) for each persisted object. Size is used for the flush threshold; the submitter receives only the list of IDs when the buffer flushes.
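The size-based flush logic described above can be sketched in a few lines. This is a Python illustration of the behaviour the Rust buffer will implement, not the actual buffer.rs code; the class names and callback shape are assumptions.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    # Mirrors CompressionBufferEntry: a persisted metadata row ID plus size.
    id: int
    size: int


class CompressionBuffer:
    def __init__(self, flush_threshold_bytes, on_flush):
        self._threshold = flush_threshold_bytes
        self._on_flush = on_flush  # receives only the list of IDs
        self._entries = []
        self._total_size = 0

    def add(self, entry):
        # Keep a running byte total; flush once it reaches the threshold.
        self._entries.append(entry)
        self._total_size += entry.size
        if self._total_size >= self._threshold:
            self._on_flush([e.id for e in self._entries])
            self._entries = []
            self._total_size = 0
```

The key property: sizes are consumed inside the buffer for the threshold decision, and only IDs cross the boundary to the submitter.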

Compression scheduler: Processing The Job

When the compression scheduler reads a pending job whose clp_config is an IngestorInputConfig:

  1. Extract metadata_ids from the config.
  2. Query ingested_s3_object_metadata for those IDs:
 SELECT id, key, size FROM ingested_s3_object_metadata WHERE id IN (...)
  3. Build FileMetadata(path=key, size=size) for each row and add it to PathsToCompressBuffer.
  4. Partitioning and dispatching remain unchanged.
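The scheduler-side steps above can be sketched as follows, with sqlite3 standing in for the CLP DB. FileMetadata here is a simplified stand-in, the helper name is made up, and an ORDER BY is added for deterministic output; only the table/column names come from the issue.

```python
import json
import sqlite3
from dataclasses import dataclass


@dataclass
class FileMetadata:
    path: str
    size: int


def load_paths_for_job(conn, clp_config):
    """Resolve an ingestor job's metadata_ids to (path, size) without S3."""
    ids = json.loads(clp_config)["input"]["metadata_ids"]
    placeholders = ", ".join("?" for _ in ids)
    rows = conn.execute(
        f'SELECT id, "key", size FROM ingested_s3_object_metadata'
        f' WHERE id IN ({placeholders}) ORDER BY id',
        ids,
    ).fetchall()
    # Sizes come straight from the DB, so no S3 HEAD requests are needed
    # before partitioning the work into tasks.
    return [FileMetadata(path=key, size=size) for _, key, size in rows]
```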

Implementation plan

Each item will be a PR:

  1. Compression scheduler
  • Add IngestorInputConfig to job_config.py and extend ClpIoConfig.input to include it.
  • In the scheduler, when clp_config.input is an IngestorInputConfig, follow the flow outlined above.
  2. Log-ingestor: submit by IDs and simplify the buffer
  • Update clp_io_config.rs's S3InputConfig as described above.
  • Introduce CompressionBufferEntry in buffer.rs and update the listener interface to use it.
  • Change the buffer and submitter interfaces so they build the compression job from the list of IDs, rather than passing a list of ObjectMetadata with keys.
  • Remove the id field from ObjectMetadata in clp-rust-utils and stop populating it.
  • Update the log-ingestor's unit tests and component-wise integration tests so that ingested files are checked directly through the ingestion job state implementation, with no buffering logic involved.
