Skip to content

Commit 192079e

Browse files
authored
Improve documentation on start_demuxer_task and file_extension (#13216)
1 parent 545976f commit 192079e

File tree

2 files changed

+39
-12
lines changed

2 files changed

+39
-12
lines changed

datafusion/core/src/datasource/file_format/write/demux.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,33 @@ type RecordBatchReceiver = Receiver<RecordBatch>;
5252
type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
5353

5454
/// Splits a single [SendableRecordBatchStream] into a dynamically determined
55-
/// number of partitions at execution time. The partitions are determined by
56-
/// factors known only at execution time, such as total number of rows and
57-
/// partition column values. The demuxer task communicates to the caller
58-
/// by sending channels over a channel. The inner channels send RecordBatches
59-
/// which should be contained within the same output file. The outer channel
60-
/// is used to send a dynamic number of inner channels, representing a dynamic
61-
/// number of total output files. The caller is also responsible to monitor
62-
/// the demux task for errors and abort accordingly. A path with an extension will
63-
/// force only a single file to be written with the extension from the path. Otherwise
64-
/// the default extension will be used and the output will be split into multiple files.
65-
/// partition_by parameter will additionally split the input based on the unique
66-
/// values of a specific column `<https://github.com/apache/datafusion/issues/7744>``
55+
/// number of partitions at execution time.
56+
///
57+
/// The partitions are determined by factors known only at execution time, such
58+
/// as total number of rows and partition column values. The demuxer task
59+
/// communicates to the caller by sending channels over a channel. The inner
60+
/// channels send RecordBatches which should be contained within the same output
61+
/// file. The outer channel is used to send a dynamic number of inner channels,
62+
/// representing a dynamic number of total output files.
63+
///
64+
/// The caller is also responsible to monitor the demux task for errors and
65+
/// abort accordingly.
66+
///
67+
/// A path with an extension will force only a single file to
68+
/// be written with the extension from the path. Otherwise the default extension
69+
/// will be used and the output will be split into multiple files.
70+
///
71+
/// Examples of `base_output_path`
72+
/// * `tmp/dataset/` -> is a folder since it ends in `/`
73+
/// * `tmp/dataset` -> is still a folder since it does not end in `/` but has no valid file extension
74+
/// * `tmp/file.parquet` -> is a file since it does not end in `/` and has a valid file extension `.parquet`
75+
/// * `tmp/file.parquet/` -> is a folder since it ends in `/`
76+
///
77+
/// The `partition_by` parameter will additionally split the input based on the
78+
/// unique values of a specific column, see
79+
/// <https://github.com/apache/datafusion/issues/7744>
80+
///
81+
/// ```text
6782
/// ┌───────────┐ ┌────────────┐ ┌─────────────┐
6883
/// ┌──────▶ │ batch 1 ├────▶...──────▶│ Batch a │ │ Output File1│
6984
/// │ └───────────┘ └────────────┘ └─────────────┘
@@ -75,6 +90,7 @@ type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;
7590
/// └──────────┘ │ ┌───────────┐ ┌────────────┐ ┌─────────────┐
7691
/// └──────▶ │ batch d ├────▶...──────▶│ Batch n │ │ Output FileN│
7792
/// └───────────┘ └────────────┘ └─────────────┘
93+
/// ```
7894
pub(crate) fn start_demuxer_task(
7995
input: SendableRecordBatchStream,
8096
context: &Arc<TaskContext>,

datafusion/core/src/datasource/listing/url.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,17 @@ impl ListingTableUrl {
191191
}
192192

193193
/// Returns the file extension of the last path segment if it exists
194+
///
195+
/// Examples:
196+
/// ```rust
197+
/// use datafusion::datasource::listing::ListingTableUrl;
198+
/// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
199+
/// assert_eq!(url.file_extension(), Some("csv"));
200+
/// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
201+
/// assert_eq!(url.file_extension(), None);
202+
/// let url = ListingTableUrl::parse("file:///foo/bar.").unwrap();
203+
/// assert_eq!(url.file_extension(), None);
204+
/// ```
194205
pub fn file_extension(&self) -> Option<&str> {
195206
if let Some(segments) = self.url.path_segments() {
196207
if let Some(last_segment) = segments.last() {

0 commit comments

Comments
 (0)