Skip to content

refactor(pipeline): unify execution loop and document semantics#5

Merged
cspinetta merged 1 commit intomainfrom
refactor/pipeline-core-and-docs
Feb 24, 2026
Merged

refactor(pipeline): unify execution loop and document semantics#5
cspinetta merged 1 commit intomainfrom
refactor/pipeline-core-and-docs

Conversation

@cspinetta
Copy link
Copy Markdown
Collaborator

Description

Refactor pipeline.rs to centralize execution logic in a single core loop and clarify pipeline semantics through improved documentation.

This removes duplicated stage execution code across run, run_streaming, and observed variants, introduces a clean streaming injection abstraction, and makes span handling uniform.

No functional behavior changes are intended.

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring
  • Performance improvement
  • Test addition or update

Note: Pipeline::run_streaming now requires Send for the callback. This aligns with the goal of being spawn-safe on multithreaded runtimes. Existing tests pass.

Related Issues

Fixes #
Related to #

Changes Made

  • Consolidated duplicated stage loops into a single run_pipeline_core function

  • Introduced OutputHook trait to inject streaming behavior without branching execution logic

  • Unified root span handling with fan-out spans (Span + Instant pattern)

  • Simplified Pipeline::{run, run_streaming} and ObservablePipeline variants

  • Rewrote module-level documentation to explicitly define:

    • Pipeline execution model (Single vs Parallel)
    • Carry-forward precedence rules
    • Fan-out merge format (JSON array of result_text)
    • Completion-order semantics for fan-out
    • Failure/early-stop behavior
  • Clarified internal helper docs (extract_carry_data, merge_parallel_outputs, emit_synthetic_chunk)

Testing

  • All existing tests pass (cargo test --workspace)
  • Added new tests for the changes
  • Tested manually with mock sandbox
  • Tested manually with KVM sandbox (if applicable)
  • Verified examples still work

Test Commands Run

cargo fmt --all
cargo clippy --all-targets --all-features -- -D warnings
cargo test
cargo doc --no-deps
scripts/build_claude_rootfs.sh
cargo build --example ollama_local
codesign --force --sign - --entitlements voidbox.entitlements target/debug/examples/ollama_local
OLLAMA_HOST=0.0.0.0:11434 ollama serve
ollama ps
OLLAMA_MODEL=qwen3-coder VOID_BOX_KERNEL=target/vmlinuz-ubuntu-arm64 VOID_BOX_INITRAMFS=target/void-box-rootfs.cpio.gz target/debug/examples/ollama_local

Documentation

  • Updated inline code documentation
  • Updated README.md (if user-facing changes)
  • Updated CHANGELOG.md
  • Added/updated examples (if applicable)

Code Quality

  • Code follows project style guidelines (cargo fmt)
  • No clippy warnings (cargo clippy --workspace --all-targets)
  • Documentation builds without warnings (cargo doc --no-deps)
  • All files have appropriate licensing headers (if applicable)

Screenshots (if applicable)

N/A

Checklist

  • My code follows the project's coding standards
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

Additional Notes

This refactor intentionally centralizes execution semantics in run_pipeline_core.
If future changes modify carry-forward behavior or fan-out merge format, both the module-level documentation and tests should be updated accordingly.

No changes to runtime behavior are expected; this is primarily structural cleanup and documentation clarification.

- Replace duplicated run/run_streaming/observe loops with run_pipeline_core
- Add OutputHook to inject streaming behavior
- Make root span handling uniform with fan-out spans (Span + Instant)
- Clarify carry-forward precedence, fan-out merge format, and ordering
- Tighten docs for contributors and stage result ordering semantics
- Require Send for run_streaming callback to support multithreaded runtimes
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR successfully refactors the pipeline execution logic to eliminate code duplication and improve maintainability. The changes centralize all execution semantics into a single run_pipeline_core function, introduce a clean OutputHook trait abstraction for streaming behavior, and significantly enhance module-level documentation.

Changes:

  • Consolidated four duplicated stage execution loops (from Pipeline::run, Pipeline::run_streaming, ObservablePipeline::run, ObservablePipeline::run_streaming) into a single run_pipeline_core function
  • Introduced OutputHook trait to abstract streaming vs non-streaming behavior
  • Enhanced module documentation with comprehensive execution model, data passing rules, failure semantics, and design notes for contributors
  • Added Send bound to streaming callbacks to ensure spawn-safety on multi-threaded runtimes

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

}

/// Emit a synthetic ExecOutputChunk for callers that want at least one event per stage.
/// Ensures streaming callers observe at least one event per stage even if the VM produced no chunks.
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation states this function "ensures streaming callers observe at least one event per stage even if the VM produced no chunks," but the implementation only emits a chunk when result_text is non-empty. Consider clarifying that no chunk is emitted if the stage produces no result_text. For example: "Ensures streaming callers observe at least one event per stage (from result_text) even if the VM produced no live chunks during execution."

Suggested change
/// Ensures streaming callers observe at least one event per stage even if the VM produced no chunks.
/// Ensures streaming callers observe at least one event per stage (from `result_text`)
/// even if the VM produced no live chunks during execution. No chunk is emitted if
/// the stage's `result_text` is empty.

Copilot uses AI. Check for mistakes.
@cspinetta cspinetta merged commit 56e1075 into main Feb 24, 2026
18 checks passed
@cspinetta cspinetta deleted the refactor/pipeline-core-and-docs branch February 24, 2026 23:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants