
feat(log-ingestor): Introduce an abstract State layer for fault-tolerance implementation #2001

Open

LinZhihao-723 wants to merge 19 commits into y-scope:main from LinZhihao-723:state-manager
Conversation

@LinZhihao-723 LinZhihao-723 (Member) commented Feb 17, 2026

  • Define state-related traits to formalize fault-tolerance contracts.
  • Refactor SqsListener and S3Scanner to integrate with state operations.
  • Update listener and buffer components to support batched object metadata handling.
  • Implement a no-op state backend to preserve the existing zero fault-tolerance behavior.

Description

This PR is a part of the implementation for #1978.

This PR introduces an abstract layer called State for fault-tolerance implementation. It abstracts all the state transitions required when persisting ingested files and the related ingestion job state. This abstraction allows us to implement a no-op state to preserve the zero-fault-tolerance mode, while keeping the tester runnable without introducing the persistence layer.

For more details, see the multi-line commit message that follows the PR title.
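
As a rough illustration of the abstraction described above, a minimal sketch of the trait shapes follows. Trait and type names come from this PR; the exact method signatures, trait bounds, and the placeholder ObjectMetadata type are illustrative assumptions, not the code in the diff.

    use async_trait::async_trait;
    use tokio::sync::mpsc;

    /// Stand-in for the crate's real object-metadata type.
    #[derive(Debug)]
    pub struct ObjectMetadata;

    /// State operations needed by SQS listener tasks.
    #[async_trait]
    pub trait SqsListenerState: Clone + Send + Sync + 'static {
        /// Persists a batch of objects ingested by an SQS listener task.
        async fn ingest(&self, objects: Vec<ObjectMetadata>) -> anyhow::Result<()>;
    }

    /// State operations needed by S3 scanner tasks.
    #[async_trait]
    pub trait S3ScannerState: Clone + Send + Sync + 'static {
        /// Persists a batch of objects plus the last ingested key, so that a
        /// restarted scan could resume after that key.
        async fn ingest(
            &self,
            objects: Vec<ObjectMetadata>,
            last_ingested_key: &str,
        ) -> anyhow::Result<()>;
    }

    /// Job-level state: lifecycle hooks plus everything any supported job type needs.
    #[async_trait]
    pub trait IngestionJobState: SqsListenerState + S3ScannerState {
        async fn start(&self) -> anyhow::Result<()>;
        async fn end(&self) -> anyhow::Result<()>;
    }

    /// No-op backend that preserves the current zero-fault-tolerance behavior:
    /// batches are forwarded straight to the compression listener's channel and
    /// nothing is persisted.
    #[derive(Clone)]
    pub struct ZeroFaultToleranceIngestionJobState {
        sender: mpsc::Sender<Vec<ObjectMetadata>>,
    }

    #[async_trait]
    impl SqsListenerState for ZeroFaultToleranceIngestionJobState {
        async fn ingest(&self, objects: Vec<ObjectMetadata>) -> anyhow::Result<()> {
            // Zero fault tolerance: just forward the batch downstream.
            self.sender
                .send(objects)
                .await
                .map_err(|_| anyhow::anyhow!("compression listener channel closed"))
        }
    }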

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and it has been indicated in the PR title, OR this isn't a breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensured all workflows pass.
  • Verified all unit tests pass after adding the --nocapture option.
  • Tested that the S3 scanner and SQS listener ingestion jobs work as normal.

Summary by CodeRabbit

  • Refactor

    • End-to-end ingestion now handles metadata as batched payloads and threads per-job state through job lifecycle and shutdown.
  • New Features

    • Pluggable per-job ingestion state abstraction added, with a default "zero-fault" state implementation for start/stop and ingestion hooks.
  • Tests

    • Test suite updated to exercise batched payloads and the new per-job state paths.

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner February 17, 2026 02:31

coderabbitai bot commented Feb 17, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Introduces a pluggable async state abstraction for ingestion jobs and switches internal messaging to batched Vec<ObjectMetadata>: scanners and listeners now produce/consume batches, buffer.add accepts vectors and iterates per-item with threshold checks, and ingestion job types are generic over new state traits with a ZeroFaultToleranceIngestionJobState implementation.

Changes

  • Compression: buffer & listener (components/log-ingestor/src/compression/buffer.rs, components/log-ingestor/src/compression/listener.rs)
    Channel payloads and APIs changed from ObjectMetadata to Vec<ObjectMetadata>. buffer.add now accepts a Vec and iterates items, updating total_size and performing threshold checks inside the loop (see the sketch after this list); listener sender/receiver types and call sites updated to send/receive batches.
  • Ingestion job state & generics (components/log-ingestor/src/ingestion_job/state.rs, components/log-ingestor/src/ingestion_job.rs)
    Added async traits IngestionJobState, SqsListenerState, and S3ScannerState, plus the concrete ZeroFaultToleranceIngestionJobState. Made IngestionJob generic over State: IngestionJobState, re-exported the state module, and updated impls/From conversions to propagate the generic State.
  • S3 scanner: batching + state (components/log-ingestor/src/ingestion_job/s3_scanner.rs)
    S3Scanner and its Task are now generic over State: S3ScannerState; spawn accepts state. Scan batches object metadata into a Vec, calls state.ingest(...), and advances start_after using the last ingested key. Removed per-item channel sends in favor of state-driven ingestion.
  • SQS listener: batching + state (components/log-ingestor/src/ingestion_job/sqs_listener.rs)
    SqsListener, Task, and TaskHandle are generic over State: SqsListenerState; spawn accepts state. Internal payloads are Vec<ObjectMetadata>. Tasks accumulate metadata into a vector, call state.ingest(...), send batches, and perform batch SQS deletes using aggregated receipt handles.
  • Job manager: per-job state propagation (components/log-ingestor/src/ingestion_job_manager.rs)
    A per-job ZeroFaultToleranceIngestionJobState is created, started, stored in IngestionJobTableEntry, and passed to S3Scanner::spawn / SqsListener::spawn. Shutdown now calls entry.state.end() before job shutdown. Callback signatures updated to accept per-job state.
  • Tests updated for batching and state (components/log-ingestor/tests/test_compression_listener.rs, components/log-ingestor/tests/test_ingestion_job.rs)
    Tests updated to use Sender<Vec<ObjectMetadata>> / Receiver<Vec<ObjectMetadata>>, send/receive batches (flattening where needed), and pass ZeroFaultToleranceIngestionJobState to spawn calls.
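
To make the batched buffer.add concrete, here is a minimal sketch of the per-item loop summarized in the first item above; the struct fields, the submit helper, and the u64 size type are illustrative assumptions rather than the actual contents of buffer.rs.

    /// Illustrative stand-ins for the real types in buffer.rs.
    struct ObjectMetadata {
        size: u64,
    }

    struct Buffer {
        items: Vec<ObjectMetadata>,
        total_size: u64,
        size_threshold: u64,
    }

    impl Buffer {
        /// Adds a batch of object metadata, checking the flush threshold per item.
        async fn add(&mut self, objects: Vec<ObjectMetadata>) -> anyhow::Result<()> {
            for object in objects {
                self.total_size += object.size;
                self.items.push(object);
                // The threshold check happens inside the loop, so a large batch
                // can trigger multiple flushes.
                if self.total_size >= self.size_threshold {
                    self.submit_buffered_items().await?;
                }
            }
            Ok(())
        }

        /// In the real code this submits a compression job; here it only resets
        /// the buffer so the sketch stays self-contained.
        async fn submit_buffered_items(&mut self) -> anyhow::Result<()> {
            self.items.clear();
            self.total_size = 0;
            Ok(())
        }
    }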

Sequence Diagram(s)

sequenceDiagram
    participant Scanner as "S3Scanner / SqsListener"
    participant State as "ZeroFaultToleranceIngestionJobState"
    participant Channel as "mpsc::Sender<Vec<ObjectMetadata>>"
    participant Listener as "ListenerTask / Receiver"
    participant Buffer as "Compression Buffer"
    participant Submit as "Submit / Compressor"

    Scanner->>State: ingest(batch: Vec<ObjectMetadata>)
    State-->>Channel: send(batch) (for ZeroFaultTolerance)
    Channel-->>Listener: deliver(batch)
    Listener->>Buffer: add(batch)
    Buffer->>Buffer: iterate items, update total_size
    alt size >= threshold (during iteration)
        Buffer->>Submit: submit(buffered items)
        Submit-->>Buffer: Ok
    end
    Listener-->>Submit: submit remaining buffered items (if any)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 3 passed
  • Description Check: Passed (check skipped because CodeRabbit's high-level summary is enabled).
  • Title Check: Passed. The title clearly describes the main change: introducing an abstract State layer for fault-tolerance implementation, which aligns with the comprehensive refactoring across multiple components to add generic state handling.
  • Docstring Coverage: Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
components/log-ingestor/src/compression/listener.rs (1)

144-153: ⚠️ Potential issue | 🟡 Minor

Stale docstring: return type reference not updated.

Line 145 still says mpsc::Sender<ObjectMetadata> but the actual return type is now mpsc::Sender<Vec<ObjectMetadata>>.

Proposed fix
     /// # Returns
-    /// A new `mpsc::Sender<ObjectMetadata>` that can be used to send metadata to this listener.
+    /// A new `mpsc::Sender<Vec<ObjectMetadata>>` that can be used to send metadata to this
+    /// listener.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/compression/listener.rs` around lines 144 - 153,
The docstring for get_new_sender is stale: it states the return type is
mpsc::Sender<ObjectMetadata> but the function actually returns
mpsc::Sender<Vec<ObjectMetadata>>; update the Returns section and any inline
text to reference mpsc::Sender<Vec<ObjectMetadata>> (or "sender of
Vec<ObjectMetadata>") so it matches the function signature and clarify that
clones send Vec<ObjectMetadata> messages, referencing the get_new_sender method
and the sender field.
components/log-ingestor/src/ingestion_job_manager.rs (1)

211-222: ⚠️ Potential issue | 🟠 Major

state.end() is called before the ingestion job and listener are shut down — this ordering will break real state backends.

Currently entry.state.end() executes while the ingestion job and listener are still running. With the noop implementation this is harmless, but any real fault-tolerance backend would finalize/persist state before in-flight data has been flushed through the pipeline.

The expected shutdown sequence should be:

  1. Shut down the ingestion job (stop producing new data).
  2. Shut down the listener (flush remaining buffered data).
  3. Call state.end() (finalize state after all data has been processed).
Proposed fix
             Some(entry) => {
-                entry.state.end().await?;
                 entry.ingestion_job.shutdown_and_join().await?;
                 tracing::debug!("Ingestion job {} shut down.", job_id);
                 entry.listener.shutdown_and_join().await?;
                 tracing::debug!("Ingestion job {}'s listener shut down.", job_id);
+                entry.state.end().await?;
+                tracing::debug!("Ingestion job {}'s state finalized.", job_id);
                 Ok(())
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 211 - 222,
The shutdown ordering is wrong: call
entry.ingestion_job.shutdown_and_join().await? first, then
entry.listener.shutdown_and_join().await?, and only after both complete call
entry.state.end().await?; update the match arm in the function that removes jobs
(the block that currently calls entry.state.end() before shutdowns) to perform
ingestion_job.shutdown_and_join(), then listener.shutdown_and_join(), then
state.end(), preserving the existing error propagation (using ?), and keep the
debug logs after each successful shutdown (e.g., "Ingestion job {} shut down."
and "Ingestion job {}'s listener shut down.") so state.finalization happens
last.
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)

241-297: 🧹 Nitpick | 🔵 Trivial

_state stored in SqsListener — same scaffolding pattern as S3Scanner.

Same observation as in S3Scanner: _state is held but never read. If this is intentional scaffolding for future checkpoint/shutdown logic, a brief doc comment on the field would help.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job/sqs_listener.rs` around lines 241 -
297, The field `_state` in struct `SqsListener<State: SqsListenerState>` is
currently stored but never read (same pattern as `S3Scanner`); either remove it
or, if it is intentional scaffolding for future checkpoint/shutdown/ownership
reasons, add a brief doc comment on the `_state` field explaining its purpose
and consider adding an attribute to suppress unused warnings (e.g., a simple
`/// Held to keep state ownership for future checkpoint/shutdown logic.` above
`_state` and optionally `#[allow(dead_code)]`) so maintainers understand why
`SqsListener::spawn`, `TaskHandle::spawn`, and the `state` clones keep passing
state through even though it is not yet used.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager.rs`:
- Around line 139-143: The job creation callbacks and table entries currently
hardcode NoopIngestionJobState; update the code to add a brief TODO comment near
create_s3_ingestion_job (and the analogous creation call around lines 179-190)
noting that the manager should be made generic over State and that the State
type parameter must be propagated through IngestionJobManager,
IngestionJobManagerState, and IngestionJobTableEntry (linking to the tracking
issue). Place the TODO right above the create_s3_ingestion_job closure (and the
other creation site) mentioning the exact types to change: IngestionJobManager,
IngestionJobManagerState, IngestionJobTableEntry, and NoopIngestionJobState so
future refactors can find and remove the hardcoded state.
- Around line 296-299: state.start() is awaited while holding the job_table
Mutex, which can block other operations; move the async startup off the locked
section by calling NoopIngestionJobState::default() and awaiting
state.start().await? before acquiring job_table.lock().await (i.e., create the
listener and sender with create_listener/get_new_sender, construct the state,
call state.start().await? outside the lock, then lock job_table and insert the
job entry), or alternatively restructure to release the lock before awaiting
state.start() so that state.start() never runs while job_table.lock().await is
held.

In `@components/log-ingestor/src/ingestion_job/s3_scanner.rs`:
- Around line 132-148: The code currently calls self.state.ingest(...) and sets
self.start_after before awaiting self.sender.send(...), which can commit state
while send fails and cause skipped items; change the ordering so the channel
send is awaited before committing state: first await
self.sender.send(object_metadata_to_ingest).await? and return the error if it
fails, then call self.state.ingest(&object_metadata_to_ingest,
last_ingested_key.as_str()).await? and only after that set self.start_after =
Some(last_ingested_key); ensure you use the same object_metadata_to_ingest and
last_ingested_key values and preserve error propagation.
- Around line 162-167: The struct S3Scanner currently stores a private field
named _state: State (where State: S3ScannerState) that is never read; either
remove the field to avoid surprising duplicated state clones or make its intent
explicit: if you need to hold state for lifecycle/shutdown/checkpoint
coordination, keep the field but replace the leading underscore with a clear
name (e.g., state) and add a one-line comment on S3Scanner explaining why the
scanner keeps an owned State (or switch to storing an Arc/handle to avoid
cloning into both the scanner and the spawned task). Update usages around
S3Scanner, CancellationToken, and the spawned task so only one canonical owner
or a documented shared reference remains.

In `@components/log-ingestor/src/ingestion_job/sqs_listener.rs`:
- Around line 138-161: The code currently calls
self.state.ingest(&object_metadata_to_ingest).await? and
self.sender.send(object_metadata_to_ingest).await? even when
object_metadata_to_ingest.is_empty(), and always invokes
delete_message_batch().set_entries(Some(delete_message_batch_request_entries))
which will error if delete_message_batch_request_entries is empty; fix by adding
guards: only call self.state.ingest(...) and self.sender.send(...) when
object_metadata_to_ingest.is_empty() is false, and only call the
delete_message_batch() chain when delete_message_batch_request_entries is
non-empty (ensure the vec length is between 1 and 10 before calling); update the
code paths around object_metadata_to_ingest, self.state.ingest,
self.sender.send, delete_message_batch_request_entries and the
delete_message_batch call to skip these operations when their respective
collections are empty.
- Around line 210-213: The spawn signature currently takes an unused `_state:
State` parameter; remove that dead parameter from TaskHandle::spawn (the generic
fn spawn<SqsClientManager: AwsClientManagerType<Client>, State:
SqsListenerState>(task: Task<SqsClientManager, State>, job_id: Uuid) ...) since
Task already owns its `state` field, and update all call sites that passed
`_state` (remove the `.clone()`/argument) so callers just pass the Task and
job_id; ensure the generic `State` type remains inferred from
`Task<SqsClientManager, State>` and compile by adjusting any trait bounds or
type annotations that referenced the removed parameter.

In `@components/log-ingestor/src/ingestion_job/state.rs`:
- Around line 4-27: Add clear doc comments to the trait methods to describe
their expected semantics: document IngestionJobState::start (what initialization
it must perform and when it's called), IngestionJobState::end (what
cleanup/finalization it must perform and when it's invoked),
SqsListenerState::ingest (when it will be called, whether it must be idempotent,
error-handling expectations and concurrency guarantees), and
S3ScannerState::ingest (what last_ingested_key represents, call ordering
relative to previous ingests, idempotency and error semantics). Put these
comments directly above the method signatures in state.rs so implementors of
these traits have explicit behavioral contracts for fault-tolerance and
lifecycle handling.

In `@taskfiles/tests/main.yaml`:
- Line 37: Add an inline comment immediately above the cargo nextest invocation
(the line containing "cargo nextest run --nocapture --all --all-features
--run-ignored all --release") that briefly explains why --nocapture is required
(to work around the LocalStack/nextest interaction in CI), notes the tradeoff of
increased test log noise, and suggests that it should only be removed if the
underlying LocalStack/nextest issue is resolved or verified fixed.


@coderabbitai coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)

62-78: 🧹 Nitpick | 🔵 Trivial

Cancellation may be delayed when sqs_client_manager.get() or receive_message().send() blocks.

Inside the select! branch, if get().await succeeds but receive_message().send() uses long polling (up to 20 seconds via wait_time_seconds), the cancellation branch won't take effect until the SQS response returns or times out. This is generally acceptable for graceful shutdown, but worth noting: the maximum shutdown delay for this task equals the configured wait_time_sec. If faster shutdown is desired, consider using select! around just the .send() call, or using a shorter wait_time_sec with a retry loop.
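
If faster shutdown were wanted, the reshaping suggested above might look roughly like the following; the builder calls reuse the names mentioned in this comment, while the function shape, the queue_url parameter, and the batch size of 10 are illustrative assumptions.

    use aws_sdk_sqs::Client;
    use tokio_util::sync::CancellationToken;

    /// Builds the receive-message request first, then runs `select!` only around
    /// `send()`, so cancellation is not blocked behind SQS long polling.
    /// Returns false when the task was cancelled.
    async fn poll_once(
        client: &Client,
        queue_url: &str,
        wait_time_sec: i32,
        cancel_token: &CancellationToken,
    ) -> anyhow::Result<bool> {
        let request = client
            .receive_message()
            .queue_url(queue_url)
            .max_number_of_messages(10) // illustrative batch size
            .wait_time_seconds(wait_time_sec);

        tokio::select! {
            // Stop immediately instead of waiting out the long poll.
            _ = cancel_token.cancelled() => Ok(false),
            result = request.send() => {
                let _response =
                    result.map_err(|e| anyhow::anyhow!("receive_message failed: {e}"))?;
                // The real task would hand the response to process_sqs_response(...) here.
                Ok(true)
            }
        }
    }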

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job/sqs_listener.rs` around lines 62 -
78, The current select! waits for the whole async chain
(sqs_client_manager.get().await?.receive_message()...send()) so
cancel_token.cancelled() can be delayed up to wait_time_sec; to fix, split the
work: await sqs_client_manager.get().await? and build the request with
receive_message().queue_url(...).max_number_of_messages(...).wait_time_seconds(wait_time_sec)
first, then run the final .send() inside its own select! with
cancel_token.cancelled() so you can abort the long-polling send early;
alternatively, replace wait_time_sec with a shorter value and loop retries
around receive_message().send() if faster shutdown is acceptable—adjust
references to sqs_client_manager.get(), receive_message().send(),
cancel_token.cancelled(), wait_time_sec and keep calling
process_sqs_response(result?).await? after a successful send.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.


Duplicate comments:
In `@components/log-ingestor/src/ingestion_job/sqs_listener.rs`:
- Around line 228-251: The spawn function currently takes an unused `_state:
State` parameter; remove that parameter from the TaskHandle::spawn signature and
from all call sites (e.g., where `.clone()` was only used to satisfy this
param), and adjust the caller that passes state (line where spawn is invoked) to
stop cloning/passing state — Task already carries its own state so nothing else
needs forwarding; update function signature references to
`spawn<SqsClientManager: AwsClientManagerType<Client>, State:
SqsListenerState>(task: Task<SqsClientManager, State>, job_id: Uuid)` and remove
any related unused variable bindings in the caller.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job_manager.rs`:
- Around line 156-158: The doc comment incorrectly references a non-existent
variant Error::InvalidNumConcurrentListenerTasks; update the comment to
reference the actual variant used for validation failures (Error::InvalidConfig)
or rephrase to say "validation errors from
ValidatedSqsListenerConfig::validate_and_create (surfaced as
Error::InvalidConfig)"; locate the comment near the function that calls
ValidatedSqsListenerConfig::validate_and_create and the Error enum to ensure
wording matches the enum variants.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)

209-216: ⚠️ Potential issue | 🟠 Major

Shutdown ordering: state.end() is called before the ingestion job is stopped.

entry.state.end() is awaited on line 211 while the scanner/listener task is still running (it isn't cancelled until shutdown_and_join() on line 212). Since the task holds its own clone of the state, a real (non-noop) state backend with shared backing storage (e.g., a database) could see state.ingest() calls from the still-running task racing with or occurring after state.end() has finalised the job's state.

The safe ordering would be to shut down the ingestion job first (ensuring the task has stopped), then call state.end():

Proposed fix
             Some(entry) => {
-                entry.state.end().await?;
                 entry.ingestion_job.shutdown_and_join().await?;
                 tracing::debug!("Ingestion job {} shut down.", job_id);
+                entry.state.end().await?;
                 entry.listener.shutdown_and_join().await?;
                 tracing::debug!("Ingestion job {}'s listener shut down.", job_id);
                 Ok(())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 209 - 216,
The shutdown ordering is unsafe: call
entry.ingestion_job.shutdown_and_join().await? and
entry.listener.shutdown_and_join().await? before awaiting
entry.state.end().await? so the running task(s) stop holding/cloning the state
prior to finalizing it; update the block that handles job removal (the match arm
handling job_to_remove / entry) to first shut down the ingestion job
(entry.ingestion_job.shutdown_and_join()), then shut down the listener
(entry.listener.shutdown_and_join()), then call entry.state.end(), preserving
the tracing::debug messages around each step and propagating errors as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/src/ingestion_job/state.rs`:
- Around line 76-81: The explicit lifetime `'object_metadata_lifetime` on the
async fn ingest signature is redundant; update the method signature in state.rs
(async fn ingest) to use plain borrowed types for the parameters (change
objects: &'object_metadata_lifetime [ObjectMetadata] and last_ingested_key:
&'object_metadata_lifetime str to objects: &[ObjectMetadata] and
last_ingested_key: &str) so both parameters use elided lifetimes while keeping
the same return type anyhow::Result<()> and behavior.


@hoophalab hoophalab left a comment

some comments.

 /// for submission.
 pub struct Listener {
-    sender: mpsc::Sender<ObjectMetadata>,
+    sender: mpsc::Sender<Vec<ObjectMetadata>>,
@hoophalab hoophalab Feb 18, 2026

Passing a Vec through the channel feels like a duplication of the buffer's functionality.

More critically, it's possible that s3_scanner/sqs_listener successfully adds objects to the buffer and deletes the messages, but the buffer isn't full so they are not compressed immediately. The ingestor could crash after the messages are deleted but before the buffer flushes, and there would be data loss.

It might be best to delete buffer.rs, or move the buffer's functionality into the jobs.

Member Author (LinZhihao-723)

  • "Passing a Vec through the channel feels like a duplication of the buffer's functionality" is not true. Buffer is operating on a single file/metadata object to trigger size-based flush. Passing a vector through the channel is intended to improve the current batched-receiving design, which is more efficient when multiple SQS listener tasks are concurrently sending things to the channel, since it requires fewer locking operations.
  • In the fault-tolerance design, the buffer is recoverable, and there will be no data loss. I will share the design doc with you offline.
  • In general, buffering is still an important feature we need to implement inside the log-ingestor. Please review the design doc to see if you have any more comments.
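
A minimal sketch of the batched send described in the first bullet, assuming a tokio mpsc channel; the function name and the stand-in ObjectMetadata type are illustrative.

    use tokio::sync::mpsc::Sender;

    /// Stand-in for the crate's ObjectMetadata.
    pub struct ObjectMetadata;

    /// Each listener task accumulates the metadata from one SQS poll and performs
    /// a single channel send, so N concurrent tasks contend on the channel once
    /// per poll rather than once per object.
    async fn forward_batch(
        sender: &Sender<Vec<ObjectMetadata>>,
        batch: Vec<ObjectMetadata>,
    ) -> anyhow::Result<()> {
        if batch.is_empty() {
            return Ok(());
        }
        sender
            .send(batch)
            .await
            .map_err(|_| anyhow::anyhow!("compression listener channel closed"))
    }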


/// An abstract layer for managing ingestion job states.
#[async_trait]
pub trait IngestionJobState:
@hoophalab hoophalab Feb 18, 2026

  1. IngestionJobManager uses NoopIngestionJobState directly; IngestionJobState isn't actually used.

  2. I cannot see why IngestionJobState: SqsListenerState + S3ScannerState. If IngestionJobManager needs to call state structs defined in s3_scanner.rs and sqs_listener.rs, IngestionJobState should be a simple trait with fn start and fn end.

  3. On an even higher level, there would be only one struct that implements trait S3ScannerState in the foreseeable future. It might be best to use two structs, one in each of s3_scanner.rs and sqs_listener.rs, and remove trait SqsListenerState and trait S3ScannerState.

@LinZhihao-723 LinZhihao-723 Feb 18, 2026

  1. IngestionJobManager is the server-side implementation, and I do expect it to use a concrete state instead of a generic.

  2. IngestionJobState: SqsListenerState + S3ScannerState requires the state to implement all methods required by all supported ingestion job types:

    • It covers all general job-state APIs; for example, in the future we may want to add APIs to get the number of compression jobs submitted vs. the number of files ingested. These general methods will be implemented in IngestionJobState and used at the job level.
    • When going down to the task level, we narrow the trait to the actual job-state implementation (see the sketch below) so that: (1) job type A cannot call job type B's specific job methods, and (2) different job states can have the same method names with different implementations (like the ingest method in the current two job states).

    I can see your concern about IngestionJobState: SqsListenerState + S3ScannerState. I think it might make more sense to add this constraint at the job level; for example, the state for an SQS listener might be State: IngestionJobState + SqsListenerState.

  3. Besides the reasons for using a trait explained above, a trait allows us to implement a trivial noop state for testing, so that unit testing won't require an external DB service. I do agree that we could still achieve this with two structs (which is probably even simpler to implement), but I believe the trait implementation is a more formal approach with better documentation and type safety.
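
A sketch of the narrowing described in point 2, reusing the trait names from this PR; the struct and function shapes are illustrative and assume the trait definitions sketched in the PR description.

    /// Job level: generic over the full, general job state.
    pub struct IngestionJob<State: IngestionJobState> {
        state: State,
        // ...other fields elided...
    }

    /// Task level: narrowed to the one trait the task needs, so an SQS listener
    /// task cannot call S3ScannerState::ingest or the job-level start/end hooks,
    /// even though the concrete state type implements them.
    async fn run_sqs_listener_task<State: SqsListenerState>(
        state: State,
        batch: Vec<ObjectMetadata>,
    ) -> anyhow::Result<()> {
        state.ingest(batch).await
    }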

Contributor

outdated docstring

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)

212-222: ⚠️ Potential issue | 🟠 Major

state.end() is called before ingestion job tasks are stopped — potential state corruption with a real backend.

Line 214 invokes entry.state.end().await? while the ingestion job's tasks are still running (they aren't cancelled until shutdown_and_join on line 215). Since S3Scanner/SqsListener tasks hold clones of the state and may still call state.ingest(), a fault-tolerant backend could observe ingestion calls after the state has been finalized.

For ZeroFaultToleranceIngestionJobState this is harmless, but the abstract contract implies end() is a finalization step. Consider reordering:

Proposed fix
             Some(entry) => {
-                entry.state.end().await?;
                 entry.ingestion_job.shutdown_and_join().await?;
                 tracing::debug!("Ingestion job {} shut down.", job_id);
                 entry.listener.shutdown_and_join().await?;
                 tracing::debug!("Ingestion job {}'s listener shut down.", job_id);
+                entry.state.end().await?;
                 Ok(())
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/ingestion_job_manager.rs` around lines 212 - 222,
The code finalizes the job state too early: move the finalization call
entry.state.end().await? to after the running tasks are stopped to avoid state
corruption; specifically, in the match arm for Some(entry) call
entry.ingestion_job.shutdown_and_join().await? and
entry.listener.shutdown_and_join().await? first (with the existing
tracing::debug logs), then call entry.state.end().await? last, and keep the
tracing messages indicating shutdown and finalization order; this ensures
S3Scanner/SqsListener clones won't call state.ingest() after the state has been
finalized (applies to ZeroFaultToleranceIngestionJobState and other
implementations).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/tests/test_ingestion_job.rs`:
- Around line 164-172: The code unnecessarily clones sender when calling
ZeroFaultToleranceIngestionJobState::new; since sender is not used afterwards,
move it instead of cloning to avoid an extra clone and potential overhead —
replace the call that passes sender.clone() to
ZeroFaultToleranceIngestionJobState::new with a direct move of sender (created
by mpsc::channel) so SqsListener::spawn receives the owned sender via
ZeroFaultToleranceIngestionJobState::new; no other logic changes required.


---

Duplicate comments:
In `@components/log-ingestor/src/ingestion_job/sqs_listener.rs`:
- Around line 225-228: The _state parameter on TaskHandle::spawn is dead code
because Task already owns its state (the Task struct's state field) — remove the
unused parameter and its passing to eliminate unnecessary .clone() at callers:
change the spawn signature (the generic bounds on SqsClientManager and State can
remain if still used elsewhere) to accept only (task: Task<..., State>, job_id:
Uuid) or simply (task: Task<...>, job_id: Uuid) and update all call sites that
currently pass state or state.clone() to stop passing that argument; ensure you
also remove any unused parameter name (_state) and related imports so the
compiler has no unused variable warnings.

In `@components/log-ingestor/src/ingestion_job/state.rs`:
- Around line 71-75: The explicit lifetime parameter 'object_metadata_lifetime'
on the async method ingest is now decorative because objects is an owned
Vec<ObjectMetadata>; remove the unused lifetime from the ingest signature in the
trait/impl (i.e., change async fn ingest<'object_metadata_lifetime>(&self, ...)
to async fn ingest(&self, objects: Vec<ObjectMetadata>, last_ingested_key: &str)
-> anyhow::Result<()>), and remove any matching lifetime mentions in related
impls or trait bounds so the signature accurately reflects that
last_ingested_key is an independent &str.
