Skip to content

Commit 04e47c0

Browse files
committed
Sketch out a Morselize API
1 parent 15bc6bd commit 04e47c0

File tree

3 files changed

+77
-3
lines changed

3 files changed

+77
-3
lines changed

datafusion/datasource/src/file_stream.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use datafusion_common::instant::Instant;
4242
use futures::future::BoxFuture;
4343
use futures::stream::BoxStream;
4444
use futures::{FutureExt as _, Stream, StreamExt as _, ready};
45+
use crate::morsel::Morselizer;
4546

4647
/// A stream that iterates record batch by record batch, file over file.
4748
pub struct FileStream {
@@ -50,8 +51,10 @@ pub struct FileStream {
5051
/// The stream schema (file schema including partition columns and after
5152
/// projection).
5253
projected_schema: SchemaRef,
53-
/// The remaining number of records to parse, None if no limit
54+
/// The remaining number of records to parse until the limit is reached, `None` if no limit
5455
remain: Option<usize>,
56+
/// A type specific [`Morselizer`] that examines the input files and produces a stream of `Morsels`
57+
morselizer: Option<Box<dyn Morselizer>>,
5558
/// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
5659
/// which can be resolved to a stream of `RecordBatch`.
5760
file_opener: Arc<dyn FileOpener>,
@@ -81,6 +84,7 @@ impl FileStream {
8184
file_iter: file_group.into_inner().into_iter().collect(),
8285
projected_schema,
8386
remain: config.limit,
87+
morselizer: None,
8488
file_opener,
8589
state: FileStreamState::Idle,
8690
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
@@ -292,6 +296,9 @@ impl RecordBatchStream for FileStream {
292296
}
293297

294298
/// A fallible future that resolves to a stream of [`RecordBatch`]
299+
///
300+
/// This is typically an `async` function that opens the file, and returns a
301+
/// stream that reads the file and produces `RecordBatch`es.
295302
pub type FileOpenFuture =
296303
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
297304

@@ -308,10 +315,15 @@ pub enum OnError {
308315
/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
309316
/// stream of [`RecordBatch`]
310317
///
318+
/// TODO: rename this to MorselOpener
319+
/// Also, add documentation here describing IO expectations
320+
///
311321
/// [`ObjectStore`]: object_store::ObjectStore
312322
pub trait FileOpener: Unpin + Send + Sync {
313323
/// Asynchronously open the specified file and return a stream
314324
/// of [`RecordBatch`]
325+
///
326+
/// TODO: describe prefetching behavior here, and expectations around IO
315327
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
316328
}
317329

@@ -330,7 +342,7 @@ pub enum FileStreamState {
330342
/// for a given file
331343
Open {
332344
/// A [`FileOpenFuture`] returned by [`FileOpener::open`]
333-
future: FileOpenFuture,
345+
future: FileOpenFuture,
334346
},
335347
/// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
336348
/// returned by [`FileOpener::open`]

datafusion/datasource/src/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub mod sink;
4444
pub mod source;
4545
mod statistics;
4646
pub mod table_schema;
47+
mod morsel;
4748

4849
#[cfg(test)]
4950
pub mod test_util;
@@ -70,7 +71,10 @@ use std::ops::Range;
7071
use std::pin::Pin;
7172
use std::sync::Arc;
7273

74+
pub use morsel::Morsel;
75+
7376
/// Stream of files listed from an object store
77+
#[deprecated(since="54.0.0", note = "This type is unused and will be removed in a future release")]
7478
pub type PartitionedFileStream =
7579
Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
7680

@@ -122,7 +126,8 @@ pub struct PartitionedFile {
122126
/// [`wrap_partition_value_in_dict`]: crate::file_scan_config::wrap_partition_value_in_dict
123127
/// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L87
124128
pub partition_values: Vec<ScalarValue>,
125-
/// An optional file range for a more fine-grained parallel execution
129+
/// An optional file range for this file. This is used to statically
130+
/// schedule non-overlapping sections of a file to be read in parallel.
126131
pub range: Option<FileRange>,
127132
/// Optional statistics that describe the data in this file if known.
128133
///
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Structures for Morsel Driven IO
19+
//!
20+
//! Morsel Driven IO is a technique for parallelizing the reading of large files
21+
//! by dividing them into smaller "morsels" that can be processed independently.
22+
//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query
23+
//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf)
24+
25+
use std::any::Any;
26+
use std::sync::Arc;
27+
use futures::future::BoxFuture;
28+
use datafusion_common::error::Result;
29+
use futures::stream::BoxStream;
30+
use crate::PartitionedFile;
31+
32+
/// A Morsel represents a portion of a file that can be processed independently.
33+
#[derive(Debug)]
34+
pub struct Morsel {
35+
/// The original [`PartitionedFile`] that this morsel belongs to
36+
file: Arc<PartitionedFile>,
37+
/// File format specific information that describes the morsel, such as byte range, row group, etc.
38+
info: Box<dyn Any>,
39+
}
40+
41+
/// A fallible future that resolves to a stream of [`Morsel`]
42+
///
43+
/// This is typically an `async` function that opens the file, and returns a
44+
/// stream of Morsels that can be processed independently. The future may resolve to
45+
/// an error if the file cannot be opened or read.
46+
pub type MorselOpenFuture = BoxFuture<'static, Result<BoxStream<'static, Morsel>>>;
47+
48+
pub trait Morselizer: Send + Sync {
49+
/// Given a [`PartitionedFile`], return a future that resolves to a stream of [`Morsel`]s, potentially doing IO
50+
///
51+
/// Notes
52+
///
53+
/// 1. The API is `async` for file formats that require I/O operations to
54+
/// determine the morsels, such as reading metadata from the file.
55+
/// 2. TODO: add further notes here
56+
fn morselize(&self, file: Arc<PartitionedFile>) -> MorselOpenFuture;
57+
}

0 commit comments

Comments
 (0)