Add enable_partition_merge_within_scan_task in execution_config #6078

@everySympathy

Description

Is your feature request related to a problem?

When reading a dataset consisting of an extremely large number of small files, with enable_scan_task_split_and_merge enabled in execution_config, Daft reduces the number of distributed scan tasks (good for scheduling), but the number of output partitions produced by a single scan task can still be very large. This happens because scan execution still yields many MicroPartitions/ObjectRefs per task due to streaming reads and internal splitting.

This becomes a bottleneck for jobs that need to collect or fan out partition refs (e.g. df.iter_partitions()), where the driver overhead scales with the number of ObjectRefs (very large) rather than the number of tasks (which merging keeps much smaller).

Describe the solution you'd like

Introduce a new execution config flag:

  • enable_partition_merge_within_scan_task: bool

Semantics:

  • When enabled, Daft should coalesce/merge output partitions within a single scan task before yielding them downstream.
  • This is a local, per-task operation (no cross-task shuffle). The goal is to reduce the number of MicroPartitions/ObjectRefs yielded per scan task after task-level merging.
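The semantics above could be sketched as a greedy, per-task coalesce: buffer a scan task's output partitions until a row-count target is reached, then yield one merged partition. This is a minimal illustration only, not Daft's actual implementation; the `MicroPartition` stand-in, `merge_within_scan_task`, and `target_rows` are all hypothetical names.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List

# Hypothetical stand-in for Daft's MicroPartition; only row count matters here.
@dataclass
class MicroPartition:
    num_rows: int

    @staticmethod
    def concat(parts: List["MicroPartition"]) -> "MicroPartition":
        return MicroPartition(num_rows=sum(p.num_rows for p in parts))

def merge_within_scan_task(
    parts: Iterable[MicroPartition],
    target_rows: int,
) -> Iterator[MicroPartition]:
    """Greedily coalesce a single scan task's output partitions up to
    target_rows before yielding, so downstream sees far fewer ObjectRefs.
    Purely local: no data crosses task boundaries."""
    buffer: List[MicroPartition] = []
    buffered_rows = 0
    for part in parts:
        buffer.append(part)
        buffered_rows += part.num_rows
        if buffered_rows >= target_rows:
            yield MicroPartition.concat(buffer)
            buffer, buffered_rows = [], 0
    if buffer:  # flush any remainder as a final (possibly smaller) partition
        yield MicroPartition.concat(buffer)

# Ten 100-row partitions merged with a 300-row target yield 4 outputs
# instead of 10: three of 300 rows and one remainder of 100 rows.
merged = list(merge_within_scan_task([MicroPartition(100) for _ in range(10)], 300))
```

A streaming generator like this preserves the incremental nature of scan execution while bounding the number of yielded refs, which is the behavior the flag would gate.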

Expected Benefit

  • Keeps the scheduling benefits of enable_scan_task_split_and_merge while also reducing downstream/driver pressure caused by excessive partition outputs.

Describe alternatives you've considered

No response

Additional Context

No response

Would you like to implement a fix?

Yes
