Daft checkpoint design #5868
Replies: 9 comments 13 replies
-
Looking forward to discussing our checkpoint implementation with the community! We’ve built v1 of the checkpoint and would love to contribute!
-
Great work on the checkpoint proposal! This is a solid step forward for Daft's robustness. Excited to see this feature take shape! I have a few questions about this:
-
Is there a mistake here? Should it be
-
Why is it necessary to add some
-
If needed later, I would be glad to help build this together.
-
Thank you for the clear design and thorough documentation! I have a question about how the current checkpointing mechanism handles “row splits”. For example, in a RAG pipeline using Daft, a single PDF file (identified originally by its file_path) might be split into multiple text chunks during processing. As a result, the primary key for each output row changes from just file_path to a composite key like file_path + chunk_id. In this scenario, it becomes challenging for the sink to determine when all chunks corresponding to a given file_path have been fully processed and can safely be included in a checkpoint. How does Daft’s checkpointing design address this case?

A related issue arises when a PDF file is filtered out midway through the pipeline (e.g., due to quality or relevance criteria) and never reaches the sink. In that case, since no output rows are emitted for that file, it won’t be recorded in the checkpoint at all. Consequently, if the pipeline restarts, the same file would be reprocessed, even though it was already handled (and legitimately discarded) in a prior run. How does Daft’s checkpointing system address these cases to ensure exactly-once processing semantics and avoid redundant work?
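To make the “row split” concern concrete, here is a tiny plain-Python illustration (hypothetical data, not Daft's API): once rows fan out into chunks, filtering by the composite key resumes the missing chunks, but file_path alone cannot tell whether a file is complete.

```python
# Hypothetical illustration of the row-split concern: one input file fans
# out to several chunks, and only some chunks were checkpointed before a
# crash. The composite key resumes the missing chunks, but the file_path
# by itself is ambiguous: it appears both "done" and "not done".
chunks = [("doc.pdf", 0), ("doc.pdf", 1), ("doc.pdf", 2)]  # (file_path, chunk_id)
written = {("doc.pdf", 0)}  # composite keys checkpointed before the crash

# Restart: keep only chunks whose composite key was not yet written.
remaining = [c for c in chunks if c not in written]

# A file can be simultaneously "partially written" and "partially pending",
# so a checkpoint keyed on file_path alone cannot mark it complete.
files_partially_done = {fp for fp, _ in written} & {fp for fp, _ in remaining}
```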
-
I believe this proposal would benefit significantly from PR #5924.
-
#5931
-
I want to revisit the benchmark comparison from the discussion. The benchmark compares the actor-based filter against a broadcast anti-join and shows OOM at 189M rows. But Daft also supports hash join, which repartitions both sides by key hash so that no single node holds the full key set. Could you re-run the benchmark with strategy="hash"? I'm curious to see how the numbers compare.
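For context, a plain-Python sketch (not Daft internals) of the hash-partitioning idea behind strategy="hash": both sides are repartitioned by the hash of the join key, so matching keys land in the same partition and each partition only needs its own slice of the key set for a local anti-join.

```python
# Sketch of why a hash join bounds per-node memory: both sides are
# repartitioned by hash(key) % N, so each partition performs a local
# anti-join against only its own slice of the key set.
NUM_PARTITIONS = 4

def partition(rows, key_fn, n=NUM_PARTITIONS):
    """Assign each row to a partition by hashing its join key."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(key_fn(row)) % n].append(row)
    return parts

def hash_anti_join(left, right_keys):
    """Keep left rows whose key does NOT appear in right_keys."""
    left_parts = partition(left, key_fn=lambda r: r["id"])
    right_parts = partition(right_keys, key_fn=lambda k: k)
    out = []
    for lp, rp in zip(left_parts, right_parts):
        local_keys = set(rp)  # only this partition's keys are materialized
        out.extend(r for r in lp if r["id"] not in local_keys)
    return out

new_rows = [{"id": i} for i in range(10)]
already_written = [0, 3, 7]
remaining = hash_anti_join(new_rows, already_written)
```

A broadcast join, by contrast, ships the full right-side key set to every node, which is the likely source of the OOM at 189M rows.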

-
Context
A long-running job may fail and terminate for various reasons (resource limits, an unstable environment, code bugs, etc.). When a failure occurs partway through, restarting often means the entire workflow runs from the beginning, re-executing data that was already processed. This redundant computation is a huge waste of resources and time.
We therefore propose a checkpoint design that enables "incremental processing": if a previous run terminates after processing and writing part of the data, subsequent runs skip the already-processed data and only complete the missing part.
Design
The checkpoint in Daft enables incremental processing. Its core principle is to use a primary key (or composite primary key) to filter out rows that have already been processed, ensuring that only new data is processed and appended to the target path.
This is achieved by injecting a filter predicate into the logical plan, immediately after the source node. When a write operation is initiated with a checkpoint_config, Daft first reads the primary keys from the existing data at the destination. This set of primary keys is then loaded into memory and distributed across a pool of checkpoint actors. During execution, the injected filter (actually a UDF actor) consults these actors to efficiently discard rows whose primary keys already exist. The DataFrame.write_* APIs are extended to accept checkpoint_config as a parameter, which controls this behavior.
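As a rough illustration of the scheme described above, here is a minimal plain-Python sketch (not Daft's actual implementation) of sharding the existing primary keys across a pool of filter "actors" and consulting them per batch:

```python
# Minimal sketch of the checkpoint filter described above: existing primary
# keys are sharded by hash across a pool of "actors", and each incoming
# batch asks the right shard whether a key was already written.
class CheckpointShard:
    """Holds one shard of the already-written primary-key set."""
    def __init__(self):
        self.keys = set()

    def add(self, key):
        self.keys.add(key)

    def contains(self, key):
        return key in self.keys

class CheckpointFilter:
    """Routes each key to a shard by hash, mimicking the actor pool."""
    def __init__(self, existing_keys, num_buckets=4):
        self.shards = [CheckpointShard() for _ in range(num_buckets)]
        for key in existing_keys:
            self._shard(key).add(key)

    def _shard(self, key):
        return self.shards[hash(key) % len(self.shards)]

    def filter_batch(self, rows, key_fn):
        """Drop rows whose primary key already exists at the destination."""
        out = []
        for row in rows:
            key = key_fn(row)
            if not self._shard(key).contains(key):
                out.append(row)
        return out

# Keys already written by a previous (partial) run:
ckpt = CheckpointFilter(existing_keys=["a", "b"])
batch = [{"pk": "a"}, {"pk": "c"}, {"pk": "d"}]
fresh = ckpt.filter_batch(batch, key_fn=lambda r: r["pk"])
```

Because each row is routed to a single shard by key hash, no actor ever holds the full key set, which is what lets this scale without a global shuffle.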
Planning
Milestone 1: Checkpointing for major and basic scenarios
Status: ✅ Completed
Tasks:
checkpoint_config parameter: must be a dictionary containing:
- key_column: The name of the column(s) to use as the primary key / composite primary key.
- num_buckets (optional): The number of checkpoint actors to create for sharding the primary-key set.
- num_cpus (optional): The number of CPUs to allocate to each checkpoint actor.
- batch_size (optional): The batch size of the checkpoint filter operation.
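Putting the parameters together, a hypothetical checkpoint_config might look like the following (the keys come from the list above; the values are illustrative, and the write call shown in the comment assumes the proposed, not-yet-released API):

```python
# Hypothetical checkpoint_config using the parameter names listed above.
checkpoint_config = {
    "key_column": ["file_path", "chunk_id"],  # composite primary key
    "num_buckets": 8,      # shard the key set across 8 checkpoint actors
    "num_cpus": 1,         # CPUs allocated to each checkpoint actor
    "batch_size": 10_000,  # rows per checkpoint filter batch
}

# Under the proposed API this would be passed to a write, e.g.:
#   df.write_parquet("s3://bucket/output/", checkpoint_config=checkpoint_config)
```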
Limits in Milestone 1:
Milestone 2: Checkpoint Enhancement.
Status: ⌛️ In Progress
Tasks:
Limits in Milestone 2:
Milestone 3: Checkpoint available for all formats, like Flink CDC
Today: an actor-based filter delivers incremental processing without external state.
Long term: we could consider a stateful checkpointing mode inspired by Flink CDC / Flink checkpoints.
Tasks:
Limits in Milestone 3:
Benchmark
We conducted a test: reading data from Parquet files and deduplicating all of the data. We tested two methods:
We observed that the Milestone 1 checkpoint is more stable than anti-join-based deduplication and can support larger-scale datasets without triggering OOM. This advantage stems from the fact that actor-based checkpointing eliminates the need for costly data shuffling operations.
Note about the above dedup benchmark: