@universalmind303
Member

Changes Made

Adds a new dynamic batching strategy for explodes. Since explodes can greatly increase the selectivity and potentially blow up memory, I thought we could use our dynamic batching to tell the explode operator to back off. An earlier PR increased the batch sizes for filters with low cardinality; this PR is essentially the inverse of that, but for explodes.

See the comment on #5924.

Related Issues

@greptile-apps
Contributor

greptile-apps bot commented Jan 20, 2026

Greptile Summary

This PR introduces expansion-aware dynamic batching for the explode operator to prevent memory pressure from high-cardinality expansions. The implementation monitors the explode's expansion ratio (output rows / input rows) and dynamically reduces upstream batch size requirements proportionally, so that when an explode has 100x expansion, it requests ~100x fewer rows from upstream to produce appropriately sized downstream batches.

Key changes:

  • Added ExpansionAwareBatchingStrategy with exponential moving average (EMA) smoothing to handle varying expansion ratios gracefully
  • Modified IntermediateOperator::batching_strategy trait method signature to accept morsel_size_requirement as a parameter instead of operators calling self.morsel_size_requirement() internally
  • Added as_any() method to RuntimeStats trait to enable downcasting for type-specific stats access in batching strategies
  • Updated all intermediate operators (filter, project, udf, distributed_actor_pool_project, into_batches, unpivot) to match the new trait signature
  • Comprehensive test coverage for the new batching strategy including edge cases for high/low expansion, smoothing, and cumulative stats
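
The `as_any()` change in the list above can be illustrated with a minimal sketch. Only the method name `as_any` and the trait name `RuntimeStats` come from the summary; the trait body, the `ExplodeStats` fields, `OtherStats`, and the helper `explode_counters` are hypothetical stand-ins, not the actual Daft definitions.

```rust
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for the RuntimeStats trait; only `as_any` is taken
// from the PR summary, the rest of the real trait is omitted.
trait RuntimeStats: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical explode stats holding cumulative counters.
#[derive(Default)]
struct ExplodeStats {
    rows_in: AtomicU64,
    rows_out: AtomicU64,
}

impl RuntimeStats for ExplodeStats {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// A second, unrelated stats type, used to show that the downcast can fail.
struct OtherStats;

impl RuntimeStats for OtherStats {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Hypothetical helper: returns (rows_in, rows_out) when the trait object is
// really an ExplodeStats, and None for any other stats type.
fn explode_counters(stats: &dyn RuntimeStats) -> Option<(u64, u64)> {
    let explode = stats.as_any().downcast_ref::<ExplodeStats>()?;
    Some((
        explode.rows_in.load(Ordering::Relaxed),
        explode.rows_out.load(Ordering::Relaxed),
    ))
}
```

A batching strategy that only receives a `&dyn RuntimeStats` could use a helper like this to read explode-specific counters and fall back to its default behaviour when the downcast returns `None`.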

The implementation includes safety bounds (MIN_EXPANSION, MAX_REDUCTION) to prevent pathological cases and uses cumulative statistics from Counter metrics.
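
As a rough sketch of the idea described above (not the code in this PR), the snippet below smooths the observed expansion ratio with an EMA and scales the upstream request by its inverse. The names `MIN_EXPANSION` and `MAX_REDUCTION` appear in the summary, but their values here, the `EMA_ALPHA` constant, and the `ExpansionState` methods are assumptions made for illustration.

```rust
// Assumed values: the summary names these bounds but does not state them.
const MIN_EXPANSION: f64 = 1.0; // treat the explode as never shrinking its input
const MAX_REDUCTION: f64 = 0.001; // never request less than 0.1% of the downstream size
const EMA_ALPHA: f64 = 0.3; // hypothetical smoothing factor

// Hypothetical state tracked between batches.
struct ExpansionState {
    smoothed_expansion: Option<f64>,
}

impl ExpansionState {
    fn new() -> Self {
        Self { smoothed_expansion: None }
    }

    // Feed in the cumulative counters (rows_in, rows_out) after a batch.
    fn record(&mut self, rows_in: u64, rows_out: u64) {
        if rows_in == 0 {
            return; // nothing observed yet, keep the previous estimate
        }
        let observed = (rows_out as f64 / rows_in as f64).max(MIN_EXPANSION);
        self.smoothed_expansion = Some(match self.smoothed_expansion {
            None => observed, // first observation seeds the average
            Some(prev) => EMA_ALPHA * observed + (1.0 - EMA_ALPHA) * prev,
        });
    }

    // Rows to request from upstream so that the explode output lands near
    // `downstream_rows`.
    fn upstream_rows(&self, downstream_rows: usize) -> usize {
        let expansion = self.smoothed_expansion.unwrap_or(MIN_EXPANSION);
        let reduction = (1.0 / expansion).max(MAX_REDUCTION);
        ((downstream_rows as f64 * reduction).round() as usize).max(1)
    }
}
```

With these assumed constants, a 100x expansion observed on a 1000-row requirement yields an upstream request of 10 rows, while an explode that emits fewer rows than it reads leaves the requirement unchanged.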

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk
  • The implementation is well-tested with comprehensive unit tests covering edge cases, uses proper safety bounds to prevent pathological behavior, and follows the established patterns from the filter operator's dynamic batching (PR #5924, "feat(dynamic batching): add new selectivity aware batching strategy for filter op"). The trait signature change is consistently applied across all operators, and the cumulative stats handling is correct.
  • No files require special attention

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/daft-local-execution/src/intermediate_ops/explode.rs | Added ExpansionAwareBatchingStrategy that dynamically reduces upstream batch sizes based on observed explode expansion ratio, with comprehensive tests and EMA smoothing |
| src/daft-local-execution/src/intermediate_ops/intermediate_op.rs | Modified batching_strategy trait method to accept morsel_size_requirement parameter; batching strategy is now created earlier, before spawning |
| src/daft-local-execution/src/runtime_stats/values.rs | Added as_any() method to RuntimeStats trait to enable downcasting for type-specific stats access in batching strategies |

Sequence Diagram

sequenceDiagram
    participant Upstream as Upstream Operator
    participant BE as BatchManager
    participant EO as ExplodeOperator
    participant ES as ExpansionState
    participant Stats as ExplodeStats
    participant Downstream as Downstream Operator

    Note over BE: Initial requirement from<br/>downstream (e.g., Strict(1000))
    BE->>Upstream: Request initial batch<br/>(Strict(1000) rows)
    Upstream->>EO: Send 1000 rows
    EO->>EO: Explode operation<br/>(e.g., 100x expansion)
    EO->>Stats: Update rows_in=1000,<br/>rows_out=100000
    EO->>Downstream: Send 100000 rows
    
    Note over BE,ES: Record execution stats
    BE->>ES: record_execution_stat(stats)
    ES->>Stats: Load cumulative counters<br/>(rows_in, rows_out)
    ES->>ES: Calculate expansion ratio<br/>(100000/1000 = 100)
    ES->>ES: Apply EMA smoothing<br/>smoothed_expansion = 100
    
    Note over BE: Calculate new requirements
    BE->>ES: calculate_new_requirements()
    ES->>ES: reduction = 1/100 = 0.01
    ES->>BE: Return Strict(10)
    
    Note over BE: Next batch uses reduced size
    BE->>Upstream: Request reduced batch<br/>(Strict(10) rows)
    Upstream->>EO: Send 10 rows
    EO->>EO: Explode operation<br/>(~100x expansion)
    EO->>Stats: Update rows_in=1010,<br/>rows_out=101000
    EO->>Downstream: Send ~1000 rows
    
    Note over Downstream: Receives appropriately<br/>sized batches
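
The numbers in the diagram can be traced with a small simulation. This is a simplified sketch, assuming a constant integer expansion factor and no smoothing; `Counters` and `simulate` are hypothetical names, and the real operator works on cumulative `Counter` metrics rather than plain integers.

```rust
// Hypothetical cumulative counters, mirroring rows_in / rows_out in the diagram.
#[derive(Default)]
struct Counters {
    rows_in: u64,
    rows_out: u64,
}

// Runs `steps` rounds of request -> explode -> feedback and returns the
// (requested, produced) pair for each round plus the final counters.
fn simulate(downstream_rows: u64, expansion: u64, steps: usize) -> (Vec<(u64, u64)>, Counters) {
    let mut counters = Counters::default();
    let mut request = downstream_rows; // first request uses the downstream requirement
    let mut trace = Vec::new();
    for _ in 0..steps {
        let produced = request * expansion; // explode multiplies the row count
        counters.rows_in += request;
        counters.rows_out += produced;
        trace.push((request, produced));
        // Feedback: cumulative ratio (integer division, a simplification)
        // shrinks the next upstream request.
        let ratio = (counters.rows_out / counters.rows_in).max(1);
        request = (downstream_rows / ratio).max(1);
    }
    (trace, counters)
}
```

Calling `simulate(1000, 100, 2)` requests 1000 rows and then 10 rows, producing 100000 and then 1000 rows, and ends with cumulative counters of 1010 rows in and 101000 rows out, matching the diagram.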

@codecov

codecov bot commented Jan 20, 2026

Codecov Report

❌ Patch coverage is 86.25954% with 36 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.93%. Comparing base (7bec778) to head (d8787e6).
⚠️ Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...ft-local-execution/src/intermediate_ops/explode.rs | 95.12% | 10 Missing ⚠️ |
| ...intermediate_ops/distributed_actor_pool_project.rs | 0.00% | 5 Missing ⚠️ |
| ...c/daft-local-execution/src/dynamic_batching/mod.rs | 0.00% | 3 Missing ⚠️ |
| ...aft-local-execution/src/intermediate_ops/filter.rs | 62.50% | 3 Missing ⚠️ |
| ...c/daft-local-execution/src/intermediate_ops/udf.rs | 62.50% | 3 Missing ⚠️ |
| ...c/daft-local-execution/src/runtime_stats/values.rs | 0.00% | 3 Missing ⚠️ |
| src/daft-local-execution/src/sinks/write.rs | 0.00% | 3 Missing ⚠️ |
| src/daft-local-execution/src/sources/source.rs | 0.00% | 3 Missing ⚠️ |
| ...ft-local-execution/src/streaming_sink/async_udf.rs | 0.00% | 3 Missing ⚠️ |
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #6069      +/-   ##
==========================================
+ Coverage   72.91%   72.93%   +0.01%     
==========================================
  Files         973      973              
  Lines      126166   126407     +241     
==========================================
+ Hits        91995    92191     +196     
- Misses      34171    34216      +45     
| Files with missing lines | Coverage Δ |
| --- | --- |
| ...-execution/src/intermediate_ops/intermediate_op.rs | 87.50% <100.00%> (+0.36%) ⬆️ |
| ...cal-execution/src/intermediate_ops/into_batches.rs | 90.74% <100.00%> (+0.54%) ⬆️ |
| ...ft-local-execution/src/intermediate_ops/project.rs | 92.47% <100.00%> (+0.10%) ⬆️ |
| ...ft-local-execution/src/intermediate_ops/unpivot.rs | 81.81% <100.00%> (+0.86%) ⬆️ |
| ...c/daft-local-execution/src/dynamic_batching/mod.rs | 90.28% <0.00%> (-1.58%) ⬇️ |
| ...aft-local-execution/src/intermediate_ops/filter.rs | 91.80% <62.50%> (-2.17%) ⬇️ |
| ...c/daft-local-execution/src/intermediate_ops/udf.rs | 93.48% <62.50%> (-0.67%) ⬇️ |
| ...c/daft-local-execution/src/runtime_stats/values.rs | 84.21% <0.00%> (-7.22%) ⬇️ |
| src/daft-local-execution/src/sinks/write.rs | 90.07% <0.00%> (-1.96%) ⬇️ |
| src/daft-local-execution/src/sources/source.rs | 85.54% <0.00%> (-1.51%) ⬇️ |
... and 3 more

... and 3 files with indirect coverage changes
