Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,18 @@ class TextSourceConfig:

encoding: str
skip_blank_lines: bool
whole_text: bool
buffer_size: int | None
chunk_size: int | None

def __init__(self, encoding: str, skip_blank_lines: bool, buffer_size: int | None, chunk_size: int | None): ...
def __init__(
self,
encoding: str,
skip_blank_lines: bool,
whole_text: bool,
buffer_size: int | None,
chunk_size: int | None,
): ...

class FileFormatConfig:
"""Configuration for parsing a particular file format (Parquet, CSV, JSON)."""
Expand Down
15 changes: 14 additions & 1 deletion daft/io/_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def read_text(
*,
encoding: str = "utf-8",
skip_blank_lines: bool = True,
whole_text: bool = False,
file_path_column: str | None = None,
hive_partitioning: bool = False,
io_config: IOConfig | None = None,
Expand All @@ -26,15 +27,21 @@ def read_text(
path: Path to text file(s). Supports wildcards and remote URLs such as ``s3://`` or ``gs://``.
encoding: Encoding of the input files, defaults to ``"utf-8"``.
skip_blank_lines: Whether to skip empty lines (after stripping whitespace). Defaults to ``True``.
When ``whole_text=True``, this skips files that are entirely blank.
whole_text: Whether to read each file as a single row. Defaults to ``False``.
When ``False``, each line in the file becomes a row in the DataFrame.
When ``True``, the entire content of each file becomes a single row in the DataFrame.
file_path_column: Include the source path(s) as a column with this name. Defaults to ``None``.
hive_partitioning: Whether to infer hive-style partitions from file paths and include them as
columns in the DataFrame. Defaults to ``False``.
io_config: IO configuration for the native downloader.
_buffer_size: Optional tuning parameter for the underlying streaming reader buffer size (bytes).
_chunk_size: Optional tuning parameter for the underlying streaming reader chunk size (rows).
Has no effect when ``whole_text=True``.

Returns:
DataFrame: A DataFrame with a single ``"text"`` column containing lines from the input files.
DataFrame: A DataFrame with a single ``"text"`` column containing lines from the input files
(when ``whole_text=False``) or entire file contents (when ``whole_text=True``).

Examples:
Read a text file from a local path:
Expand All @@ -49,6 +56,11 @@ def read_text(
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_text("s3://path/to/files-*.txt", io_config=io_config)
>>> df.show()

Read multiple small files, each as a single row:

>>> df = daft.read_text("/path/to/files/*.txt", whole_text=True)
>>> df.show()
"""
if isinstance(path, list) and len(path) == 0:
raise ValueError("Cannot read DataFrame from empty list of text filepaths")
Expand All @@ -57,6 +69,7 @@ def read_text(
text_config = TextSourceConfig(
encoding=encoding,
skip_blank_lines=skip_blank_lines,
whole_text=whole_text,
buffer_size=_buffer_size,
chunk_size=_chunk_size,
)
Expand Down
1 change: 1 addition & 0 deletions src/daft-local-execution/src/sources/scan_task_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ async fn read_text(
let convert_options = TextConvertOptions::new(
&cfg.encoding,
cfg.skip_blank_lines,
cfg.whole_text,
Some(schema_of_file),
scan_task.pushdowns.limit,
);
Expand Down
19 changes: 18 additions & 1 deletion src/daft-scan/src/file_format_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ impl_bincode_py_state_serialization!(WarcSourceConfig);
pub struct TextSourceConfig {
pub encoding: String,
pub skip_blank_lines: bool,
pub whole_text: bool,
pub buffer_size: Option<usize>,
pub chunk_size: Option<usize>,
}
Expand All @@ -455,18 +456,21 @@ impl TextSourceConfig {
#[pyo3(signature = (
encoding,
skip_blank_lines,
whole_text=false,
buffer_size=None,
chunk_size=None
chunk_size=None,
))]
fn new(
encoding: String,
skip_blank_lines: bool,
whole_text: bool,
buffer_size: Option<usize>,
chunk_size: Option<usize>,
) -> PyResult<Self> {
Ok(Self {
encoding,
skip_blank_lines,
whole_text,
buffer_size,
chunk_size,
})
Expand All @@ -479,6 +483,7 @@ impl TextSourceConfig {
let mut res = vec![];
res.push(format!("Encoding = {}", self.encoding));
res.push(format!("Skip blank lines = {}", self.skip_blank_lines));
res.push(format!("Whole text = {}", self.whole_text));
if let Some(buffer_size) = self.buffer_size {
res.push(format!("Buffer size = {buffer_size}"));
}
Expand All @@ -489,4 +494,16 @@ impl TextSourceConfig {
}
}

impl Default for TextSourceConfig {
fn default() -> Self {
Self {
encoding: "utf-8".to_string(),
skip_blank_lines: true,
whole_text: false,
buffer_size: None,
chunk_size: None,
}
}
}

impl_bincode_py_state_serialization!(TextSourceConfig);
3 changes: 3 additions & 0 deletions src/daft-text/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ tokio = {workspace = true}
tokio-stream = {workspace = true, features = ["io-util"]}
tokio-util = {workspace = true}

[dev-dependencies]
flate2 = {version = "1.1", features = ["zlib-rs"], default-features = false}

[features]
python = [
"common-error/python",
Expand Down
5 changes: 4 additions & 1 deletion src/daft-text/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
pub struct TextConvertOptions {
pub encoding: String,
pub skip_blank_lines: bool,
pub whole_text: bool,
pub schema: Option<SchemaRef>,
pub limit: Option<usize>,
}
Expand All @@ -15,12 +16,14 @@ impl TextConvertOptions {
pub fn new(
encoding: &str,
skip_blank_lines: bool,
whole_text: bool,
schema: Option<SchemaRef>,
limit: Option<usize>,
) -> Self {
Self {
encoding: encoding.to_string(),
skip_blank_lines,
whole_text,
schema,
limit,
}
Expand All @@ -29,7 +32,7 @@ impl TextConvertOptions {

impl Default for TextConvertOptions {
fn default() -> Self {
Self::new("utf-8", true, None, None)
Self::new("utf-8", true, false, None, None)
}
}

Expand Down
Loading
Loading