diff --git a/src/common/scan-info/src/lib.rs b/src/common/scan-info/src/lib.rs index c90e8f8e04..8894296d10 100644 --- a/src/common/scan-info/src/lib.rs +++ b/src/common/scan-info/src/lib.rs @@ -13,6 +13,7 @@ pub mod test; use std::{fmt::Debug, hash::Hash, sync::Arc}; use common_display::{DisplayAs, DisplayLevel}; +use common_error::DaftResult; use daft_schema::schema::SchemaRef; pub use expr_rewriter::{PredicateGroups, rewrite_predicate_for_partitioning}; pub use partitioning::{PartitionField, PartitionTransform}; @@ -24,6 +25,132 @@ pub use scan_task::{SPLIT_AND_MERGE_PASS, ScanTaskLike, ScanTaskLikeRef}; use serde::{Deserialize, Serialize}; pub use sharder::{Sharder, ShardingStrategy}; +/// Pre-computed estimated stats for use by the optimizer when scan tasks are lazy. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LazyTaskStats { + pub estimated_num_tasks: usize, + pub estimated_total_bytes: usize, + pub estimated_total_rows: usize, +} + +/// An owned iterator over an `Arc>` that yields cloned items by index. +struct OwnedArcVecIter { + tasks: Arc>, + index: usize, +} + +impl Iterator for OwnedArcVecIter { + type Item = DaftResult; + + fn next(&mut self) -> Option { + if self.index < self.tasks.len() { + let item = self.tasks[self.index].clone(); + self.index += 1; + Some(Ok(item)) + } else { + None + } + } +} + +/// Iterator type returned by `LazyTaskProducer`'s factory. +pub type ScanTaskIterator = Box> + Send>; + +/// Factory type for producing scan task iterators. +pub type ScanTaskFactory = Arc DaftResult + Send + Sync>; + +/// A factory that produces a fresh scan task iterator each time it is called. +/// +/// Each call may re-run the glob (lazy, paginated), so multiple consumers get +/// independent iteration. For the limit case, only one consumer (the executor) +/// actually iterates, and it stops early. +pub struct LazyTaskProducer { + factory: ScanTaskFactory, + pub estimated_stats: LazyTaskStats, +} + +impl LazyTaskProducer { + pub fn new(factory: ScanTaskFactory, estimated_stats: LazyTaskStats) -> Self { + Self { + factory, + estimated_stats, + } + } + + /// Create a LazyTaskProducer from an already-materialized Vec of scan tasks. + /// Used as backward-compatible default for scan operators that don't support lazy iteration. + pub fn from_vec(tasks: Vec) -> Self { + let len = tasks.len(); + // Pre-compute stats from the actual tasks. + let mut total_rows = 0usize; + let mut total_bytes = 0usize; + for task in &tasks { + if let Some(num_rows) = task.num_rows() { + total_rows += num_rows; + } else if let Some(approx) = task.approx_num_rows(None) { + total_rows += approx as usize; + } + total_bytes += task.estimate_in_memory_size_bytes(None).unwrap_or(0); + } + let tasks = Arc::new(tasks); + Self { + factory: Arc::new(move || { + let tasks = tasks.clone(); + Ok(Box::new(OwnedArcVecIter { tasks, index: 0 }) as ScanTaskIterator) + }), + estimated_stats: LazyTaskStats { + estimated_num_tasks: len, + estimated_total_bytes: total_bytes, + estimated_total_rows: total_rows, + }, + } + } + + /// Produce a fresh iterator of scan tasks. + pub fn produce(&self) -> DaftResult { + (self.factory)() + } + + /// Eagerly collect all scan tasks into a Vec. + pub fn collect_tasks(&self) -> DaftResult> { + self.produce()?.collect() + } +} + +impl Debug for LazyTaskProducer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LazyTaskProducer") + .field("estimated_stats", &self.estimated_stats) + .finish_non_exhaustive() + } +} + +impl Clone for LazyTaskProducer { + fn clone(&self) -> Self { + Self { + factory: self.factory.clone(), + estimated_stats: self.estimated_stats.clone(), + } + } +} + +impl PartialEq for LazyTaskProducer { + fn eq(&self, other: &Self) -> bool { + // Two LazyTaskProducers are considered equal if their estimated stats match. + // This mirrors how ScanOperatorRef uses pointer equality for "sameness" checking + // in the optimizer, but for lazy producers we can't compare closures, so we use stats. + self.estimated_stats == other.estimated_stats + } +} + +impl Eq for LazyTaskProducer {} + +impl Hash for LazyTaskProducer { + fn hash(&self, state: &mut H) { + self.estimated_stats.hash(state); + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum ScanState { Tasks(Arc>), @@ -32,6 +159,11 @@ pub enum ScanState { deserialize_with = "deserialize_invalid" )] Operator(ScanOperatorRef), + #[serde( + serialize_with = "serialize_lazy_invalid", + deserialize_with = "deserialize_lazy_invalid" + )] + LazyTasks(Arc), } fn serialize_invalid(_: &ScanOperatorRef, _: S) -> Result @@ -52,6 +184,24 @@ where )) } +fn serialize_lazy_invalid(_: &Arc, _: S) -> Result +where + S: serde::Serializer, +{ + Err(serde::ser::Error::custom( + "LazyTaskProducer cannot be serialized", + )) +} + +fn deserialize_lazy_invalid<'de, D>(_: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + Err(serde::de::Error::custom( + "LazyTaskProducer cannot be deserialized", + )) +} + impl ScanState { pub fn multiline_display(&self) -> Vec { match self { @@ -90,13 +240,30 @@ impl ScanState { result } + Self::LazyTasks(producer) => { + let stats = &producer.estimated_stats; + vec![format!( + "~{} Scan Tasks (estimated, lazily produced)", + stats.estimated_num_tasks + )] + } } } pub fn get_scan_op(&self) -> &ScanOperatorRef { match self { Self::Operator(scan_op) => scan_op, - Self::Tasks(_) => panic!("Tried to get scan op from materialized physical scan info"), + Self::Tasks(_) | Self::LazyTasks(_) => { + panic!("Tried to get scan op from materialized physical scan info") + } + } + } + + pub fn file_format_config(&self) -> Option> { + match self { + Self::Operator(_) => None, + Self::Tasks(scan_tasks) => scan_tasks.first().map(|t| t.file_format_config()), + Self::LazyTasks(_) => None, } } } diff --git a/src/common/scan-info/src/scan_operator.rs b/src/common/scan-info/src/scan_operator.rs index 9c4619f24c..a35a3d8185 100644 --- a/src/common/scan-info/src/scan_operator.rs +++ b/src/common/scan-info/src/scan_operator.rs @@ -7,7 +7,9 @@ use std::{ use common_error::DaftResult; use daft_schema::schema::SchemaRef; -use crate::{PartitionField, Pushdowns, ScanTaskLikeRef, SupportsPushdownFilters}; +use crate::{ + LazyTaskProducer, PartitionField, Pushdowns, ScanTaskLikeRef, SupportsPushdownFilters, +}; pub trait ScanOperator: Send + Sync + Debug { fn name(&self) -> &str; @@ -45,6 +47,14 @@ pub trait ScanOperator: Send + Sync + Debug { /// (merging, splitting) to the outputted scan tasks fn to_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult>; + /// Returns a lazy producer of scan tasks. Default implementation eagerly + /// materializes via `to_scan_tasks` and wraps in a factory. Scan operators + /// that support lazy iteration (e.g. GlobScanOperator) should override. + fn to_lazy_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult { + let tasks = self.to_scan_tasks(pushdowns)?; + Ok(LazyTaskProducer::from_vec(tasks)) + } + fn as_pushdown_filter(&self) -> Option<&dyn SupportsPushdownFilters> { None } diff --git a/src/daft-distributed/src/pipeline_node/scan_source.rs b/src/daft-distributed/src/pipeline_node/scan_source.rs index 0a966288c1..366d0fdfc8 100644 --- a/src/daft-distributed/src/pipeline_node/scan_source.rs +++ b/src/daft-distributed/src/pipeline_node/scan_source.rs @@ -1,20 +1,19 @@ use std::sync::{Arc, atomic::Ordering}; use common_display::{DisplayAs, DisplayLevel}; -#[cfg(feature = "python")] -use common_file_formats::FileFormatConfig; use common_metrics::{ BYTES_READ_KEY, Counter, DURATION_KEY, ROWS_OUT_KEY, StatSnapshot, UNIT_BYTES, UNIT_MICROSECONDS, UNIT_ROWS, ops::{NodeCategory, NodeInfo, NodeType}, snapshot::SourceSnapshot, }; -use common_scan_info::{Pushdowns, ScanTaskLikeRef}; +use common_scan_info::{LazyTaskProducer, Pushdowns}; use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan}; use daft_logical_plan::{ClusteringSpec, stats::StatsState}; use daft_schema::schema::SchemaRef; use futures::{StreamExt, stream}; use opentelemetry::{KeyValue, metrics::Meter}; +use tokio_stream::wrappers::ReceiverStream; use super::{PipelineNodeConfig, PipelineNodeContext, PipelineNodeImpl, TaskBuilderStream}; use crate::{ @@ -69,17 +68,17 @@ pub(crate) struct ScanSourceNode { config: PipelineNodeConfig, context: PipelineNodeContext, pushdowns: Pushdowns, - scan_tasks: Arc>, + lazy_producer: Arc, } impl ScanSourceNode { - const NODE_NAME: &'static str = "ScanTaskSource"; + const NODE_NAME: &'static str = "ScanSource"; pub fn new( node_id: NodeID, plan_config: &PlanConfig, pushdowns: Pushdowns, - scan_tasks: Arc>, + lazy_producer: Arc, schema: SchemaRef, ) -> Self { let context = PipelineNodeContext::new( @@ -94,14 +93,14 @@ impl ScanSourceNode { schema, plan_config.config.clone(), Arc::new(ClusteringSpec::unknown_with_num_partitions( - scan_tasks.len(), + lazy_producer.estimated_stats.estimated_num_tasks, )), ); Self { config, context, pushdowns, - scan_tasks, + lazy_producer, } } @@ -109,7 +108,10 @@ impl ScanSourceNode { DistributedPipelineNode::new(Arc::new(self)) } - fn make_source_task(self: &Arc, scan_task: ScanTaskLikeRef) -> SwordfishTaskBuilder { + fn make_source_task( + self: &Arc, + scan_task: common_scan_info::ScanTaskLikeRef, + ) -> SwordfishTaskBuilder { let physical_scan = LocalPhysicalPlan::physical_scan( self.node_id(), Some(scan_task.file_format_config()), @@ -140,51 +142,17 @@ impl PipelineNodeImpl for ScanSourceNode { fn multiline_display(&self, verbose: bool) -> Vec { let mut res = vec!["ScanTaskSource:".to_string()]; - let num_scan_tasks = self.scan_tasks.len(); - let total_bytes: usize = self - .scan_tasks - .iter() - .map(|st| { - st.estimate_in_memory_size_bytes(Some(self.config.execution_config.as_ref())) - .or_else(|| st.size_bytes_on_disk()) - .unwrap_or(0) - }) - .sum(); - res.push(format!("Num Scan Tasks = {num_scan_tasks}")); - res.push(format!("Estimated Scan Bytes = {total_bytes}")); - - if let Some(ffc) = self - .scan_tasks - .first() - .map(|s| s.file_format_config()) - .as_deref() - { - match ffc { - #[cfg(feature = "python")] - FileFormatConfig::Database(config) => { - if num_scan_tasks == 1 { - res.push(format!("SQL Query = {}", &config.sql)); - } else { - res.push(format!("SQL Queries = [{},..]", &config.sql)); - } - } - #[cfg(feature = "python")] - FileFormatConfig::PythonFunction { source_name, .. } => { - res.push(format!( - "Source = {}", - source_name.clone().unwrap_or_else(|| "None".to_string()) - )); - } - _ => {} - } - } - - if verbose { - res.push("Scan Tasks: [".to_string()); - for st in self.scan_tasks.iter() { - res.push(st.as_ref().display_as(DisplayLevel::Verbose)); - } - } else { + let stats = &self.lazy_producer.estimated_stats; + res.push(format!( + "Num Scan Tasks = ~{} (estimated)", + stats.estimated_num_tasks + )); + res.push(format!( + "Estimated Scan Bytes = {}", + stats.estimated_total_bytes + )); + + if !verbose { let pushdown = &self.pushdowns; if !pushdown.is_empty() { res.push(pushdown.display_as(DisplayLevel::Compact)); @@ -195,19 +163,8 @@ impl PipelineNodeImpl for ScanSourceNode { "Schema: {{{}}}", schema.display_as(DisplayLevel::Compact) )); - - res.push("Scan Tasks: [".to_string()); - let tasks = self.scan_tasks.iter(); - for (i, st) in tasks.enumerate() { - if i < 3 || i >= self.scan_tasks.len() - 3 { - res.push(st.as_ref().display_as(DisplayLevel::Compact)); - } else if i == 3 { - res.push("...".to_string()); - } - } } - res.push("]".to_string()); res } @@ -219,7 +176,7 @@ impl PipelineNodeImpl for ScanSourceNode { self: Arc, _plan_context: &mut PlanExecutionContext, ) -> TaskBuilderStream { - if self.scan_tasks.is_empty() { + if self.lazy_producer.estimated_stats.estimated_num_tasks == 0 { let transformed_plan = LocalPhysicalPlan::empty_scan( self.config.schema.clone(), LocalNodeContext::new(Some(self.node_id() as usize)), @@ -227,10 +184,37 @@ impl PipelineNodeImpl for ScanSourceNode { let empty_scan_task = SwordfishTaskBuilder::new(transformed_plan, self.as_ref()); TaskBuilderStream::new(stream::iter(std::iter::once(empty_scan_task)).boxed()) } else { - let slf = self.clone(); - let builders_iter = (0..self.scan_tasks.len()) - .map(move |i| slf.make_source_task(slf.scan_tasks[i].clone())); - TaskBuilderStream::new(stream::iter(builders_iter).boxed()) + let slf = self; + let (tx, rx) = tokio::sync::mpsc::channel(64); + + // Spawn a blocking thread to iterate the lazy producer and send tasks + // through the channel. When the pipeline shuts down (e.g. limit satisfied), + // the receiver drops, tx.blocking_send() returns Err, and iteration stops. + std::thread::spawn(move || { + let iter = match slf.lazy_producer.produce() { + Ok(iter) => iter, + Err(e) => { + tracing::error!("Error producing lazy scan tasks: {e}"); + return; + } + }; + for task_result in iter { + match task_result { + Ok(task) => { + let builder = slf.make_source_task(task); + if tx.blocking_send(builder).is_err() { + break; // Pipeline closed + } + } + Err(e) => { + tracing::error!("Error in lazy scan task iteration: {e}"); + break; + } + } + } + }); + + TaskBuilderStream::new(ReceiverStream::new(rx).boxed()) } } } diff --git a/src/daft-distributed/src/pipeline_node/translate.rs b/src/daft-distributed/src/pipeline_node/translate.rs index 538801ec80..62707a9269 100644 --- a/src/daft-distributed/src/pipeline_node/translate.rs +++ b/src/daft-distributed/src/pipeline_node/translate.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use common_error::DaftResult; use common_partitioning::PartitionRef; -use common_scan_info::{SPLIT_AND_MERGE_PASS, ScanState}; +use common_scan_info::{LazyTaskProducer, SPLIT_AND_MERGE_PASS, ScanState}; use common_treenode::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use daft_dsl::{ expr::{ @@ -113,30 +113,37 @@ impl TreeNodeVisitor for LogicalPlanToPipelineNodeTranslator { ) .into_node(), SourceInfo::Physical(info) => { - // We should be able to pass the ScanOperator into the physical plan directly but we need to figure out the serialization story - let scan_tasks = match &info.scan_state { + let lazy_producer = match &info.scan_state { ScanState::Operator(_) => unreachable!( "ScanOperator should not be present in the optimized logical plan for pipeline node translation" ), - ScanState::Tasks(scan_tasks) => scan_tasks.clone(), - }; - // Perform scan task splitting and merging. - let scan_tasks = if self.plan_config.config.enable_scan_task_split_and_merge - && let Some(split_and_merge_pass) = SPLIT_AND_MERGE_PASS.get() - { - split_and_merge_pass( - scan_tasks, - &info.pushdowns, - &self.plan_config.config, - )? - } else { - scan_tasks + ScanState::Tasks(scan_tasks) => { + Arc::new(LazyTaskProducer::from_vec((**scan_tasks).clone())) + } + ScanState::LazyTasks(producer) => producer.clone(), }; + + // Split-and-merge requires eager collection of all tasks. + let lazy_producer = + if self.plan_config.config.enable_scan_task_split_and_merge + && let Some(split_and_merge_pass) = SPLIT_AND_MERGE_PASS.get() + { + let scan_tasks = Arc::new(lazy_producer.collect_tasks()?); + let merged = split_and_merge_pass( + scan_tasks, + &info.pushdowns, + &self.plan_config.config, + )?; + Arc::new(LazyTaskProducer::from_vec((*merged).clone())) + } else { + lazy_producer + }; + ScanSourceNode::new( self.get_next_pipeline_node_id(), &self.plan_config, info.pushdowns.clone(), - scan_tasks, + lazy_producer, source.output_schema.clone(), ) .into_node() diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index 7991b5e098..028b4222e1 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -200,13 +200,51 @@ impl NativeExecutor { let mut receiver = pipeline.start(true, &mut runtime_handle)?; if let Some(message) = enqueue_input_rx.recv().await { + let mut lazy_handles = Vec::new(); for (key, plan_input) in message.inputs { if let Some(sender) = input_senders.get(&key) { - let _ = sender.send(message.input_id, plan_input); + match plan_input { + Input::LazyScanTasks(producer) => { + let sender = sender.clone(); + let input_id = message.input_id; + // Iterate the lazy producer on a blocking thread, + // sending each task individually through the channel. + // When downstream (e.g. LimitSink) is satisfied, the + // channel drops, send() returns Err, and we stop iterating. + let handle = tokio::task::spawn_blocking(move || { + let iter = producer.produce()?; + for task in iter { + let task = task?; + if sender + .send(input_id, Input::ScanTasks(vec![task])) + .is_err() + { + break; + } + } + DaftResult::Ok(()) + }); + lazy_handles.push(handle); + } + other => { + let _ = sender.send(message.input_id, other); + } + } } } + // Drop non-lazy senders so that channels close once lazy feeding finishes. + // The lazy tasks hold cloned senders that keep their channels open. + drop(input_senders); + + // Await all lazy feeding tasks to propagate errors. + for handle in lazy_handles { + if let Ok(Err(e)) = handle.await { + log::error!("Error during lazy scan task production: {e}"); + } + } + } else { + drop(input_senders); } - drop(input_senders); while let Some(val) = receiver.recv().await { if tx.send(val).await.is_err() { diff --git a/src/daft-local-plan/src/lib.rs b/src/daft-local-plan/src/lib.rs index b2fca14336..33a6807770 100644 --- a/src/daft-local-plan/src/lib.rs +++ b/src/daft-local-plan/src/lib.rs @@ -5,7 +5,9 @@ mod plan; pub mod python; mod results; mod translate; -use common_scan_info::ScanTaskLikeRef; +use std::sync::Arc; + +use common_scan_info::{LazyTaskProducer, ScanTaskLikeRef}; use daft_micropartition::MicroPartitionRef; #[cfg(feature = "python")] pub use plan::{CatalogWrite, DataSink, DistributedActorPoolProject, LanceWrite}; @@ -45,4 +47,6 @@ pub enum Input { GlobPaths(Vec), #[serde(skip)] InMemory(Vec), + #[serde(skip)] + LazyScanTasks(Arc), } diff --git a/src/daft-local-plan/src/translate.rs b/src/daft-local-plan/src/translate.rs index bb41161af1..4fd8aecb8c 100644 --- a/src/daft-local-plan/src/translate.rs +++ b/src/daft-local-plan/src/translate.rs @@ -49,22 +49,19 @@ fn translate_helper( ) } SourceInfo::Physical(info) => { - let scan_tasks = match &info.scan_state { + let input = match &info.scan_state { ScanState::Operator(scan_op) => { - scan_op.0.to_scan_tasks(info.pushdowns.clone())? + Input::ScanTasks(scan_op.0.to_scan_tasks(info.pushdowns.clone())?) } - ScanState::Tasks(scan_tasks) => (**scan_tasks).clone(), + ScanState::Tasks(scan_tasks) => Input::ScanTasks((**scan_tasks).clone()), + ScanState::LazyTasks(producer) => Input::LazyScanTasks(producer.clone()), }; - - let format_config = scan_tasks - .first() - .map(|scan_task| scan_task.file_format_config()); let source_id = source_counter.next(); - inputs.insert(source_id, Input::ScanTasks(scan_tasks)); + inputs.insert(source_id, input); LocalPhysicalPlan::physical_scan( source_id, - format_config, + info.scan_state.file_format_config(), info.pushdowns.clone(), source.output_schema.clone(), source.stats_state.clone(), diff --git a/src/daft-logical-plan/src/ops/source.rs b/src/daft-logical-plan/src/ops/source.rs index d7efc85362..4543f5f9fd 100644 --- a/src/daft-logical-plan/src/ops/source.rs +++ b/src/daft-logical-plan/src/ops/source.rs @@ -45,22 +45,22 @@ impl Source { self } - // Helper method that converts the ScanOperatorRef inside a Source node's PhysicalScanInfo into scan tasks. + // Helper method that converts the ScanOperatorRef inside a Source node's PhysicalScanInfo into lazy scan tasks. // Should only be called if a Source node's source info contains PhysicalScanInfo. The PhysicalScanInfo - // should also hold a ScanState::Operator and not a ScanState::Tasks (which would indicate that we're + // should also hold a ScanState::Operator and not a ScanState::Tasks/LazyTasks (which would indicate that we're // materializing this physical scan node multiple times). pub(crate) fn build_materialized_scan_source(mut self) -> DaftResult { let new_physical_scan_info = match Arc::unwrap_or_clone(self.source_info) { SourceInfo::Physical(mut physical_scan_info) => { - let scan_tasks = match &physical_scan_info.scan_state { + let lazy_producer = match &physical_scan_info.scan_state { ScanState::Operator(scan_op) => scan_op .0 - .to_scan_tasks(physical_scan_info.pushdowns.clone())?, - ScanState::Tasks(_) => { + .to_lazy_scan_tasks(physical_scan_info.pushdowns.clone())?, + ScanState::Tasks(_) | ScanState::LazyTasks(_) => { panic!("Physical scan nodes are being materialized more than once"); } }; - physical_scan_info.scan_state = ScanState::Tasks(Arc::new(scan_tasks)); + physical_scan_info.scan_state = ScanState::LazyTasks(Arc::new(lazy_producer)); physical_scan_info } _ => panic!("Only unmaterialized physical scan nodes can be materialized"), @@ -100,6 +100,18 @@ impl Source { .estimated_selectivity(self.output_schema.as_ref()); approx_stats } + ScanState::LazyTasks(producer) => { + let stats = &producer.estimated_stats; + let mut approx_stats = ApproxStats { + num_rows: stats.estimated_total_rows, + size_bytes: stats.estimated_total_bytes, + acc_selectivity: 1.0, + }; + approx_stats.acc_selectivity = physical_scan_info + .pushdowns + .estimated_selectivity(self.output_schema.as_ref()); + approx_stats + } }, SourceInfo::GlobScan(_) => ApproxStats::empty(), SourceInfo::PlaceHolder(_) => ApproxStats::empty(), diff --git a/src/daft-logical-plan/src/optimization/rules/push_down_filter.rs b/src/daft-logical-plan/src/optimization/rules/push_down_filter.rs index 96adcedd2e..0eb48e18ff 100644 --- a/src/daft-logical-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-logical-plan/src/optimization/rules/push_down_filter.rs @@ -94,7 +94,10 @@ impl PushDownFilter { // Pushdown filter into the Source node SourceInfo::Physical(external_info) => { // If the scan is materialized, we don't pushdown the filter. - if matches!(external_info.scan_state, ScanState::Tasks(_)) { + if matches!( + external_info.scan_state, + ScanState::Tasks(_) | ScanState::LazyTasks(_) + ) { return Ok(Transformed::no(plan)); } let predicate = &filter.predicate; diff --git a/src/daft-logical-plan/src/optimization/rules/shard_scans.rs b/src/daft-logical-plan/src/optimization/rules/shard_scans.rs index c7d8308c1a..3525891fc6 100644 --- a/src/daft-logical-plan/src/optimization/rules/shard_scans.rs +++ b/src/daft-logical-plan/src/optimization/rules/shard_scans.rs @@ -43,6 +43,24 @@ impl ShardScans { } None => Ok(Transformed::no(plan)), }, + ScanState::LazyTasks(producer) => { + match &physical_scan_info.pushdowns.sharder { + Some(sharder) => { + // Sharding requires sorting, so we need to collect all tasks. + let scan_tasks = Arc::new(producer.collect_tasks()?); + let new_scan_tasks = sharder.shard_scan_tasks(&scan_tasks)?; + let new_scan_state = ScanState::Tasks(Arc::new(new_scan_tasks)); + let new_physical_scan_info = + physical_scan_info.with_scan_state(new_scan_state); + let new_source = Source::new( + source.output_schema.clone(), + SourceInfo::Physical(new_physical_scan_info).into(), + ); + Ok(Transformed::yes(LogicalPlan::Source(new_source).into())) + } + None => Ok(Transformed::no(plan)), + } + } ScanState::Operator(_) => Ok(Transformed::no(plan)), }, _ => Ok(Transformed::no(plan)), diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 858ade02f3..f7d1075595 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -3,7 +3,10 @@ use std::{sync::Arc, vec}; use common_error::{DaftError, DaftResult}; use common_file_formats::{CsvSourceConfig, FileFormat, FileFormatConfig, ParquetSourceConfig}; use common_runtime::RuntimeRef; -use common_scan_info::{PartitionField, Pushdowns, ScanOperator, ScanTaskLike, ScanTaskLikeRef}; +use common_scan_info::{ + LazyTaskProducer, LazyTaskStats, PartitionField, Pushdowns, ScanOperator, ScanTaskLike, + ScanTaskLikeRef, +}; use daft_core::{prelude::Utf8Array, series::IntoSeries}; use daft_csv::CsvParseOptions; use daft_dsl::expr::bound_expr::BoundExpr; @@ -131,8 +134,8 @@ fn run_glob_parallel( let stream = io_client .glob(glob_input, None, None, None, io_stats, Some(file_format)) .await?; - let results = stream.map_err(|e| e.into()).collect::>().await; - DaftResult::Ok(futures::stream::iter(results)) + // Yield FileMetadata lazily as S3 pages arrive instead of collecting all at once. + DaftResult::Ok(stream.map_err(|e| e.into())) }) })) .buffered(num_parallel_tasks) @@ -484,6 +487,177 @@ impl ScanOperator for GlobScanOperator { lines } + fn to_lazy_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult { + // Capture all fields needed to produce scan tasks lazily. + let glob_paths = self.glob_paths.clone(); + let file_format_config = self.file_format_config.clone(); + let schema = self.schema.clone(); + let storage_config = self.storage_config.clone(); + let file_path_column = self.file_path_column.clone(); + let hive_partitioning = self.hive_partitioning; + let partitioning_keys = self.partitioning_keys.clone(); + let skip_glob = self.skip_glob; + let first_metadata = self.first_metadata.clone(); + + let row_groups = if let FileFormatConfig::Parquet(ParquetSourceConfig { + row_groups: Some(row_groups), + .. + }) = self.file_format_config.as_ref() + { + Some(row_groups.clone()) + } else { + None + }; + + // Use glob_paths.len() as a cheap estimate for file count. + // This avoids a redundant listing pass — the actual glob runs lazily during task production. + let file_count_estimate = glob_paths.len(); + + let estimated_stats = if let Some((_path, meta)) = &self.first_metadata { + LazyTaskStats { + estimated_num_tasks: file_count_estimate, + estimated_total_bytes: 0, + estimated_total_rows: meta.length * file_count_estimate, + } + } else { + LazyTaskStats { + estimated_num_tasks: file_count_estimate, + estimated_total_bytes: 0, + estimated_total_rows: 0, + } + }; + + let factory = Arc::new(move || { + let (io_runtime, io_client) = storage_config.get_io_client_and_runtime()?; + let io_stats = IOStatsContext::new(format!( + "GlobScanOperator::to_lazy_scan_tasks for {:#?}", + glob_paths + )); + let file_format = file_format_config.file_format(); + + let files: Box> + Send> = if skip_glob { + Box::new(glob_paths.clone().into_iter().map(|path| { + Ok(FileMetadata { + filepath: path, + size: None, + filetype: FileType::File, + }) + })) + } else { + Box::new(run_glob_parallel( + glob_paths.clone(), + io_client, + io_runtime, + Some(io_stats), + file_format, + )?) + }; + + let file_format_config = file_format_config.clone(); + let schema = schema.clone(); + let storage_config = storage_config.clone(); + let pushdowns = pushdowns.clone(); + let file_path_column = file_path_column.clone(); + let partitioning_keys = partitioning_keys.clone(); + let row_groups = row_groups.clone(); + let first_metadata = first_metadata.clone(); + + let partition_fields = partitioning_keys + .iter() + .map(|partition_spec| partition_spec.clone_field()); + let partition_schema = Schema::new(partition_fields); + + let (first_filepath, first_meta) = + if let Some((first_filepath, first_meta)) = &first_metadata { + (Some(first_filepath.clone()), Some(first_meta.clone())) + } else { + (None, None) + }; + + let iter = files.enumerate().filter_map(move |(idx, f)| { + let scan_task_result = (|| { + let FileMetadata { + filepath: path, + size: size_bytes, + .. + } = f?; + let mut partition_values = if hive_partitioning { + let hive_partitions = parse_hive_partitioning(&path)?; + hive_partitions_to_series(&hive_partitions, &partition_schema)? + } else { + vec![] + }; + if let Some(fp_col) = &file_path_column { + let trimmed = path.trim_start_matches("file://"); + let file_paths_column_series = + Utf8Array::from_iter(fp_col, std::iter::once(Some(trimmed))) + .into_series(); + partition_values.push(file_paths_column_series); + } + let (partition_spec, generated_fields) = if !partition_values.is_empty() { + let partition_values_table = + RecordBatch::from_nonempty_columns(partition_values)?; + if let Some(partition_filters) = &pushdowns.partition_filters { + let partition_filters = BoundExpr::try_new( + partition_filters.clone(), + &partition_values_table.schema, + )?; + let filter_result = + partition_values_table.filter(&[partition_filters])?; + if filter_result.is_empty() { + return Ok(None); + } + } + let generated_fields = partition_values_table.schema.clone(); + let partition_spec = PartitionSpec { + keys: partition_values_table, + }; + (Some(partition_spec), Some(generated_fields)) + } else { + (None, None) + }; + let row_group = row_groups + .as_ref() + .and_then(|rgs| rgs.get(idx).cloned()) + .flatten(); + let chunk_spec = row_group.map(ChunkSpec::Parquet); + Ok(Some(ScanTask::new( + vec![DataSource::File { + metadata: if let Some(ref first_filepath) = first_filepath + && path == *first_filepath + { + first_meta.clone() + } else { + None + }, + path, + chunk_spec, + size_bytes, + iceberg_delete_files: None, + partition_spec, + statistics: None, + parquet_metadata: None, + }], + file_format_config.clone(), + schema.clone(), + storage_config.clone(), + pushdowns.clone(), + generated_fields, + ))) + })(); + match scan_task_result { + Ok(Some(scan_task)) => Some(Ok(Arc::new(scan_task) as Arc)), + Ok(None) => None, + Err(e) => Some(Err(e)), + } + }); + + Ok(Box::new(iter) as common_scan_info::ScanTaskIterator) + }); + + Ok(LazyTaskProducer::new(factory, estimated_stats)) + } + fn to_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult> { let (io_runtime, io_client) = self.storage_config.get_io_client_and_runtime()?; let io_stats = IOStatsContext::new(format!(