Skip to content

Commit e475323

Browse files
Authored by plotor, desmondcheongzx and Claude
feat: Add read_text API to support reading text files (#6111)
## Changes Made <!-- Describe what changes were made and why. Include implementation details if necessary. --> Add the `read_text` API to support reading text files line by line. ## Related Issues <!-- Link to related GitHub issues, e.g., "Closes #123" --> #2859 --------- Signed-off-by: plotor <zhenchao.wang@hotmail.com> Co-authored-by: desmondcheongzx <desmondcheongzx@gmail.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 80d670f commit e475323

File tree

24 files changed

+776
-28
lines changed

24 files changed

+776
-28
lines changed

Cargo.lock

Lines changed: 37 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ daft-compression = {path = "src/daft-compression", default-features = false}
2121
daft-context = {path = "src/daft-context", default-features = false}
2222
daft-core = {path = "src/daft-core", default-features = false}
2323
daft-csv = {path = "src/daft-csv", default-features = false}
24+
daft-text = {path = "src/daft-text", default-features = false}
2425
daft-dashboard = {path = "src/daft-dashboard", default-features = false}
2526
daft-distributed = {path = "src/daft-distributed", default-features = false}
2627
daft-dsl = {path = "src/daft-dsl", default-features = false}
@@ -78,6 +79,7 @@ python = [
7879
"daft-context/python",
7980
"daft-core/python",
8081
"daft-csv/python",
82+
"daft-text/python",
8183
"daft-dashboard/python",
8284
"daft-distributed/python",
8385
"daft-dsl/python",
@@ -228,7 +230,8 @@ members = [
228230
"src/daft-writers",
229231
"src/hyperloglog",
230232
"src/parquet2",
231-
"src/daft-cli"
233+
"src/daft-cli",
234+
"src/daft-text"
232235
]
233236

234237
[workspace.dependencies]

daft/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ def refresh_logger() -> None:
138138
read_json,
139139
read_parquet,
140140
read_sql,
141+
read_text,
141142
read_video_frames,
142143
read_warc,
143144
read_huggingface,
@@ -259,6 +260,7 @@ def __getattr__(name: str) -> object:
259260
"read_parquet",
260261
"read_sql",
261262
"read_table",
263+
"read_text",
262264
"read_video_frames",
263265
"read_warc",
264266
"refresh_logger",

daft/context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ def set_execution_config(
172172
csv_target_filesize: int | None = None,
173173
csv_inflation_factor: float | None = None,
174174
json_inflation_factor: float | None = None,
175+
text_inflation_factor: float | None = None,
175176
shuffle_aggregation_default_partitions: int | None = None,
176177
partial_aggregation_threshold: int | None = None,
177178
high_cardinality_aggregation_threshold: float | None = None,
@@ -216,6 +217,7 @@ def set_execution_config(
216217
csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
217218
csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
218219
json_inflation_factor: Inflation Factor of JSON files (In-Memory-Size / File-Size) ratio. Defaults to 0.25
220+
text_inflation_factor: Inflation Factor of Text files (In-Memory-Size / File-Size) ratio. Defaults to 1.0
219221
shuffle_aggregation_default_partitions: Maximum number of partitions to create when performing aggregations on the Ray Runner. Defaults to 200, unless the number of input partitions is less than 200.
220222
partial_aggregation_threshold: Threshold for performing partial aggregations on the Native Runner. Defaults to 10000 rows.
221223
high_cardinality_aggregation_threshold: Threshold selectivity for performing high cardinality aggregations on the Native Runner. Defaults to 0.8.
@@ -253,6 +255,7 @@ def set_execution_config(
253255
csv_target_filesize=csv_target_filesize,
254256
csv_inflation_factor=csv_inflation_factor,
255257
json_inflation_factor=json_inflation_factor,
258+
text_inflation_factor=text_inflation_factor,
256259
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
257260
partial_aggregation_threshold=partial_aggregation_threshold,
258261
high_cardinality_aggregation_threshold=high_cardinality_aggregation_threshold,

daft/daft/__init__.pyi

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,16 @@ class DatabaseSourceConfig:
319319

320320
def __init__(self, sql: str, conn_factory: SQLConnection): ...
321321

322+
class TextSourceConfig:
    """Configuration of a text data source.

    Passed to ``FileFormatConfig.from_text_config`` to describe how
    line-oriented text files should be read.
    """

    # Character encoding used to decode file contents (e.g. "utf-8").
    encoding: str
    # Whether lines that are empty (after stripping whitespace) are skipped.
    skip_blank_lines: bool
    # Streaming-reader buffer size in bytes; None uses the backend default.
    buffer_size: int | None
    # Streaming-reader chunk size in rows; None uses the backend default.
    chunk_size: int | None

    def __init__(self, encoding: str, skip_blank_lines: bool, buffer_size: int | None, chunk_size: int | None): ...
331+
322332
class FileFormatConfig:
323333
"""Configuration for parsing a particular file format (Parquet, CSV, JSON)."""
324334

@@ -349,6 +359,11 @@ class FileFormatConfig:
349359
"""Create a database file format config."""
350360
...
351361

362+
@staticmethod
363+
def from_text_config(config: TextSourceConfig) -> FileFormatConfig:
364+
"""Create a Text file format config."""
365+
...
366+
352367
def file_format(self) -> FileFormat:
353368
"""Get the file format for this config."""
354369
...
@@ -883,6 +898,7 @@ class TosConfig:
883898
) -> TosConfig:
884899
"""Replaces values if provided, returning a new TosConfig."""
885900
...
901+
886902
@staticmethod
887903
def from_env() -> TosConfig:
888904
"""Creates a TosConfig, retrieving credentials and configurations from the current environment.
@@ -970,6 +986,7 @@ class CosConfig:
970986
) -> CosConfig:
971987
"""Replaces values if provided, returning a new CosConfig."""
972988
...
989+
973990
@staticmethod
974991
def from_env() -> CosConfig:
975992
"""Creates a CosConfig, retrieving credentials and configurations from the current environment.
@@ -2227,6 +2244,7 @@ class PyDaftExecutionConfig:
22272244
csv_target_filesize: int | None = None,
22282245
csv_inflation_factor: float | None = None,
22292246
json_inflation_factor: float | None = None,
2247+
text_inflation_factor: float | None = None,
22302248
shuffle_aggregation_default_partitions: int | None = None,
22312249
partial_aggregation_threshold: int | None = None,
22322250
high_cardinality_aggregation_threshold: float | None = None,
@@ -2274,6 +2292,8 @@ class PyDaftExecutionConfig:
22742292
@property
22752293
def json_inflation_factor(self) -> float: ...
22762294
@property
2295+
def text_inflation_factor(self) -> float: ...
2296+
@property
22772297
def shuffle_aggregation_default_partitions(self) -> int: ...
22782298
@property
22792299
def partial_aggregation_threshold(self) -> int: ...

daft/io/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from daft.lazy_import import LazyImport
1919
from daft.io._csv import read_csv
20+
from daft.io._text import read_text
2021
from daft.io.delta_lake._deltalake import read_deltalake
2122
from daft.io.hudi._hudi import read_hudi
2223
from daft.io.iceberg._iceberg import read_iceberg
@@ -75,6 +76,7 @@ def __getattr__(name: str) -> object:
7576
"read_mcap",
7677
"read_parquet",
7778
"read_sql",
79+
"read_text",
7880
"read_video_frames",
7981
"read_warc",
8082
]

daft/io/_text.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
# ruff: noqa: I002
# isort: dont-add-import: from __future__ import annotations

from daft import DataType, context
from daft.api_annotations import PublicAPI
from daft.daft import FileFormatConfig, IOConfig, StorageConfig, TextSourceConfig
from daft.dataframe import DataFrame
from daft.io.common import get_tabular_files_scan


@PublicAPI
def read_text(
    path: str | list[str],
    *,
    encoding: str = "utf-8",
    skip_blank_lines: bool = True,
    file_path_column: str | None = None,
    hive_partitioning: bool = False,
    io_config: IOConfig | None = None,
    _buffer_size: int | None = None,
    _chunk_size: int | None = None,
) -> DataFrame:
    """Create a DataFrame whose rows are the lines of the given text file(s).

    Args:
        path: Path to text file(s). Supports wildcards and remote URLs such as ``s3://`` or ``gs://``.
        encoding: Encoding of the input files, defaults to ``"utf-8"``.
        skip_blank_lines: Whether to skip empty lines (after stripping whitespace). Defaults to ``True``.
        file_path_column: Include the source path(s) as a column with this name. Defaults to ``None``.
        hive_partitioning: Whether to infer hive-style partitions from file paths and include them as
            columns in the DataFrame. Defaults to ``False``.
        io_config: IO configuration for the native downloader.
        _buffer_size: Optional tuning parameter for the underlying streaming reader buffer size (bytes).
        _chunk_size: Optional tuning parameter for the underlying streaming reader chunk size (rows).

    Returns:
        DataFrame: A DataFrame with a single ``"text"`` column containing lines from the input files.

    Examples:
        Read a text file from a local path:

        >>> import daft
        >>> df = daft.read_text("/path/to/file.txt")
        >>> df.show()

        Read a text file from a public S3 bucket:

        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_text("s3://path/to/files-*.txt", io_config=io_config)
        >>> df.show()
    """
    # Guard clause: an explicitly empty list of paths is a caller error.
    if isinstance(path, list) and not path:
        raise ValueError("Cannot read DataFrame from empty list of text filepaths")

    # Fall back to the planning-level default IO config when none is supplied.
    if io_config is None:
        io_config = context.get_context().daft_planning_config.default_io_config

    file_format_config = FileFormatConfig.from_text_config(
        TextSourceConfig(
            encoding=encoding,
            skip_blank_lines=skip_blank_lines,
            buffer_size=_buffer_size,
            chunk_size=_chunk_size,
        )
    )

    # The output schema of a text scan is fixed: one string column named "text".
    builder = get_tabular_files_scan(
        path=path,
        infer_schema=False,
        schema={"text": DataType.string()},
        file_format_config=file_format_config,
        storage_config=StorageConfig(True, io_config),
        file_path_column=file_path_column,
        hive_partitioning=hive_partitioning,
    )
    return DataFrame(builder)

0 commit comments

Comments
 (0)