
feat: Add support for reading whole text files to read_text#6354

Open
plotor wants to merge 1 commit into Eventual-Inc:main from plotor:zhenchao-read-text

Conversation

@plotor
Collaborator

@plotor plotor commented Mar 6, 2026

Changes Made

Add a whole_text option to the read_text API to support reading the whole text content of a file as a single row. Consider inference scenarios, for example, where a file's content may be a complete prompt; in that case it should not be read line by line.
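The row semantics being added can be illustrated with a daft-free sketch in plain Python (the helper name and signature here are illustrative, not the actual read_text API, which returns a lazy DataFrame):

```python
import tempfile
from pathlib import Path

def read_text_rows(path: str, whole_text: bool = False, skip_blank_lines: bool = False):
    """Sketch of the row semantics: one row per file vs. one row per line."""
    content = Path(path).read_text(encoding="utf-8")
    if whole_text:
        # Whole-file mode: the entire content is a single row,
        # skipped only when it is entirely blank.
        if skip_blank_lines and content.strip() == "":
            return []
        return [content]
    lines = content.splitlines()
    if skip_blank_lines:
        lines = [ln for ln in lines if ln.strip()]
    return lines

with tempfile.TemporaryDirectory() as d:
    p = Path(d) / "prompt.txt"
    p.write_text("You are a helpful assistant.\nAnswer concisely.\n", encoding="utf-8")
    print(read_text_rows(str(p)))                   # two rows, one per line
    print(read_text_rows(str(p), whole_text=True))  # one row holding the full prompt
```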

Related Issues

@github-actions github-actions bot added the feat label Mar 6, 2026
@plotor plotor force-pushed the zhenchao-read-text branch from 3abc79b to ecaddb9 on March 6, 2026 08:54
@plotor plotor marked this pull request as ready for review March 6, 2026 08:57
@plotor plotor requested a review from a team as a code owner March 6, 2026 08:57
@greptile-apps
Contributor

greptile-apps bot commented Mar 6, 2026

Greptile Summary

This PR adds a whole_text option to read_text so that each file is read as a single row instead of one row per line — a useful pattern for prompt-engineering and document-ingestion workloads. The change is cleanly propagated from the Python API through TextSourceConfig, TextConvertOptions, and the Rust execution layer, with a new read_into_whole_text_stream helper in src/daft-text/src/read.rs that handles reading, decompression, blank-file skipping, and limit push-down.

Key observations:

  • The feature is correctly wired end-to-end and the public API additions (whole_text param, updated docstrings, type stubs) are clear and well-documented.
  • The limit == Some(0) early-exit guard correctly prevents reading files when the overall query limit has already been satisfied by prior scan tasks.
  • read_into_whole_text_stream duplicates the file-open / compression-wrapping boilerplate from read_into_line_chunk_stream; extracting a shared open_reader helper would reduce maintenance burden.
  • The new helper always uses an 8 MiB default buffer regardless of source type, while the line-reading path uses 256 KiB for local files; this inconsistency could cause unnecessary memory allocation for small local files.
  • Test coverage is comprehensive (single/multi-file, path column, empty file with both skip_blank_lines settings, glob patterns, gzip), but there is no test exercising the limit == Some(0) guard in whole_text mode.
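The limit-pushdown interplay noted above can be sketched in plain Python (hypothetical helper; the real logic lives in the Rust stream implementations in src/daft-text/src/read.rs):

```python
import tempfile
from pathlib import Path

def scan_whole_text(paths, limit=None):
    """One row per file; stop opening files once the limit is satisfied."""
    rows, opened = [], []
    remaining = limit
    for path in paths:
        # Mirrors the limit == Some(0) early-exit guard: when prior scan
        # tasks have already satisfied the query limit, skip the file entirely.
        if remaining is not None and remaining <= 0:
            break
        opened.append(path)
        rows.append(Path(path).read_text(encoding="utf-8"))
        if remaining is not None:
            remaining -= 1
    return rows, opened

with tempfile.TemporaryDirectory() as d:
    paths = []
    for i in range(10):
        p = Path(d) / f"file_{i}.txt"
        p.write_text(f"content {i}", encoding="utf-8")
        paths.append(str(p))
    rows, opened = scan_whole_text(paths, limit=3)
    print(len(rows), len(opened))  # 3 3 — only the first three files are ever opened
```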

Confidence Score: 4/5

  • Safe to merge; the feature is correctly implemented and well-tested, with only minor style and consistency issues.
  • The core logic is sound and all edge cases (empty files, compressed files, blank-file skipping, limit push-down) are handled. The deductions are for duplicated boilerplate in read_into_whole_text_stream, inconsistent buffer-size defaults, and the absence of a limit-pushdown test for the new code path — none of which are blocking correctness issues.
  • src/daft-text/src/read.rs — duplicated file-open/compression setup and inconsistent buffer size defaults.

Important Files Changed

Filename Overview
src/daft-text/src/read.rs Adds read_into_whole_text_stream for whole-file reading; duplicates file-open/compression setup from read_into_line_chunk_stream and uses an inconsistent (larger) default buffer size for local files.
src/daft-text/src/options.rs Cleanly adds whole_text: bool field to TextConvertOptions and updates the constructor and Default impl accordingly.
src/common/file-formats/src/file_format_config.rs Adds whole_text field to TextSourceConfig, updates the pyo3 constructor, multiline_display, and provides a Default impl — all straightforward and correct.
src/daft-local-execution/src/sources/scan_task.rs Correctly threads cfg.whole_text through to TextConvertOptions::new; no issues.
daft/io/_text.py Adds whole_text parameter to read_text, updates docstring with clear explanation including a note that _chunk_size has no effect in whole_text mode.
daft/daft/init.pyi Type stub updated to reflect the new whole_text field and constructor parameter for TextSourceConfig.
tests/io/test_text.py Good coverage of the new feature (single/multi-file, path column, empty file, glob, gzip); missing a test for whole_text=True with a pushdown limit to exercise the limit == Some(0) guard.

Sequence Diagram

sequenceDiagram
    participant User
    participant ReadText as read_text (Python)
    participant TextSourceConfig
    participant ScanTask as stream_scan_task (Rust)
    participant StreamText as stream_text (Rust)
    participant WholeText as read_into_whole_text_stream
    participant LineChunk as read_into_line_chunk_stream

    User->>ReadText: read_text(path, whole_text=True/False)
    ReadText->>TextSourceConfig: TextSourceConfig(encoding, skip_blank_lines, whole_text, ...)
    TextSourceConfig-->>ReadText: config
    ReadText-->>User: DataFrame (lazy)

    User->>ScanTask: collect / execute
    ScanTask->>StreamText: TextConvertOptions{whole_text, limit, ...}

    alt whole_text = true
        StreamText->>WholeText: uri, convert_options, ...
        WholeText->>WholeText: check limit == Some(0) → early return
        WholeText->>WholeText: read_to_string()
        WholeText->>WholeText: skip if blank & skip_blank_lines
        WholeText-->>StreamText: Stream<String> (0 or 1 item per file)
    else whole_text = false
        StreamText->>LineChunk: uri, convert_options, ...
        LineChunk->>LineChunk: iterate lines, chunk, respect limit
        LineChunk-->>StreamText: Stream<Vec<String>>
    end

    StreamText-->>ScanTask: BoxStream<RecordBatch>
    ScanTask-->>User: RecordBatches

Comments Outside Diff (1)

  1. tests/io/test_text.py, lines 158-251

    Missing test for limit pushdown in whole_text mode

    The PR explicitly guards against limit == Some(0) in read_into_whole_text_stream (to avoid reading and yielding a row for a file that should be skipped entirely when the overall query limit is already satisfied). However none of the new tests exercise this code path in whole_text mode.

    Consider adding a test analogous to the existing test_read_with_limit_pushdown:

    def test_read_whole_text_with_limit_pushdown(tmp_path):
        for i in range(10):
            (tmp_path / f"file_{i}.txt").write_text(f"content {i}", encoding="utf-8")
    
        df = daft.read_text(str(tmp_path / "*.txt"), whole_text=True).limit(3)
        result = df.to_pydict()
        assert len(result["text"]) == 3

    This would catch any regression in the limit == Some(0) early-exit guard.

Last reviewed commit: 977676c

Comment on lines +108 to +118
Ok(try_stream! {
let mut content = String::new();
reader.read_to_string(&mut content).await?;

// Apply skip_blank_lines if needed (for whole file, this means skip if entire content is blank)
if convert_options.skip_blank_lines && content.trim().is_empty() {
return;
}

yield content;
})
Contributor

Limit pushdown not respected in whole_text mode

convert_options.limit is completely ignored inside read_into_whole_text_stream. In the existing line-oriented path (read_into_line_chunk_stream), the limit is enforced via a remaining counter that short-circuits the loop when it reaches zero. Here, if a scan task is given limit = Some(0) (i.e., the overall query limit is already satisfied by prior scan tasks), this function will still read the entire file and yield one row — producing incorrect results.

A minimal guard at the top of the try_stream! block would address this:

Suggested change
Ok(try_stream! {
let mut content = String::new();
reader.read_to_string(&mut content).await?;
// Apply skip_blank_lines if needed (for whole file, this means skip if entire content is blank)
if convert_options.skip_blank_lines && content.trim().is_empty() {
return;
}
yield content;
})
Ok(try_stream! {
// Respect limit pushdown: in whole-text mode each file is exactly one row.
if convert_options.limit == Some(0) {
return;
}
let mut content = String::new();
reader.read_to_string(&mut content).await?;
// Apply skip_blank_lines if needed (for whole file, this means skip if entire content is blank)
if convert_options.skip_blank_lines && content.trim().is_empty() {
return;
}
yield content;
})

@plotor plotor force-pushed the zhenchao-read-text branch from ecaddb9 to 977676c on March 6, 2026 09:52
@plotor
Collaborator Author

plotor commented Mar 6, 2026

This is a supplementary implementation for #6111, adding a whole_text parameter to control whether to load the entire text as a single row. Please give it a review when you have time. Thanks @desmondcheongzx

@plotor
Collaborator Author

plotor commented Mar 12, 2026

@greptile update summary

Comment on lines +76 to +124
async fn read_into_whole_text_stream(
uri: String,
convert_options: TextConvertOptions,
read_options: TextReadOptions,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<impl Stream<Item = DaftResult<String>> + Send> {
let buffer_size = read_options.buffer_size.unwrap_or(8 * 1024 * 1024);

let reader: Box<dyn AsyncBufRead + Unpin + Send> = match io_client
.single_url_get(uri.clone(), None, io_stats)
.await?
{
GetResult::File(file) => Box::new(BufReader::with_capacity(
buffer_size,
File::open(file.path).await?,
)),
GetResult::Stream(stream, ..) => Box::new(BufReader::with_capacity(
buffer_size,
StreamReader::new(stream),
)),
};

// If file is compressed, wrap stream in decoding stream.
let mut reader: Box<dyn AsyncBufRead + Unpin + Send> = match CompressionCodec::from_uri(&uri) {
Some(compression) => Box::new(BufReader::with_capacity(
buffer_size,
compression.to_decoder(reader),
)),
None => reader,
};

Ok(try_stream! {
// Check limit first - if limit is 0, don't read the file at all
if convert_options.limit == Some(0) {
return;
}

let mut content = String::new();
reader.read_to_string(&mut content).await?;

// Apply skip_blank_lines if needed (for whole file, this means skip if entire content is blank)
if convert_options.skip_blank_lines && content.trim().is_empty() {
return;
}

yield content;
})
}
Contributor

Two separate functions instead of a single parametrized helper

read_into_whole_text_stream duplicates the file-open / compression-wrapping boilerplate that already exists in read_into_line_chunk_stream (the single_url_get match arm, the CompressionCodec wrapping). Keeping them as separate free-standing functions means any future change to how files are opened or decompressed must be made in two places.

Consider extracting the shared setup into a small helper:

async fn open_reader(
    uri: &str,
    buffer_size: usize,
    io_client: Arc<IOClient>,
    io_stats: Option<IOStatsRef>,
) -> DaftResult<Box<dyn AsyncBufRead + Unpin + Send>> {
    let raw: Box<dyn AsyncBufRead + Unpin + Send> = match io_client
        .single_url_get(uri.to_string(), None, io_stats)
        .await?
    {
        GetResult::File(file) => Box::new(BufReader::with_capacity(
            buffer_size,
            File::open(file.path).await?,
        )),
        GetResult::Stream(stream, ..) => Box::new(BufReader::with_capacity(
            buffer_size,
            StreamReader::new(stream),
        )),
    };
    Ok(match CompressionCodec::from_uri(uri) {
        Some(codec) => Box::new(BufReader::with_capacity(buffer_size, codec.to_decoder(raw))),
        None => raw,
    })
}

Both read_into_whole_text_stream and read_into_line_chunk_stream would then call open_reader, keeping the algorithm-specific logic in each while eliminating the duplication.

Rule Used: Prefer single parametrized functions over multiple... (source)

Learnt From
Eventual-Inc/Daft#5207

io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<impl Stream<Item = DaftResult<String>> + Send> {
let buffer_size = read_options.buffer_size.unwrap_or(8 * 1024 * 1024);
Contributor

Inconsistent default buffer size for local files

In read_into_line_chunk_stream, local files (GetResult::File) use a 256 KiB default buffer:

let buffer_size = read_options.buffer_size.unwrap_or(256 * 1024);

Here, read_into_whole_text_stream always falls back to 8 MiB regardless of whether the source is a local file or a remote stream. For small local files this allocates a 32× larger buffer than necessary.

Consider using distinct defaults for local vs. remote sources to stay consistent with the rest of the codebase: mirror the GetResult::File / GetResult::Stream split that read_into_line_chunk_stream uses, so that local files keep the smaller 256 KiB default.

Collaborator

@desmondcheongzx desmondcheongzx left a comment

Mostly looks good, thanks for the PR @plotor! I left some nits but they're non-blocking

yield content;
})
}

Collaborator

Not a huge deal, but I think greptile is right here!

Collaborator Author

Makes sense

// Check limit first - if limit is 0, don't read the file at all
if convert_options.limit == Some(0) {
return;
}
Collaborator

Looks like we're missing a test for this path

Collaborator Author

Done

@plotor plotor force-pushed the zhenchao-read-text branch from 977676c to 384ebad on March 18, 2026 07:33
Signed-off-by: plotor <zhenchao.wang@hotmail.com>
@plotor plotor force-pushed the zhenchao-read-text branch from 384ebad to eb50eb4 on March 18, 2026 07:35
@plotor
Collaborator Author

plotor commented Mar 18, 2026

@desmondcheongzx Thanks for the review, please take another look~
