Skip to content

Conversation

rescrv
Copy link
Contributor

@rescrv rescrv commented Sep 19, 2025

Description of changes

Hashtag jobs.

Test plan

CI

Migration plan

N/A

Observability plan

N/A

Documentation Changes

N/A

Copy link

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of a unexpectedly high quality (Readability, Modularity, Intuitiveness)

@rescrv rescrv requested a review from tanujnay112 September 19, 2025 17:48
Copy link
Contributor

Add Transform Step to Compaction Orchestrator and Introduce TransformOperator Primitive

This PR integrates a new transformation step into the compaction flow within the CompactOrchestrator by introducing and wiring up a new operator, TransformOperator. The transformation step is now available between log fetching/sourcing and log partitioning, enabling the system to process (and potentially mutate) LogRecords as part of compactions. The transformation logic is currently a pass-through (identity), but the scaffolding allows for future functional transformations. Several orchestrator flow points, error handling, and state transitions have been updated to account for the transform phase. Additionally, a new operator module (transform_log.rs) is added, and necessary glue code and handler implementations are provided.

Key Changes

• Added new module rust/worker/src/execution/operators/transform_log.rs defining TransformOperator, TransformInput, TransformOutput, and TransformError.
• Registered the new operator in rust/worker/src/execution/operators/mod.rs.
• Integrated a Transform phase in the compaction orchestrator (rust/worker/src/execution/orchestration/compact.rs): added new enum variant, error path, and state updates.
• Inserted conditional logic to route log records through the new transformation step prior to partitioning, and added new async handlers for the transformation results.
• Wired error propagation and metrics for the new step.
• Inserted function placeholders and TODOs for future function splitting and further enhancements.

Affected Areas

rust/worker/src/execution/orchestration/compact.rs (CompactionOrchestrator logic, error handling, control flow)
rust/worker/src/execution/operators/transform_log.rs (new operator component)
rust/worker/src/execution/operators/mod.rs (operator registry)

This summary was automatically generated by @propel-code-bot

Comment on lines +41 to +42
pub fn transform(&self, records: &Chunk<LogRecord>) -> Chunk<LogRecord> {
records.clone()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The transform method performs a shallow clone of the entire chunk of records, which can be expensive for large datasets. Since this is currently a no-op transformation (just returns records.clone()), consider if this allocation is necessary. If this is placeholder code, consider adding a TODO comment explaining the intended transformation logic.

Context for Agents
[**BestPractice**]

The `transform` method performs a shallow clone of the entire chunk of records, which can be expensive for large datasets. Since this is currently a no-op transformation (just returns `records.clone()`), consider if this allocation is necessary. If this is placeholder code, consider adding a TODO comment explaining the intended transformation logic.

File: rust/worker/src/execution/operators/transform_log.rs
Line: 42

Comment on lines +169 to +170
// Functions
function: Option<()>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The function field is defined as Option<()> which only indicates presence/absence but provides no meaningful data. Consider using a more descriptive enum or boolean field name that clearly indicates what functionality is being toggled:

Suggested change
// Functions
function: Option<()>,
// Transform function configuration
transform_enabled: bool,

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**BestPractice**]

The `function` field is defined as `Option<()>` which only indicates presence/absence but provides no meaningful data. Consider using a more descriptive enum or boolean field name that clearly indicates what functionality is being toggled:

```suggestion
// Transform function configuration
transform_enabled: bool,
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: rust/worker/src/execution/orchestration/compact.rs
Line: 170

num_materialized_logs: 0,
segment_spans: HashMap::new(),
metrics: CompactOrchestratorMetrics::default(),
function: Some(()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

Hardcoded Some(()) value provides no semantic meaning. If this is meant to enable transform functionality by default, use a more explicit boolean:

Suggested change
function: Some(()),
transform_enabled: true, // Default to transform mode

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**BestPractice**]

Hardcoded `Some(())` value provides no semantic meaning. If this is meant to enable transform functionality by default, use a more explicit boolean:

```suggestion
transform_enabled: true, // Default to transform mode
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: rust/worker/src/execution/orchestration/compact.rs
Line: 329

records: Chunk<LogRecord>,
ctx: &ComponentContext<CompactOrchestrator>,
) {
if let Some(_fn) = self.function {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

Variable name _fn is unclear and doesn't follow Rust naming conventions. Consider using a more descriptive name that indicates what this represents:

Suggested change
if let Some(_fn) = self.function {
if self.transform_enabled {

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**BestPractice**]

Variable name `_fn` is unclear and doesn't follow Rust naming conventions. Consider using a more descriptive name that indicates what this represents:

```suggestion
if self.transform_enabled {
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: rust/worker/src/execution/orchestration/compact.rs
Line: 344

Comment on lines +7 to +8
#[derive(Debug)]
pub struct TransformOperator {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Documentation]

A doc comment explaining the purpose of this operator, especially that it's currently a placeholder, would be helpful for future maintainers.

For example:

Suggested change
#[derive(Debug)]
pub struct TransformOperator {}
/// A transformation operator for log records.
///
/// This is currently a placeholder and performs an identity transformation (cloning the input).
/// It is intended to be extended with actual transformation logic.
#[derive(Debug)]
pub struct TransformOperator {}

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**Documentation**]

A doc comment explaining the purpose of this operator, especially that it's currently a placeholder, would be helpful for future maintainers.

For example:
```suggestion
/// A transformation operator for log records.
///
/// This is currently a placeholder and performs an identity transformation (cloning the input).
/// It is intended to be extended with actual transformation logic.
#[derive(Debug)]
pub struct TransformOperator {}
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: rust/worker/src/execution/operators/transform_log.rs
Line: 8

@rescrv rescrv marked this pull request as draft September 19, 2025 20:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant