diff --git a/src/daft-local-execution/src/dynamic_batching/mod.rs b/src/daft-local-execution/src/dynamic_batching/mod.rs index 1f03660bce..8c349f5f51 100644 --- a/src/daft-local-execution/src/dynamic_batching/mod.rs +++ b/src/daft-local-execution/src/dynamic_batching/mod.rs @@ -117,6 +117,10 @@ mod tests { // Mock RuntimeStats for testing pub(crate) struct MockRuntimeStats; impl RuntimeStats for MockRuntimeStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } + fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { unimplemented!() } diff --git a/src/daft-local-execution/src/intermediate_ops/distributed_actor_pool_project.rs b/src/daft-local-execution/src/intermediate_ops/distributed_actor_pool_project.rs index 0ce41c02d9..c52117bdd4 100644 --- a/src/daft-local-execution/src/intermediate_ops/distributed_actor_pool_project.rs +++ b/src/daft-local-execution/src/intermediate_ops/distributed_actor_pool_project.rs @@ -219,9 +219,12 @@ impl IntermediateOperator for DistributedActorPoolProjectOperator { .and_then(NonZeroUsize::new) .map(MorselSizeRequirement::Strict) } - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> { + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy> { Ok(crate::dynamic_batching::StaticBatchingStrategy::new( - self.morsel_size_requirement().unwrap_or_default(), + morsel_size_requirement, )) } } diff --git a/src/daft-local-execution/src/intermediate_ops/explode.rs b/src/daft-local-execution/src/intermediate_ops/explode.rs index d149057585..81d20534bc 100644 --- a/src/daft-local-execution/src/intermediate_ops/explode.rs +++ b/src/daft-local-execution/src/intermediate_ops/explode.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use common_error::DaftResult; use common_metrics::{ @@ -15,7 +15,12 @@ use tracing::{Span, instrument}; use super::intermediate_op::{ IntermediateOpExecuteResult, IntermediateOperator, IntermediateOperatorResult, }; -use crate::{ExecutionTaskSpawner, 
pipeline::NodeName, runtime_stats::RuntimeStats}; +use crate::{ + ExecutionTaskSpawner, + dynamic_batching::{BatchingState, BatchingStrategy}, + pipeline::{MorselSizeRequirement, NodeName}, + runtime_stats::RuntimeStats, +}; pub struct ExplodeStats { cpu_us: Counter, @@ -39,6 +44,9 @@ impl ExplodeStats { } impl RuntimeStats for ExplodeStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { self } @@ -96,7 +104,7 @@ impl ExplodeOperator { impl IntermediateOperator for ExplodeOperator { type State = (); - type BatchingStrategy = crate::dynamic_batching::StaticBatchingStrategy; + type BatchingStrategy = crate::dynamic_batching::DynBatchingStrategy; #[instrument(skip_all, name = "ExplodeOperator::execute")] fn execute( &self, @@ -144,9 +152,357 @@ impl IntermediateOperator for ExplodeOperator { fn make_runtime_stats(&self, id: usize) -> Arc<dyn RuntimeStats> { Arc::new(ExplodeStats::new(id)) } - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> { - Ok(crate::dynamic_batching::StaticBatchingStrategy::new( - self.morsel_size_requirement().unwrap_or_default(), - )) + + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy> { + let cfg = daft_context::get_context().execution_config(); + Ok(if cfg.enable_dynamic_batching { + ExpansionAwareBatchingStrategy::new(morsel_size_requirement).into() + } else { + crate::dynamic_batching::StaticBatchingStrategy::new(morsel_size_requirement).into() + }) + } +} + +const MIN_EXPANSION: f64 = 1.0; // Prevent reduction when no expansion +const MAX_REDUCTION: f64 = 0.001; // Cap reduction factor (1000x reduction max) +const SMOOTHING_FACTOR: f64 = 0.3; // EMA smoothing for expansion ratio + +/// A batching strategy that dynamically adjusts upstream batch size requirements based on +/// observed explode expansion ratio to prevent excessive downstream batch sizes. 
+/// +/// # Problem +/// When an explode operator has high expansion (one input row produces many output rows), +/// normal-sized input batches produce very large output batches. If a downstream operator +/// has a strict batch size requirement, it receives far more rows than needed, causing +/// memory pressure and inefficient execution. +/// +/// # Solution +/// This strategy monitors the explode's expansion ratio (output_rows / input_rows) and +/// reduces the upstream batch size requirement accordingly. If expansion is 100x, the +/// explode requests ~100x fewer rows from upstream to produce the right output size. +/// +/// # Example +/// - Downstream operator requires `Strict(100)` rows +/// - Explode has 50x expansion +/// - Strategy reduces upstream requirement to `Strict(2)` rows +/// - Explode processes 2 rows → outputs ~100 rows → downstream executes efficiently +/// +/// # Smoothing +/// Uses exponential moving average (EMA) to smooth expansion measurements across batches, +/// preventing wild swings in batch size from transient expansion changes. 
+/// +/// # Safety Bounds +/// - `MIN_EXPANSION`: Prevents amplification when expansion is less than 1x +/// - `MAX_REDUCTION`: Caps minimum batch size to prevent fetching too few rows +#[derive(Debug, Clone)] +struct ExpansionAwareBatchingStrategy { + downstream_requirement: MorselSizeRequirement, +} + +impl ExpansionAwareBatchingStrategy { + pub fn new(downstream_requirement: MorselSizeRequirement) -> Self { + Self { + downstream_requirement, + } + } + + fn reduce_requirement( + &self, + requirement: MorselSizeRequirement, + factor: f64, + ) -> MorselSizeRequirement { + match requirement { + MorselSizeRequirement::Strict(size) => { + let new_size = ((size.get() as f64) * factor).ceil() as usize; + MorselSizeRequirement::Strict(NonZeroUsize::new(new_size.max(1)).unwrap()) + } + MorselSizeRequirement::Flexible(lower, upper) => { + let new_lower = ((lower as f64) * factor).ceil() as usize; + let new_upper = ((upper.get() as f64) * factor).ceil() as usize; + MorselSizeRequirement::Flexible( + new_lower.max(1), + NonZeroUsize::new(new_upper.max(1)).unwrap(), + ) + } + } + } +} + +impl BatchingStrategy for ExpansionAwareBatchingStrategy { + type State = ExpansionState; + + fn make_state(&self) -> Self::State { + ExpansionState { + downstream_requirement: self.downstream_requirement, + input_rows: 0, + output_rows: 0, + smoothed_expansion: None, + } + } + + fn calculate_new_requirements(&self, state: &mut Self::State) -> MorselSizeRequirement { + if let Some(expansion) = state.smoothed_expansion { + let clamped_expansion = expansion.max(MIN_EXPANSION); + let reduction = (1.0 / clamped_expansion).max(MAX_REDUCTION); + self.reduce_requirement(state.downstream_requirement, reduction) + } else { + state.downstream_requirement + } + } + + fn initial_requirements(&self) -> MorselSizeRequirement { + self.downstream_requirement + } +} + +#[derive(Debug)] +struct ExpansionState { + downstream_requirement: MorselSizeRequirement, + input_rows: u64, + output_rows: u64, + 
smoothed_expansion: Option<f64>, +} + +impl BatchingState for ExpansionState { + fn record_execution_stat( + &mut self, + stats: &dyn RuntimeStats, + _batch_size: usize, + _duration: Duration, + ) { + if let Some(explode_stats) = stats.as_any().downcast_ref::<ExplodeStats>() { + self.input_rows = explode_stats + .rows_in + .load(std::sync::atomic::Ordering::Relaxed); + self.output_rows = explode_stats + .rows_out + .load(std::sync::atomic::Ordering::Relaxed); + + if self.input_rows > 0 { + let current_expansion = (self.output_rows as f64) / (self.input_rows as f64); + + self.smoothed_expansion = Some(match self.smoothed_expansion { + Some(prev) => { + SMOOTHING_FACTOR.mul_add(current_expansion, (1.0 - SMOOTHING_FACTOR) * prev) + } + None => current_expansion, + }); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + fn new_explode_stats(rows_in: u64, rows_out: u64) -> ExplodeStats { + let stats = ExplodeStats::new(0); + stats.add_rows_in(rows_in); + stats.add_rows_out(rows_out); + stats + } + + #[test] + fn test_initial_requirements_match_downstream() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(10).unwrap(), + )); + assert_eq!( + strategy.initial_requirements(), + MorselSizeRequirement::Strict(NonZeroUsize::new(10).unwrap()) + ); + + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Flexible( + 5, + NonZeroUsize::new(20).unwrap(), + )); + assert_eq!( + strategy.initial_requirements(), + MorselSizeRequirement::Flexible(5, NonZeroUsize::new(20).unwrap()) + ); + } + + #[test] + fn test_no_reduction_without_stats() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(10).unwrap(), + )); + let mut state = strategy.make_state(); + + let requirement = strategy.calculate_new_requirements(&mut state); + assert_eq!( + requirement, + MorselSizeRequirement::Strict(NonZeroUsize::new(10).unwrap()) + ); + } + + #[test] + 
fn test_high_expansion_reduces_requirement() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(1000).unwrap(), + )); + let mut state = strategy.make_state(); + + // Simulate 100x expansion (10 in, 1000 out) + let stats: Arc<dyn RuntimeStats> = Arc::new(new_explode_stats(10, 1000)); + state.record_execution_stat(stats.as_ref(), 10, Duration::from_millis(100)); + + let requirement = strategy.calculate_new_requirements(&mut state); + match requirement { + MorselSizeRequirement::Strict(size) => { + // Should reduce by ~100x for 100x expansion + assert!(size.get() < 20, "Expected reduced size < 20, got {}", size); + } + _ => panic!("Expected Strict requirement"), + } + } + + #[test] + fn test_low_expansion_minimal_reduction() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(100).unwrap(), + )); + let mut state = strategy.make_state(); + + // Simulate 1.1x expansion (100 in, 110 out) + let stats: Arc<dyn RuntimeStats> = Arc::new(new_explode_stats(100, 110)); + state.record_execution_stat(stats.as_ref(), 100, Duration::from_millis(100)); + + let requirement = strategy.calculate_new_requirements(&mut state); + match requirement { + MorselSizeRequirement::Strict(size) => { + // Should only reduce by ~1.1x for 1.1x expansion + assert!( + size.get() >= 85 && size.get() <= 100, + "Expected size 85-100, got {}", + size + ); + } + _ => panic!("Expected Strict requirement"), + } + } + + #[test] + fn test_flexible_requirement_reduction() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Flexible( + 100, + NonZeroUsize::new(1000).unwrap(), + )); + let mut state = strategy.make_state(); + + // Simulate 10x expansion + let stats = Arc::new(new_explode_stats(100, 1000)); + state.record_execution_stat(stats.as_ref(), 100, Duration::from_millis(100)); + + let requirement = strategy.calculate_new_requirements(&mut state); + match requirement { + MorselSizeRequirement::Flexible(lower, 
upper) => { + // Should reduce by ~10x for 10x expansion + assert!(lower <= 15, "Expected lower <= 15, got {}", lower); + assert!(upper.get() <= 150, "Expected upper <= 150, got {}", upper); + } + _ => panic!("Expected Flexible requirement"), + } + } + + #[test] + fn test_max_reduction_cap() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(10000).unwrap(), + )); + let mut state = strategy.make_state(); + + // Simulate 100000x expansion (should hit max cap) + let stats: Arc<dyn RuntimeStats> = Arc::new(new_explode_stats(1, 100000)); + state.record_execution_stat(stats.as_ref(), 1, Duration::from_millis(100)); + + let requirement = strategy.calculate_new_requirements(&mut state); + match requirement { + MorselSizeRequirement::Strict(size) => { + // Should be capped at MAX_REDUCTION * 10000 + assert!( + size.get() >= (MAX_REDUCTION * 10000.0) as usize, + "Expected floor at {}, got {}", + MAX_REDUCTION * 10000.0, + size + ); + } + _ => panic!("Expected Strict requirement"), + } + } + + #[test] + fn test_smoothing_across_multiple_recordings() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(1000).unwrap(), + )); + let mut state = strategy.make_state(); + + // First recording: 2x expansion + let stats1: Arc<dyn RuntimeStats> = Arc::new(new_explode_stats(100, 200)); + state.record_execution_stat(stats1.as_ref(), 100, Duration::from_millis(100)); + let first_expansion = state.smoothed_expansion.unwrap(); + + // Second recording: 10x expansion + let stats2: Arc<dyn RuntimeStats> = Arc::new(new_explode_stats(200, 2000)); + state.record_execution_stat(stats2.as_ref(), 100, Duration::from_millis(100)); + let second_expansion = state.smoothed_expansion.unwrap(); + + // Should be smoothed between the two + assert!( + second_expansion > first_expansion, + "Expansion should increase" + ); + assert!( + second_expansion < 10.0, + "Should be smoothed, not instantaneous" + ); + } + + #[test] + fn 
test_no_expansion_handled_gracefully() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(100).unwrap(), + )); + let mut state = strategy.make_state(); + + // Simulate 1x expansion (no change) + let stats: Arc<dyn RuntimeStats> = Arc::new(new_explode_stats(100, 100)); + state.record_execution_stat(stats.as_ref(), 100, Duration::from_millis(100)); + + let requirement = strategy.calculate_new_requirements(&mut state); + match requirement { + MorselSizeRequirement::Strict(size) => { + // Should not reduce with 1x expansion + assert_eq!(size.get(), 100); + } + _ => panic!("Expected Strict requirement"), + } + } + + #[test] + fn test_cumulative_stats_update() { + let strategy = ExpansionAwareBatchingStrategy::new(MorselSizeRequirement::Strict( + NonZeroUsize::new(1000).unwrap(), + )); + let mut state = strategy.make_state(); + + // Stats should be cumulative, not incremental + let stats1 = Arc::new(new_explode_stats(100, 200)); + state.record_execution_stat(stats1.as_ref(), 100, Duration::from_millis(100)); + + let stats2 = Arc::new(new_explode_stats(200, 400)); + state.record_execution_stat(stats2.as_ref(), 100, Duration::from_millis(100)); + + // State should reflect latest cumulative values + assert_eq!(state.input_rows, 200); + assert_eq!(state.output_rows, 400); + assert_eq!(state.smoothed_expansion, Some(2.0)); } } diff --git a/src/daft-local-execution/src/intermediate_ops/filter.rs b/src/daft-local-execution/src/intermediate_ops/filter.rs index a9bf82f289..c35a565ea0 100644 --- a/src/daft-local-execution/src/intermediate_ops/filter.rs +++ b/src/daft-local-execution/src/intermediate_ops/filter.rs @@ -13,7 +13,11 @@ use tracing::{Span, instrument}; use super::intermediate_op::{ IntermediateOpExecuteResult, IntermediateOperator, IntermediateOperatorResult, }; -use crate::{ExecutionTaskSpawner, pipeline::NodeName, runtime_stats::RuntimeStats}; +use crate::{ + ExecutionTaskSpawner, + pipeline::{MorselSizeRequirement, NodeName}, + 
runtime_stats::RuntimeStats, +}; pub struct FilterStats { cpu_us: Counter, @@ -49,6 +53,9 @@ impl FilterStats { } impl RuntimeStats for FilterStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { self } @@ -134,9 +141,12 @@ impl IntermediateOperator for FilterOperator { } fn make_state(&self) -> Self::State {} - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> { + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy> { Ok(crate::dynamic_batching::StaticBatchingStrategy::new( - self.morsel_size_requirement().unwrap_or_default(), + morsel_size_requirement, )) } } diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index 983fb000c1..09a4addfeb 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -71,7 +71,10 @@ pub(crate) trait IntermediateOperator: Send + Sync { None } - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy>; + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy>; } pub struct IntermediateNode { @@ -437,6 +440,11 @@ impl PipelineNode for IntermediateNode { let stats_manager = runtime_handle.stats_manager(); let node_id = self.node_id(); let runtime_stats = self.runtime_stats.clone(); + let strategy = + op.batching_strategy(self.morsel_size_requirement) + .context(PipelineExecutionSnafu { + node_name: op.name().to_string(), + })?; runtime_handle.spawn( async move { // Initialize state pool with max_concurrency states @@ -446,11 +454,7 @@ } // Create batch manager and task set - let batch_manager = Arc::new(BatchManager::new(op.batching_strategy().context( - PipelineExecutionSnafu { - node_name: op.name().to_string(), - }, - )?)); + let batch_manager = Arc::new(BatchManager::new(strategy)); let task_set = 
OrderingAwareJoinSet::new(maintain_order); // Process each child receiver sequentially diff --git a/src/daft-local-execution/src/intermediate_ops/into_batches.rs b/src/daft-local-execution/src/intermediate_ops/into_batches.rs index 7aa2dd48d6..0166d00930 100644 --- a/src/daft-local-execution/src/intermediate_ops/into_batches.rs +++ b/src/daft-local-execution/src/intermediate_ops/into_batches.rs @@ -79,9 +79,12 @@ impl IntermediateOperator for IntoBatchesOperator { } } } - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> { + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy> { Ok(crate::dynamic_batching::StaticBatchingStrategy::new( - self.morsel_size_requirement().unwrap_or_default(), + morsel_size_requirement, )) } } diff --git a/src/daft-local-execution/src/intermediate_ops/project.rs b/src/daft-local-execution/src/intermediate_ops/project.rs index 01acaad206..2f2619f554 100644 --- a/src/daft-local-execution/src/intermediate_ops/project.rs +++ b/src/daft-local-execution/src/intermediate_ops/project.rs @@ -216,7 +216,10 @@ impl IntermediateOperator for ProjectOperator { fn make_state(&self) -> Self::State {} - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> { + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy> { let cfg = daft_context::get_context().execution_config(); Ok(if cfg.enable_dynamic_batching { @@ -244,7 +247,7 @@ impl IntermediateOperator for ProjectOperator { _ => unreachable!("should already be checked in the ctx"), } } else { - StaticBatchingStrategy::new(self.morsel_size_requirement().unwrap_or_default()).into() + StaticBatchingStrategy::new(morsel_size_requirement).into() }) } } diff --git a/src/daft-local-execution/src/intermediate_ops/udf.rs b/src/daft-local-execution/src/intermediate_ops/udf.rs index 5d7713b964..3074c0c274 100644 --- a/src/daft-local-execution/src/intermediate_ops/udf.rs +++ b/src/daft-local-execution/src/intermediate_ops/udf.rs @@ -100,6 +103,9 
@@ struct UdfRuntimeStats { } impl RuntimeStats for UdfRuntimeStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { self } @@ -607,7 +610,10 @@ impl IntermediateOperator for UdfOperator { }) } - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> { + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy> { let cfg = daft_context::get_context().execution_config(); Ok(if cfg.enable_dynamic_batching { @@ -635,7 +641,7 @@ impl IntermediateOperator for UdfOperator { _ => unreachable!("should already be checked in the ctx"), } } else { - StaticBatchingStrategy::new(self.morsel_size_requirement().unwrap_or_default()).into() + StaticBatchingStrategy::new(morsel_size_requirement).into() }) } } diff --git a/src/daft-local-execution/src/intermediate_ops/unpivot.rs b/src/daft-local-execution/src/intermediate_ops/unpivot.rs index f16b50dc08..0f0bc61233 100644 --- a/src/daft-local-execution/src/intermediate_ops/unpivot.rs +++ b/src/daft-local-execution/src/intermediate_ops/unpivot.rs @@ -10,7 +10,10 @@ use tracing::{Span, instrument}; use super::intermediate_op::{ IntermediateOpExecuteResult, IntermediateOperator, IntermediateOperatorResult, }; -use crate::{ExecutionTaskSpawner, pipeline::NodeName}; +use crate::{ + ExecutionTaskSpawner, + pipeline::{MorselSizeRequirement, NodeName}, +}; struct UnpivotParams { ids: Vec, @@ -95,9 +98,12 @@ impl IntermediateOperator for UnpivotOperator { fn make_state(&self) -> Self::State {} - fn batching_strategy(&self) -> DaftResult<Self::BatchingStrategy> { + fn batching_strategy( + &self, + morsel_size_requirement: MorselSizeRequirement, + ) -> DaftResult<Self::BatchingStrategy> { Ok(crate::dynamic_batching::StaticBatchingStrategy::new( - self.morsel_size_requirement().unwrap_or_default(), + morsel_size_requirement, )) } } diff --git a/src/daft-local-execution/src/runtime_stats/values.rs b/src/daft-local-execution/src/runtime_stats/values.rs index 96a83a8781..9e4fd5e549 100644 --- 
a/src/daft-local-execution/src/runtime_stats/values.rs +++ b/src/daft-local-execution/src/runtime_stats/values.rs @@ -8,6 +8,7 @@ use opentelemetry::{KeyValue, global}; // ----------------------- General Traits for Runtime Stat Collection ----------------------- // pub trait RuntimeStats: Send + Sync + std::any::Any { + fn as_any(&self) -> &dyn std::any::Any; fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats>; /// Create a snapshot of the current statistics. fn build_snapshot(&self, ordering: Ordering) -> StatSnapshot; @@ -48,6 +49,9 @@ impl DefaultRuntimeStats { } impl RuntimeStats for DefaultRuntimeStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { self } diff --git a/src/daft-local-execution/src/sinks/write.rs b/src/daft-local-execution/src/sinks/write.rs index 8df6aaf20a..414f60c3e1 100644 --- a/src/daft-local-execution/src/sinks/write.rs +++ b/src/daft-local-execution/src/sinks/write.rs @@ -52,6 +52,9 @@ impl WriteStats { } impl RuntimeStats for WriteStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { self } diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 86c32dce67..c410dff57a 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -49,6 +49,9 @@ impl SourceStats { } impl RuntimeStats for SourceStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { self } diff --git a/src/daft-local-execution/src/streaming_sink/async_udf.rs b/src/daft-local-execution/src/streaming_sink/async_udf.rs index d96bd1332b..a9bbd85c30 100644 --- a/src/daft-local-execution/src/streaming_sink/async_udf.rs +++ b/src/daft-local-execution/src/streaming_sink/async_udf.rs @@ -51,6 +51,9 @@ struct AsyncUdfRuntimeStats { } impl RuntimeStats for AsyncUdfRuntimeStats { + fn as_any(&self) -> &dyn std::any::Any { + self + } fn as_any_arc(self: Arc<Self>) -> Arc<dyn RuntimeStats> { self }