abstract out dbt deps/clean steps in orchestration #1324
Conversation
Walkthrough
This PR automates the management of the DBT cleanup tasks (dbt-clean and dbt-deps) in pipeline orchestration.
Sequence Diagram(s)
sequenceDiagram
actor User
participant API as API Layer
participant PipelineService as PipelineService
participant OrgTaskDB as OrgTask DB
participant Prefect as Prefect Backend
User->>API: Submit pipeline with user transform tasks (exclude dbt-clean/deps)
API->>PipelineService: Update pipeline with transform tasks
PipelineService->>PipelineService: Filter out auto-managed DBT tasks
PipelineService->>OrgTaskDB: Check for existing dbt-clean task
alt dbt-clean missing
OrgTaskDB-->>PipelineService: Not found
PipelineService->>OrgTaskDB: Create dbt-clean task
else dbt-clean exists
OrgTaskDB-->>PipelineService: Return existing task
end
PipelineService->>OrgTaskDB: Check for existing dbt-deps task
alt dbt-deps missing
OrgTaskDB-->>PipelineService: Not found
PipelineService->>OrgTaskDB: Create dbt-deps task
else dbt-deps exists
OrgTaskDB-->>PipelineService: Return existing task
end
PipelineService->>PipelineService: Build task sequence: [dbt-clean, dbt-deps, ...user DBT tasks]
PipelineService->>Prefect: Deploy updated pipeline
Prefect-->>API: Deployment confirmation
API-->>User: Success response
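In short, the flow is a get-or-create on the two auto-managed OrgTasks followed by prepending them to the user's transform sequence. The following is a minimal sketch of that shape only, not the PR's actual `PipelineService` code: the model import paths and the helper names (`ensure_auto_managed_orgtasks`, `build_transform_sequence`) are assumptions for illustration; only the `TASK_DBTCLEAN`/`TASK_DBTDEPS` constants and the `OrgTask` model name come from the PR itself.

```python
# Illustrative sketch only — import paths and helper names are assumptions;
# the real PipelineService implementation may differ.
from ddpui.models.tasks import OrgTask, Task  # assumed module path
from ddpui.utils.constants import TASK_DBTCLEAN, TASK_DBTDEPS

AUTO_MANAGED_SLUGS = (TASK_DBTCLEAN, TASK_DBTDEPS)


def ensure_auto_managed_orgtasks(org):
    """Get-or-create the dbt-clean and dbt-deps OrgTasks for this org."""
    orgtasks = []
    for slug in AUTO_MANAGED_SLUGS:
        task = Task.objects.get(slug=slug)
        orgtask, _created = OrgTask.objects.get_or_create(org=org, task=task)
        orgtasks.append(orgtask)
    return orgtasks


def build_transform_sequence(org, requested_orgtasks):
    """Drop any auto-managed tasks the client sent, then prepend them
    in a fixed order: dbt-clean, dbt-deps, user DBT tasks."""
    user_tasks = [
        orgtask
        for orgtask in requested_orgtasks
        if orgtask.task.slug not in AUTO_MANAGED_SLUGS
    ]
    return ensure_auto_managed_orgtasks(org) + user_tasks
```

The fixed ordering mirrors the diagram: cleanup and dependency installation always run before any user-selected DBT task, regardless of what the client submitted.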
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Codecov Report
❌ Patch coverage is

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1324      +/-   ##
==========================================
+ Coverage   58.37%   58.42%   +0.05%
==========================================
  Files         132      132
  Lines       15615    15652      +37
==========================================
+ Hits         9115     9145      +30
- Misses       6500     6507       +7

☔ View full report in Codecov by Sentry.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ddpui/core/orchestrate/pipeline_service.py (1)
172-185: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Persisting auto-managed DBT tasks here leaks hidden UUIDs into pipeline details.
`all_orgtasks` is later remapped into `DataflowOrgTask`, and `PipelineService.get_pipeline_details()` still returns every DBT/DBTCLOUD mapping as `transformTasks`. After this change, edit/read responses will include `dbt-clean`/`dbt-deps` UUIDs even though `ddpui/api/orgtask_api.py:get_prefect_transformation_tasks(..., exclude_git=True)` no longer exposes them. That makes the pipeline-details payload non-round-trippable for the task picker and can break edit flows. Please filter the auto-managed slugs back out when serializing `transformTasks`.
Possible follow-up outside this range:
@@
-    transform_tasks = [
+    auto_managed_task_slugs = {TASK_DBTCLEAN, TASK_DBTDEPS}
+    transform_tasks = [
         {"uuid": dataflow_orgtask.orgtask.uuid, "seq": dataflow_orgtask.seq}
         for dataflow_orgtask in DataflowOrgTask.objects.filter(
             dataflow=org_data_flow,
             orgtask__task__type__in=[TaskType.DBT, TaskType.DBTCLOUD],
         )
+        .exclude(orgtask__task__slug__in=auto_managed_task_slugs)
         .all()
         .order_by("seq")
     ]

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ddpui/core/orchestrate/pipeline_service.py` around lines 172 - 185, The pipeline is currently including auto-managed DBT tasks (from auto_managed_dbt_orgtasks / all_orgtasks) into map_org_tasks so PipelineService.get_pipeline_details() serializes those hidden UUIDs into transformTasks; fix by filtering out auto-managed DBT tasks when preparing the mapping/serialization: when you assign map_org_tasks (or right before serializing transformTasks in PipelineService.get_pipeline_details), remove any org tasks that match the auto-managed DBT slugs (e.g., "dbt-clean","dbt-deps") or have the auto-managed marker used in DataflowOrgTask (or compare against the auto_managed_dbt_orgtasks list) so that transformTasks only includes user-visible tasks and the payload remains round-trippable with orgtask_api.get_prefect_transformation_tasks(..., exclude_git=True).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c343216c-0561-4214-b10e-3ac332b846d7
📒 Files selected for processing (5)
- ddpui/api/orgtask_api.py
- ddpui/core/orchestrate/pipeline_service.py
- ddpui/management/commands/backfill_auto_managed_tasks.py
- ddpui/tests/api_tests/test_pipeline_api.py
- ddpui/utils/constants.py
💤 Files with no reviewable changes (1)
- ddpui/utils/constants.py
# Check if dbt-clean and dbt-deps are already present
has_dbt_clean = DataflowOrgTask.objects.filter(
    dataflow=dataflow, orgtask__task__slug=TASK_DBTCLEAN
).exists()
has_dbt_deps = DataflowOrgTask.objects.filter(
    dataflow=dataflow, orgtask__task__slug=TASK_DBTDEPS
).exists()

if has_dbt_clean and has_dbt_deps:
    self.stdout.write(
        f" → Skipping {dataflow.deployment_name} (already has dbt-clean and dbt-deps)"
    )
    skipped += 1
    continue
This skip condition misses pipelines that only lack the auto-managed git step.
A legacy CLI DBT pipeline that already has dbt-clean/dbt-deps but is missing its git pull/clone mapping will be skipped here, even though PipelineService.update_pipeline() would add that missing step. That leaves part of the fleet unbackfilled.
Suggested fix
 has_dbt_clean = DataflowOrgTask.objects.filter(
     dataflow=dataflow, orgtask__task__slug=TASK_DBTCLEAN
 ).exists()
 has_dbt_deps = DataflowOrgTask.objects.filter(
     dataflow=dataflow, orgtask__task__slug=TASK_DBTDEPS
 ).exists()
+has_git_task = DataflowOrgTask.objects.filter(
+    dataflow=dataflow, orgtask__task__type=TaskType.GIT
+).exists()

-if has_dbt_clean and has_dbt_deps:
+if has_git_task and has_dbt_clean and has_dbt_deps:
     self.stdout.write(
-        f" → Skipping {dataflow.deployment_name} (already has dbt-clean and dbt-deps)"
+        f" → Skipping {dataflow.deployment_name} "
+        f"(already has git + dbt-clean + dbt-deps)"
     )
     skipped += 1
     continue

 missing = []
+if not has_git_task:
+    missing.append("git")
 if not has_dbt_clean:
     missing.append("dbt-clean")
 if not has_dbt_deps:
     missing.append("dbt-deps")

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ddpui/management/commands/backfill_auto_managed_tasks.py` around lines 90 -
103, The current skip logic checks only for TASK_DBTCLEAN and TASK_DBTDEPS using
DataflowOrgTask and will skip pipelines that still lack the auto-managed git
step; update the condition so we only skip when dbt-clean, dbt-deps AND the
auto-managed git mapping are present. Concretely, augment the exists check
(DataflowOrgTask.objects.filter(...).exists()) to also verify the git/orgtask
mapping (or the relevant git task slug) is present, or alternatively remove the
early continue and always call PipelineService.update_pipeline(dataflow, ...)
for pipelines that have dbt-clean and/or dbt-deps so update_pipeline can add the
missing git step; reference DataflowOrgTask, TASK_DBTCLEAN, TASK_DBTDEPS and
PipelineService.update_pipeline when making the change.
# Toggle schedule inactive → active to clear pre-scheduled runs.
# Prefect schedules runs 1-2 days in advance; those won't pick up the
# updated deployment params unless the schedule is reset.
# Only do this for pipelines that have an active schedule.
if dataflow.cron and pipeline_details.get("isScheduleActive", False):
    PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "inactive")
    PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "active")
A failure between inactive and active leaves the pipeline paused.
update_pipeline() has already succeeded by this point. If the second set_pipeline_schedule(..., "active") call errors, the command reports a failure but the schedule stays disabled. This needs a best-effort restore path so a partial backfill does not silently turn off production schedules.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ddpui/management/commands/backfill_auto_managed_tasks.py` around lines 158 -
164, The current sequence toggles the Prefect schedule inactive→active via
PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "inactive")
and then "active" which can leave the pipeline paused if the second call fails;
modify the block so that after making the schedule inactive you attempt to set
it back to "active" inside a guarded retry/restore path (try/except/finally):
call PipelineService.set_pipeline_schedule(...) to reactivate and on any
exception immediately log the error with context (include dataflow.deployment_id
and org), perform a best-effort retry or a compensating call to re-enable the
schedule (e.g., another set_pipeline_schedule(..., "active") attempt) and
surface failure but avoid leaving the schedule disabled; ensure this logic is
colocated with the existing update_pipeline() success flow so update_pipeline()
remains committed and schedules are restored on failure.
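For reference, one shape the guarded restore could take. This is a minimal sketch under the assumptions visible in the diff above (`dataflow`, `pipeline_details`, `org`, and the `PipelineService.set_pipeline_schedule(org, deployment_id, state)` signature); the `logger` name and the single-retry policy are illustrative choices, not the command's actual code:

```python
# Sketch of a guarded restore path; `logger` and the retry policy are
# illustrative assumptions, not the command's actual implementation.
if dataflow.cron and pipeline_details.get("isScheduleActive", False):
    PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "inactive")
    try:
        PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "active")
    except Exception as first_err:
        logger.error(
            "failed to reactivate schedule for deployment %s: %s",
            dataflow.deployment_id,
            first_err,
        )
        # Best-effort compensating retry so a partial backfill does not
        # silently leave a production schedule paused.
        try:
            PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "active")
        except Exception:
            self.stdout.write(
                f" !! schedule left inactive for {dataflow.deployment_name}; "
                "re-enable it manually"
            )
            raise
```

The key property is that `update_pipeline()` has already committed by this point, so the restore path only compensates for the schedule toggle and still surfaces the failure to the operator.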
Summary by CodeRabbit
New Features
Chores