Commit c4b48d7

alamb, andygrove, and comphead authored
Minor: Improve documentation on execution error handling (#12651)
* Minor: document execution handling on shutdown better
* Improve RecordBatchStream docs
* Update datafusion/execution/src/stream.rs

Co-authored-by: Andy Grove <[email protected]>

* Update datafusion/execution/src/stream.rs

Co-authored-by: Oleks V <[email protected]>

---------

Co-authored-by: Andy Grove <[email protected]>
Co-authored-by: Oleks V <[email protected]>
1 parent 810e908 commit c4b48d7

5 files changed, with 43 additions and 2 deletions

datafusion/execution/src/stream.rs

Lines changed: 22 additions & 2 deletions
@@ -20,7 +20,9 @@ use datafusion_common::Result;
 use futures::Stream;
 use std::pin::Pin;
 
-/// Trait for types that stream [arrow::record_batch::RecordBatch]
+/// Trait for types that stream [RecordBatch]
+///
+/// See [`SendableRecordBatchStream`] for more details.
 pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
     /// Returns the schema of this `RecordBatchStream`.
     ///
@@ -29,5 +31,23 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
     fn schema(&self) -> SchemaRef;
 }
 
-/// Trait for a [`Stream`] of [`RecordBatch`]es
+/// Trait for a [`Stream`] of [`RecordBatch`]es that can be passed between threads
+///
+/// This trait is used to retrieve the results of DataFusion execution plan nodes.
+///
+/// The trait is a specialized Rust Async [`Stream`] that also knows the schema
+/// of the data it will return (even if the stream has no data). Every
+/// `RecordBatch` returned by the stream should have the same schema as returned
+/// by [`schema`](`RecordBatchStream::schema`).
+///
+/// # Error Handling
+///
+/// Once a stream returns an error, it should not be polled again (the caller
+/// should stop calling `next`) and handle the error.
+///
+/// However, returning `Ready(None)` (end of stream) is likely the safest
+/// behavior after an error. Like [`Stream`]s, `RecordBatchStream`s should not
+/// be polled after end of stream or returning an error. However, also like
+/// [`Stream`]s there is no mechanism to prevent callers polling so returning
+/// `Ready(None)` is recommended.
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
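
The "Error Handling" recommendation added above (return `Ready(None)` once an error has been returned) can be illustrated with a small std-only sketch. Everything here is hypothetical and not part of DataFusion: `MiniStream` is a simplified stand-in for `futures::Stream` (no `Pin`, no `Context`), and `VecSource` and `EndAfterError` are illustrative names. It only shows one way a stream could stay safe when a caller keeps polling after an error.

```rust
use std::task::Poll;

/// Hypothetical, simplified stand-in for `futures::Stream` (no `Pin`, no `Context`).
trait MiniStream {
    type Item;
    fn poll_next(&mut self) -> Poll<Option<Self::Item>>;
}

/// Toy source that yields a fixed list of results and counts how often it is polled.
struct VecSource {
    items: std::vec::IntoIter<Result<u32, String>>,
    polls: usize,
}

impl VecSource {
    fn new(items: Vec<Result<u32, String>>) -> Self {
        Self { items: items.into_iter(), polls: 0 }
    }
}

impl MiniStream for VecSource {
    type Item = Result<u32, String>;

    fn poll_next(&mut self) -> Poll<Option<Self::Item>> {
        self.polls += 1;
        Poll::Ready(self.items.next())
    }
}

/// Wrapper that reports end of stream on every poll after the first error.
struct EndAfterError<S> {
    inner: S,
    failed: bool,
}

impl<S, T, E> MiniStream for EndAfterError<S>
where
    S: MiniStream<Item = Result<T, E>>,
{
    type Item = Result<T, E>;

    fn poll_next(&mut self) -> Poll<Option<Self::Item>> {
        if self.failed {
            // An error was already returned: answer `Ready(None)` and leave `inner` alone.
            return Poll::Ready(None);
        }
        let polled = self.inner.poll_next();
        if let Poll::Ready(Some(Err(_))) = &polled {
            // Remember the failure so later polls see end of stream.
            self.failed = true;
        }
        polled
    }
}
```

With an input of `Ok(1)`, `Err("boom")`, `Ok(2)`, the wrapper yields the first two items and then `Ready(None)` on every later poll, and the inner source is never polled a third time.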

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 10 additions & 0 deletions
@@ -228,6 +228,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// [`TryStreamExt`]: futures::stream::TryStreamExt
     /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
     ///
+    /// # Error handling
+    ///
+    /// Any error that occurs during execution is sent as an `Err` in the output
+    /// stream.
+    ///
+    /// `ExecutionPlan` implementations in DataFusion cancel additional work
+    /// immediately once an error occurs. The rationale is that if the overall
+    /// query will return an error, any additional work such as continued
+    /// polling of inputs will be wasted as it will be thrown away.
+    ///
     /// # Cancellation / Aborting Execution
     ///
     /// The [`Stream`] that is returned must ensure that any allocated resources
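
The rationale in the "Error handling" section above (stop pulling input once an error is known) can be sketched with a synchronous analogy. This is an illustration only: `run_operator` is a hypothetical function, not a DataFusion API, and a plain `Iterator` of `Result` values stands in for the async input stream.

```rust
/// Hypothetical operator: doubles every input value and surfaces the first
/// input error as `Err`. An `Iterator` stands in for the async input stream.
fn run_operator<I>(input: I) -> Result<Vec<i64>, String>
where
    I: Iterator<Item = Result<i64, String>>,
{
    let mut out = Vec::new();
    for item in input {
        // `?` returns at the first `Err`, so the rest of the input is never pulled.
        out.push(item? * 2);
    }
    Ok(out)
}
```

Wrapping the input in `Iterator::inspect` with a counter is one way to check that items after the failing one are never requested.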

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 5 additions & 0 deletions
@@ -377,6 +377,11 @@ impl BatchPartitioner {
 /// `───────' `───────'
 ///```
 ///
+/// # Error Handling
+///
+/// If any of the input partitions return an error, the error is propagated to
+/// all output partitions and inputs are not polled again.
+///
 /// # Output Ordering
 ///
 /// If more than one stream is being repartitioned, the output will be some
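
The propagation rule documented above can be sketched with standard-library channels. This is a simplified, single-threaded illustration and not how `RepartitionExec` is implemented: `repartition` is a hypothetical function, inputs are plain iterators drained one after another, values are distributed round-robin, and `num_outputs` is assumed to be greater than zero.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical round-robin repartitioner. On the first input error, the error
/// is sent to every output and no input is read again.
/// Assumes `num_outputs > 0`.
fn repartition<I>(inputs: Vec<I>, num_outputs: usize) -> Vec<Receiver<Result<u32, String>>>
where
    I: Iterator<Item = Result<u32, String>>,
{
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..num_outputs).map(|_| channel()).unzip();
    let mut next = 0;
    'inputs: for input in inputs {
        for item in input {
            match item {
                Ok(value) => {
                    // Send failures (receiver dropped) are ignored in this sketch.
                    let _ = senders[next % num_outputs].send(Ok(value));
                    next += 1;
                }
                Err(e) => {
                    // Propagate the error to all outputs, then stop reading inputs.
                    for tx in &senders {
                        let _ = tx.send(Err(e.clone()));
                    }
                    break 'inputs;
                }
            }
        }
    }
    // Dropping the senders here closes every output channel.
    receivers
}
```

Each receiver then sees its share of the values followed by the same error, after which the channel closes.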

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ use futures::Stream;
 /// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
 type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>;
 
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
 #[derive(Debug)]
 pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
     in_progress: BatchBuilder,

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 5 additions & 0 deletions
@@ -65,6 +65,11 @@ use log::{debug, trace};
 /// Input Streams Output stream
 /// (sorted) (sorted)
 /// ```
+///
+/// # Error Handling
+///
+/// If any of the input partitions return an error, the error is propagated to
+/// the output and inputs are not polled again.
 #[derive(Debug)]
 pub struct SortPreservingMergeExec {
     /// Input plan
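
The merge behavior described above can be sketched as a k-way merge that stops at the first input error. This is a hedged, synchronous illustration rather than the `SortPreservingMergeExec` implementation: `merge_sorted` is a hypothetical function, each input is an iterator of already-sorted `i32` values, and a binary heap stands in for the cursor-based merge.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical k-way merge of sorted inputs. The first input error is pushed
/// to the output and merging stops, so no input is read again.
fn merge_sorted<I>(mut inputs: Vec<I>) -> Vec<Result<i32, String>>
where
    I: Iterator<Item = Result<i32, String>>,
{
    let mut heap = BinaryHeap::new();
    let mut out = Vec::new();
    // Prime the heap with the first value of every input.
    for (idx, input) in inputs.iter_mut().enumerate() {
        match input.next() {
            Some(Ok(value)) => heap.push(Reverse((value, idx))),
            Some(Err(e)) => {
                out.push(Err(e));
                return out;
            }
            None => {}
        }
    }
    // Repeatedly emit the smallest value and refill from the input it came from.
    while let Some(Reverse((value, idx))) = heap.pop() {
        out.push(Ok(value));
        match inputs[idx].next() {
            Some(Ok(next)) => heap.push(Reverse((next, idx))),
            Some(Err(e)) => {
                out.push(Err(e));
                return out;
            }
            None => {}
        }
    }
    out
}
```

One design choice in this sketch is that values still buffered in the heap are discarded when the error arrives, on the assumption that the overall query fails anyway.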
