# feat/perf: Row-group-level parallelism for parquet scans #6552
## Description

### Problem
When reading parquet files in distributed mode, Daft creates 1 scan task per file. This means a single large parquet file (or a dataset with very few files) will only utilize 1 worker, even on a cluster with hundreds of workers.
This is a common pattern for lookup/metadata tables:
```python
df = daft.read_parquet("s3://bucket/large_single_file.parquet")
df = df.with_column(
    "data",
    col("url").download().image_decode("png").resize(224, 224),
)
```

With a 200-worker cluster, this runs entirely on a single worker, because one file yields only one scan task.
### Current Workaround
The suggested workaround is to force materialization and repartition:

```python
df = daft.read_parquet("s3://bucket/large_single_file.parquet")
df = df.into_batches(1024)
```

This is suboptimal because it introduces a full materialization and shuffle barrier before any downstream work can begin.
### Proposed Solution
Support row-group-level parallelism for parquet scans. Instead of always creating 1 scan task per file, allow splitting a single file into N scan tasks where N is the number of row groups in that file.
A parquet file with 50 row groups would produce 50 independent scan tasks, each reading a single row group. This gives natural parallelism without any materialization or shuffle.
### Possible API
```python
# Flag on execution config
daft.set_execution_config(split_parquet_row_groups=True)

df = daft.read_parquet("s3://bucket/large_file.parquet")
df = df.with_column(
    "data",
    col("url").download().image_decode("png").resize(224, 224),
)
```

This keeps the read API clean and makes splitting a global execution-level concern rather than a per-read option. It also leaves the door open for the runtime to make smarter decisions in the future (e.g., automatically splitting when the file count is low relative to cluster parallelism).
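The "smarter decisions" idea could look something like the following. This is purely an illustrative sketch, not Daft's planner: the `FileMeta` type, `plan_scan_tasks` function, and the split-when-underparallelized heuristic are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class FileMeta:
    # Hypothetical per-file metadata available from parquet footers.
    path: str
    num_row_groups: int


def plan_scan_tasks(files, cluster_parallelism, split_row_groups):
    """Return scan tasks as (path, row_group_index) pairs.

    row_group_index is None for a whole-file task (current behavior).
    Split into per-row-group tasks only when the flag is set and the file
    count is low relative to the cluster's parallelism.
    """
    should_split = split_row_groups and len(files) < cluster_parallelism
    tasks = []
    for f in files:
        if should_split:
            tasks.extend((f.path, rg) for rg in range(f.num_row_groups))
        else:
            tasks.append((f.path, None))
    return tasks


# One 50-row-group file on a 200-worker cluster -> 50 tasks instead of 1.
tasks = plan_scan_tasks(
    [FileMeta("big.parquet", 50)], cluster_parallelism=200, split_row_groups=True
)
print(len(tasks))  # -> 50
```

A heuristic like this would preserve today's one-task-per-file behavior for datasets that already have enough files to saturate the cluster, and only pay the per-row-group overhead when parallelism is actually scarce.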
### Impact
This is especially important for physical AI / ML workloads, where a common pattern is:

1. Read a metadata/manifest parquet file (URLs, labels, etc.), which is often a single file
2. Apply expensive per-row UDFs (download, decode, transform)

Step 2 is embarrassingly parallel but is currently bottlenecked by step 1 producing only one partition.