Conversation
- Add BranchExecutor for parallel branch execution from a common checkpoint
- Support common_process and branches configuration
- Auto-deduplicate common processing steps
- Each branch has its own export_path
- Update factory and config to support the branch executor type
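The auto-deduplication described above can be pictured as stripping, from each branch's op list, any leading ops that already appear in `common_process`. A minimal sketch, assuming ops are plain dicts as parsed from a YAML config; the helper and op names are hypothetical, not the PR's actual implementation:

```python
# Illustrative sketch only: ops are assumed to be plain dicts as parsed from a
# YAML config, and both the helper and the op names are hypothetical, not the
# PR's actual implementation.
def dedup_branch_process(common_process, branch_process):
    """Drop leading branch ops that duplicate the shared common pipeline."""
    i = 0
    while (i < len(common_process) and i < len(branch_process)
           and branch_process[i] == common_process[i]):
        i += 1
    return branch_process[i:]


common = [{"clean_mapper": {}}, {"normalize_mapper": {}}]
branch = [{"clean_mapper": {}}, {"normalize_mapper": {}}, {"en_filter": {"lang": "en"}}]

# Only the branch-specific tail remains; the shared prefix runs once upstream.
print(dedup_branch_process(common, branch))   # -> [{'en_filter': {'lang': 'en'}}]
```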
…mo configs
- Remove branch_mvp / branch_mvp_ray / branch_mvp_ray_pure yamls (configs live in docs + dj-hub)
- Consolidate MVP examples in BranchExecution.md; point to dj-hub for full configs
- Add Future TODOs in branch_executor.py (DAG mixin, Ray resources, validation, tests)
- Keep demos/branch_execution_example.yaml as single reference

Co-authored-by: Cursor <cursoragent@cursor.com>
…te demos/branch_execution_example.yaml

Co-authored-by: Cursor <cursoragent@cursor.com>
Summary of Changes

Hello @yxdyc, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a powerful new multi-branch execution capability, allowing users to define a common data processing pipeline followed by multiple divergent branches that operate on the common output. This design promotes efficiency by executing shared steps only once and enables parallel exploration of different processing paths, with support for both local threading and distributed Ray backends. It also includes robust error handling with fail-fast and retry mechanisms, along with detailed documentation to guide usage.
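A rough sketch of the fan-out pattern the summary describes: run the common steps once, then execute the branches in parallel with per-branch retry and optional fail-fast. This shows the thread backend only; `run_pipeline`, the branch dict shape, and `export` are hypothetical stand-ins, not the PR's actual code (a Ray-side sketch appears further below).

```python
# Rough sketch of the fan-out described above; `run_pipeline`, `export`, and
# the branch/config shapes are hypothetical stand-ins, not the PR's API.
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_branches(dataset, branches, fail_fast=True):
    def run_one(branch):
        last_err = None
        for _attempt in range(branch.get("retries", 0) + 1):
            try:
                out = run_pipeline(dataset, branch["process"])   # branch-specific ops
                out.export(branch["export_path"])                # per-branch export
                return out
            except Exception as err:
                last_err = err
        raise last_err

    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {pool.submit(run_one, b): b["name"] for b in branches}
        for fut in as_completed(futures):
            name = futures[fut]
            try:
                results[name] = fut.result()
            except Exception as err:
                errors[name] = err
                if fail_fast:
                    # Best effort: cancel branches that have not started yet.
                    for other in futures:
                        other.cancel()
                    break
    return results, errors
```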
Code Review
This pull request introduces a new BranchExecutor for running multi-branch data processing pipelines from a common checkpoint. This is a significant and well-implemented feature, even in its WIP state: it supports both thread and ray backends, includes fail_fast and retry mechanisms, and uses minimal, serializable configs for the Ray backend to keep overhead low. The code is well-structured, and the addition of BranchExecution.md provides clear documentation.
My review includes a few minor suggestions for improving code clarity and efficiency, such as avoiding redundant calculations and imports. I also have a suggestion to improve the readability of the new documentation file.
Overall, this is a great addition to the project.
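On the "minimal serializable configs" point: the idea is to ship each Ray task only the few picklable fields it needs rather than the full executor config. A hedged sketch with hypothetical field names, not the PR's actual schema:

```python
# Hedged sketch of trimming a branch config down to picklable essentials before
# handing it to a Ray worker; the field names are illustrative assumptions.
def minimal_branch_payload(branch_cfg, dataset_ref=None, dataset_path=None):
    return {
        "name": branch_cfg["name"],
        "process": branch_cfg["process"],          # list of plain op dicts
        "export_path": branch_cfg["export_path"],
        # Prefer a Ray ObjectRef to the common dataset; fall back to a path.
        "dataset_ref": dataset_ref,
        "dataset_path": dataset_path,
    }
```

A remote task then only deserializes this small payload instead of the whole executor configuration.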
```python
            ref = ray.put(dataset)
            return {"ref": ref}
        except Exception:
            import os
```
```python
if hasattr(self, "_log_event"):
    self._log_event(
        EventType.JOB_COMPLETE,
        f"Branch job completed in {time.time() - run_start:.2f}s",
        duration=time.time() - run_start,
        metadata={"completed": list(branch_results.keys()), "failed": list(errors.keys())},
    )
```
The duration `time.time() - run_start` is calculated twice. It's more efficient and readable to calculate it once and store it in a variable.
```diff
-if hasattr(self, "_log_event"):
-    self._log_event(
-        EventType.JOB_COMPLETE,
-        f"Branch job completed in {time.time() - run_start:.2f}s",
-        duration=time.time() - run_start,
-        metadata={"completed": list(branch_results.keys()), "failed": list(errors.keys())},
-    )
+if hasattr(self, "_log_event"):
+    job_duration = time.time() - run_start
+    self._log_event(
+        EventType.JOB_COMPLETE,
+        f"Branch job completed in {job_duration:.2f}s",
+        duration=job_duration,
+        metadata={"completed": list(branch_results.keys()), "failed": list(errors.keys())},
+    )
```
```python
if hasattr(self, "_log_event"):
    self._log_event(
        EventType.DAG_NODE_COMPLETE,
        f"Branch {branch_name} completed in {time.time() - start:.2f}s",
        operation_name=branch_name,
        duration=time.time() - start,
    )
```
The duration `time.time() - start` is calculated twice. It's more efficient and readable to calculate it once and store it in a variable.
```diff
-if hasattr(self, "_log_event"):
-    self._log_event(
-        EventType.DAG_NODE_COMPLETE,
-        f"Branch {branch_name} completed in {time.time() - start:.2f}s",
-        operation_name=branch_name,
-        duration=time.time() - start,
-    )
+if hasattr(self, "_log_event"):
+    duration = time.time() - start
+    self._log_event(
+        EventType.DAG_NODE_COMPLETE,
+        f"Branch {branch_name} completed in {duration:.2f}s",
+        operation_name=branch_name,
+        duration=duration,
+    )
```
```yaml
retries: 0  # per-branch retry count (default: 0)
```

- With `backend: ray`, each branch runs as a Ray remote task; common dataset is shared via object store when possible, else via disk. Branch configs are minimal. When `run_common_on_ray: true`, the common process also runs as one Ray task (dataset_path and work_dir must be worker-accessible).
This line is quite long and contains multiple distinct points. For better readability, consider breaking it down into a nested list.
```diff
-- With `backend: ray`, each branch runs as a Ray remote task; common dataset is shared via object store when possible, else via disk. Branch configs are minimal. When `run_common_on_ray: true`, the common process also runs as one Ray task (dataset_path and work_dir must be worker-accessible).
+- With `backend: ray`:
+  - Each branch runs as a Ray remote task.
+  - The common dataset is shared via the object store when possible, otherwise via disk.
+  - Branch configs are minimal to reduce serialization overhead.
+  - When `run_common_on_ray: true`, the common process also runs as a Ray task, which requires `dataset_path` and `work_dir` to be accessible by all workers.
```
Multi-branch execution from a common checkpoint (see the example sketch below):
- `executor_type: branch` with `common_process` and per-branch `process`/`export_path`
- `thread` (default) and `ray` backends
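Putting the pieces together, a config for this mode might look roughly like the following (parsed here with PyYAML, assumed to be available). Only `executor_type`, `common_process`, `branches`, `process`, `export_path`, `backend`, `fail_fast`, and `retries` are keys mentioned in this PR's summary, docs excerpt, and review comments; the per-branch `name` key and the op names are placeholders.

```python
# Illustrative only: a config shaped like the keys summarized above, embedded
# as YAML and parsed with PyYAML. Keys beyond those named in the PR summary,
# docs excerpt, and review (e.g. the per-branch `name`) are assumptions.
import yaml

cfg = yaml.safe_load("""
executor_type: branch
backend: thread          # or: ray
fail_fast: true

common_process:
  - some_common_mapper: {}

branches:
  - name: branch_a
    retries: 0
    export_path: ./outputs/branch_a.jsonl
    process:
      - some_filter_a: {}
  - name: branch_b
    export_path: ./outputs/branch_b.jsonl
    process:
      - some_filter_b: {}
""")

print([b["name"] for b in cfg["branches"]])   # -> ['branch_a', 'branch_b']
```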