Commit 9c957ab

Merge pull request #1324 from DalgoT4D/abstract-out-dbt-deps-clean-from-orchestration
abstract out dbt deps/clean steps in orchestration
2 parents a31afa7 + dd0a2fd commit 9c957ab

6 files changed: 323 additions & 29 deletions

ddpui/api/orgtask_api.py

Lines changed: 11 additions & 1 deletion
@@ -41,6 +41,8 @@
 from ddpui.utils import timezone
 from ddpui.utils.constants import (
     TASK_GITPULL,
+    TASK_DBTCLEAN,
+    TASK_DBTDEPS,
     TRANSFORM_TASKS_SEQ,
     TASK_GENERATE_EDR,
     LONG_RUNNING_TASKS,
@@ -227,7 +229,10 @@ def get_elemetary_task_lock(request):
 @orgtask_router.get("transform/")
 @has_permission(["can_view_orgtasks"])
 def get_prefect_transformation_tasks(request, exclude_git: bool = False):
-    """Fetch all dbt tasks for an org; client or system"""
+    """Fetch all dbt tasks for an org; client or system.
+    When exclude_git=True (used by pipeline page), auto-managed tasks
+    (git, dbt-clean, dbt-deps) are excluded since they are automatically
+    added during pipeline creation/updation."""
     orguser: OrgUser = request.orguser
 
     task_types = [TaskType.DBT, TaskType.DBTCLOUD]
@@ -252,10 +257,15 @@ def get_prefect_transformation_tasks(request, exclude_git: bool = False):
 
     res = []
 
+    auto_managed_task_slugs = {TASK_DBTCLEAN, TASK_DBTDEPS}
+
     for org_task in org_tasks:
         if org_task.task.slug not in TRANSFORM_TASKS_SEQ:
             continue
 
+        if exclude_git and org_task.task.slug in auto_managed_task_slugs:
+            continue
+
         # git pull : "git" + " " + "pull"
         # dbt run --full-refresh : "dbt" + " " + "run --full-refresh"
         command = None
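
A minimal standalone sketch of the new exclusion rule (slug values are illustrative stand-ins for the real constants in ddpui/utils/constants.py; the git filtering itself is pre-existing and not shown in this hunk):

    # Illustrative slug values; the endpoint imports the actual constants.
    TASK_DBTCLEAN, TASK_DBTDEPS = "dbt-clean", "dbt-deps"
    auto_managed_task_slugs = {TASK_DBTCLEAN, TASK_DBTDEPS}

    def visible_slugs(slugs: list[str], exclude_git: bool) -> list[str]:
        """Mirror the loop above: drop auto-managed slugs when exclude_git is set."""
        return [s for s in slugs if not (exclude_git and s in auto_managed_task_slugs)]

    print(visible_slugs(["dbt-clean", "dbt-deps", "dbt-run"], exclude_git=True))
    # ['dbt-run']
    print(visible_slugs(["dbt-clean", "dbt-deps", "dbt-run"], exclude_git=False))
    # ['dbt-clean', 'dbt-deps', 'dbt-run']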

ddpui/core/orchestrate/pipeline_service.py

Lines changed: 65 additions & 6 deletions
@@ -24,6 +24,8 @@
 from ddpui.utils.constants import (
     TASK_AIRBYTESYNC,
     TASK_DBTRUN,
+    TASK_DBTCLEAN,
+    TASK_DBTDEPS,
     TASK_GITPULL,
     TASK_AIRBYTECLEAR,
     TASK_GITCLONE,
@@ -101,6 +103,10 @@ def _build_transform_tasks(
         dbt_orgtasks = []
         git_orgtasks = []
         dbt_cloud_orgtasks = []
+        auto_managed_dbt_orgtasks = []
+
+        # Task slugs that are auto-managed and should not come from frontend
+        auto_managed_task_slugs = {TASK_DBTCLEAN, TASK_DBTDEPS}
 
         transform_tasks_payload.sort(key=lambda task: task.seq)
 
@@ -111,21 +117,27 @@ def _build_transform_tasks(
                     f"transform task with uuid {transform_task.uuid} not found"
                 )
 
-            if org_task.task.type == TaskType.DBT:
-                dbt_orgtasks.append(org_task)
-            elif org_task.task.type == TaskType.GIT:
+            if org_task.task.type == TaskType.GIT:
                 # Skip git tasks - they should not come from frontend anymore
                 logger.warning(
                     f"Ignoring git task {org_task.task.slug} from frontend - git tasks are auto-managed"
                 )
                 continue
+            elif (
+                org_task.task.type == TaskType.DBT and org_task.task.slug in auto_managed_task_slugs
+            ):
+                # Skip dbt-clean and dbt-deps - they are auto-managed
+                logger.warning(f"Ignoring {org_task.task.slug} from frontend - auto-managed")
+                continue
+            elif org_task.task.type == TaskType.DBT:
+                dbt_orgtasks.append(org_task)
             elif org_task.task.type == TaskType.DBTCLOUD:
                 dbt_cloud_orgtasks.append(org_task)
 
         logger.info(f"{len(dbt_orgtasks)} DBT cli tasks being pushed to the pipeline")
         logger.info(f"{len(dbt_cloud_orgtasks)} Dbt cloud tasks being pushed to the pipeline")
 
-        # Add git step automatically based on workpool type
+        # Auto-add git and dbt-clean/dbt-deps steps when there are DBT tasks
         if len(dbt_orgtasks) > 0:
             if PipelineService._is_workpool_eks(org):
                 logger.info("EKS workpool detected, adding git clone step before DBT tasks")
@@ -136,6 +148,12 @@ def _build_transform_tasks(
                 git_pull_orgtask = PipelineService._get_or_create_git_pull_orgtask(org)
                 git_orgtasks.insert(0, git_pull_orgtask)
 
+            # Auto-add dbt clean and dbt deps before other DBT tasks
+            logger.info("Adding dbt clean and dbt deps steps before DBT tasks")
+            dbt_clean_orgtask = PipelineService._get_or_create_dbt_clean_orgtask(org)
+            dbt_deps_orgtask = PipelineService._get_or_create_dbt_deps_orgtask(org)
+            auto_managed_dbt_orgtasks = [dbt_clean_orgtask, dbt_deps_orgtask]
+
         # dbt cli profile block - only needed if we have DBT tasks
         cli_block = None
         if len(dbt_orgtasks) > 0:
@@ -151,9 +169,10 @@ def _build_transform_tasks(
             raise PipelineConfigurationError("dbt cloud creds block not found")
 
         # get the deployment task configs
+        all_orgtasks = git_orgtasks + auto_managed_dbt_orgtasks + dbt_orgtasks + dbt_cloud_orgtasks
         task_configs, error = pipeline_with_orgtasks(
             org,
-            git_orgtasks + dbt_orgtasks + dbt_cloud_orgtasks,
+            all_orgtasks,
             cli_block=cli_block,
             dbt_project_params=dbt_project_params,
             start_seq=len(existing_task_configs),
@@ -163,7 +182,7 @@ def _build_transform_tasks(
         if error:
             raise PipelineConfigurationError(error)
 
-        map_org_tasks = git_orgtasks + dbt_orgtasks + dbt_cloud_orgtasks
+        map_org_tasks = all_orgtasks
         return task_configs, map_org_tasks
 
     @staticmethod
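
The concatenation order fixes the execution sequence handed to pipeline_with_orgtasks: git steps first, then the auto-managed dbt steps, then user dbt tasks, then dbt cloud tasks. A quick illustration, with slug strings standing in for OrgTask objects:

    # Slugs stand in for OrgTask instances to show the resulting order.
    git_orgtasks = ["git-pull"]
    auto_managed_dbt_orgtasks = ["dbt-clean", "dbt-deps"]
    dbt_orgtasks = ["dbt-run", "dbt-test"]
    dbt_cloud_orgtasks = []

    all_orgtasks = git_orgtasks + auto_managed_dbt_orgtasks + dbt_orgtasks + dbt_cloud_orgtasks
    print(all_orgtasks)  # ['git-pull', 'dbt-clean', 'dbt-deps', 'dbt-run', 'dbt-test']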
@@ -734,3 +753,43 @@ def _get_or_create_git_pull_orgtask(org: Org) -> OrgTask:
             logger.info(f"Created git pull OrgTask for org {org.slug}")
 
         return git_pull_orgtask
+
+    @staticmethod
+    def _get_or_create_dbt_clean_orgtask(org: Org) -> OrgTask:
+        """Get or create dbt clean OrgTask for the organization"""
+        dbt_clean_task = Task.objects.filter(slug=TASK_DBTCLEAN).first()
+        if not dbt_clean_task:
+            raise PipelineConfigurationError("dbt-clean task not found in database")
+
+        orgdbt = org.dbt
+        if not orgdbt:
+            raise PipelineConfigurationError("dbt configuration not found for organization")
+
+        dbt_clean_orgtask, created = OrgTask.objects.get_or_create(
+            org=org, task=dbt_clean_task, dbt=orgdbt, defaults={"parameters": {}}
+        )
+
+        if created:
+            logger.info(f"Created dbt clean OrgTask for org {org.slug}")
+
+        return dbt_clean_orgtask
+
+    @staticmethod
+    def _get_or_create_dbt_deps_orgtask(org: Org) -> OrgTask:
+        """Get or create dbt deps OrgTask for the organization"""
+        dbt_deps_task = Task.objects.filter(slug=TASK_DBTDEPS).first()
+        if not dbt_deps_task:
+            raise PipelineConfigurationError("dbt-deps task not found in database")
+
+        orgdbt = org.dbt
+        if not orgdbt:
+            raise PipelineConfigurationError("dbt configuration not found for organization")
+
+        dbt_deps_orgtask, created = OrgTask.objects.get_or_create(
+            org=org, task=dbt_deps_task, dbt=orgdbt, defaults={"parameters": {}}
+        )
+
+        if created:
+            logger.info(f"Created dbt deps OrgTask for org {org.slug}")
+
+        return dbt_deps_orgtask
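
The two helpers differ only in the task slug and log label, so a single parameterized helper would work too. A refactoring sketch, not part of this PR, assuming the same class context and imports as above:

    # Hypothetical parameterized helper; not in the actual commit.
    @staticmethod
    def _get_or_create_auto_managed_orgtask(org: Org, slug: str) -> OrgTask:
        """Get or create an auto-managed dbt OrgTask (dbt-clean / dbt-deps)."""
        task = Task.objects.filter(slug=slug).first()
        if not task:
            raise PipelineConfigurationError(f"{slug} task not found in database")

        orgdbt = org.dbt
        if not orgdbt:
            raise PipelineConfigurationError("dbt configuration not found for organization")

        orgtask, created = OrgTask.objects.get_or_create(
            org=org, task=task, dbt=orgdbt, defaults={"parameters": {}}
        )
        if created:
            logger.info(f"Created {slug} OrgTask for org {org.slug}")
        return orgtask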
Lines changed: 164 additions & 0 deletions (new management command; file path not shown in this capture)
@@ -0,0 +1,164 @@
+"""
+Management command to backfill auto-managed tasks (git pull/clone, dbt clean, dbt deps)
+in all existing pipelines.
+
+For each orchestrate pipeline that has transform tasks, this command will
+re-run update_pipeline which automatically adds the missing auto-managed steps
+based on the org's workpool configuration.
+"""
+
+from django.core.management.base import BaseCommand
+from ddpui.models.org import Org, OrgDataFlowv1
+from ddpui.models.tasks import DataflowOrgTask, TaskType
+from ddpui.ddpprefect.schema import PrefectDataFlowUpdateSchema3
+from ddpui.ddpprefect import prefect_service
+from ddpui.utils.constants import TASK_DBTCLEAN, TASK_DBTDEPS
+from ddpui.utils.unified_logger import get_logger
+from ddpui.core.orchestrate.pipeline_service import PipelineService
+
+logger = get_logger()
+
+
+class Command(BaseCommand):
+    help = "Backfill auto-managed tasks (git pull/clone, dbt clean, dbt deps) in all existing pipelines"
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--org-slug",
+            type=str,
+            required=False,
+            help="Only backfill for a specific organization (optional)",
+        )
+        parser.add_argument(
+            "--dry-run",
+            action="store_true",
+            help="Show what would be changed without making actual changes",
+        )
+
+    def handle(self, *args, **options):
+        org_slug = options.get("org_slug")
+        dry_run = options["dry_run"]
+
+        if org_slug:
+            orgs = Org.objects.filter(slug=org_slug)
+            if not orgs.exists():
+                self.stdout.write(self.style.ERROR(f"Organization '{org_slug}' not found"))
+                return
+        else:
+            orgs = Org.objects.all()
+
+        total_updated = 0
+        total_skipped = 0
+        total_errors = 0
+
+        for org in orgs:
+            updated, skipped, errors = self.process_org(org, dry_run)
+            total_updated += updated
+            total_skipped += skipped
+            total_errors += errors
+
+        self.stdout.write(f"\n{'[DRY RUN] ' if dry_run else ''}Summary:")
+        self.stdout.write(f"  Pipelines updated: {total_updated}")
+        self.stdout.write(f"  Pipelines skipped (no transform tasks): {total_skipped}")
+        self.stdout.write(f"  Errors: {total_errors}")
+
+    def process_org(self, org: Org, dry_run: bool):
+        """Process all orchestrate pipelines for an organization"""
+        dataflows = OrgDataFlowv1.objects.filter(org=org, dataflow_type="orchestrate")
+
+        if not dataflows.exists():
+            return 0, 0, 0
+
+        self.stdout.write(f"\nOrg: {org.slug} ({org.name})")
+
+        updated = 0
+        skipped = 0
+        errors = 0
+
+        for dataflow in dataflows:
+            # Check if this pipeline has transform tasks
+            has_transform = DataflowOrgTask.objects.filter(
+                dataflow=dataflow,
+                orgtask__task__type=TaskType.DBT,
+            ).exists()
+
+            if not has_transform:
+                self.stdout.write(f"  → Skipping {dataflow.deployment_name} (no transform tasks)")
+                skipped += 1
+                continue
+
+            # Check if dbt-clean and dbt-deps are already present
+            has_dbt_clean = DataflowOrgTask.objects.filter(
+                dataflow=dataflow, orgtask__task__slug=TASK_DBTCLEAN
+            ).exists()
+            has_dbt_deps = DataflowOrgTask.objects.filter(
+                dataflow=dataflow, orgtask__task__slug=TASK_DBTDEPS
+            ).exists()
+
+            if has_dbt_clean and has_dbt_deps:
+                self.stdout.write(
+                    f"  → Skipping {dataflow.deployment_name} (already has dbt-clean and dbt-deps)"
+                )
+                skipped += 1
+                continue
+
+            missing = []
+            if not has_dbt_clean:
+                missing.append("dbt-clean")
+            if not has_dbt_deps:
+                missing.append("dbt-deps")
+
+            if dry_run:
+                self.stdout.write(
+                    f"  [DRY RUN] Would update {dataflow.deployment_name} "
+                    f"(missing: {', '.join(missing)})"
+                )
+                updated += 1
+                continue
+
+            try:
+                self.update_pipeline(org, dataflow)
+                self.stdout.write(
+                    self.style.SUCCESS(
+                        f"  ✓ Updated {dataflow.deployment_name} (added: {', '.join(missing)})"
+                    )
+                )
+                updated += 1
+            except Exception as e:
+                self.stdout.write(
+                    self.style.ERROR(f"  ✗ Failed to update {dataflow.deployment_name}: {str(e)}")
+                )
+                logger.error(
+                    f"Failed to backfill auto-managed tasks for {dataflow.deployment_name}: {str(e)}"
+                )
+                errors += 1
+
+        return updated, skipped, errors
+
+    def update_pipeline(self, org: Org, dataflow: OrgDataFlowv1):
+        """Re-run update_pipeline to backfill auto-managed tasks"""
+        pipeline_details = PipelineService.get_pipeline_details(org, dataflow.deployment_id)
+
+        transform_tasks = pipeline_details.get("transformTasks", [])
+
+        # Convert UUIDs to strings for Pydantic validation
+        transform_tasks_str = [
+            {"uuid": str(task["uuid"]), "seq": task["seq"]} for task in transform_tasks
+        ]
+
+        update_payload = PrefectDataFlowUpdateSchema3(
+            name=pipeline_details["name"],
+            cron=pipeline_details["cron"],
+            connections=pipeline_details["connections"],
+            transformTasks=transform_tasks_str,
+        )
+
+        PipelineService.update_pipeline(org, dataflow.deployment_id, update_payload)
+
+        # Toggle schedule inactive → active to clear pre-scheduled runs.
+        # Prefect schedules runs 1-2 days in advance; those won't pick up the
+        # updated deployment params unless the schedule is reset.
+        # Only do this for pipelines that have an active schedule.
+        if dataflow.cron and pipeline_details.get("isScheduleActive", False):
+            PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "inactive")
+            PipelineService.set_pipeline_schedule(org, dataflow.deployment_id, "active")
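
Typical invocations (the command module's file name is not shown in this capture, so "backfill_auto_managed_tasks" below is an assumed name):

    python manage.py backfill_auto_managed_tasks --dry-run          # preview changes across all orgs
    python manage.py backfill_auto_managed_tasks --org-slug my-org  # backfill one org ("my-org" is illustrative)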

ddpui/management/commands/migrate_org_queue.py

Lines changed: 15 additions & 3 deletions
@@ -268,12 +268,13 @@ def perform_migration(
 
         for dataflow in dataflows:
             try:
+                # Update queue/workpool for all dataflows first
+                self.update_dataflow_queue(dataflow, new_queue, final_workpool)
+
                 # For scheduled pipelines, update pipeline which will handle git steps automatically
+                # This runs after queue update so the schedule toggle picks up the new queue
                 if queue_type == "scheduled_pipeline_queue":
                     self.update_scheduled_pipeline(dataflow)
-
-                # Update queue/workpool for all dataflows
-                self.update_dataflow_queue(dataflow, new_queue, final_workpool)
                 self.stdout.write(f"  ✓ Updated dataflow: {dataflow.deployment_name}")
                 updated_count += 1
             except Exception as e:
@@ -363,6 +364,17 @@ def update_scheduled_pipeline(self, dataflow: OrgDataFlowv1):
             # Update pipeline using PipelineService (handles git steps based on workpool)
             PipelineService.update_pipeline(dataflow.org, dataflow.deployment_id, update_payload)
 
+            # Toggle schedule inactive → active to clear pre-scheduled runs.
+            # Prefect schedules runs 1-2 days in advance; those won't pick up
+            # the updated deployment params unless the schedule is reset.
+            if dataflow.cron and pipeline_details.get("isScheduleActive", False):
+                PipelineService.set_pipeline_schedule(
+                    dataflow.org, dataflow.deployment_id, "inactive"
+                )
+                PipelineService.set_pipeline_schedule(
+                    dataflow.org, dataflow.deployment_id, "active"
+                )
+
             logger.info(f"Updated scheduled pipeline {dataflow.deployment_name}")
 
         except Exception as e:
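
The same inactive → active toggle now appears in both this command and the backfill command above; a natural follow-up would be a shared helper (hypothetical sketch, not part of this PR, assuming the PipelineService import used in both files):

    # Hypothetical shared helper; not in the actual commit.
    def reset_pipeline_schedule(org, deployment_id: str) -> None:
        """Toggle a deployment's schedule so Prefect discards pre-scheduled
        runs and re-creates them with the updated deployment parameters."""
        PipelineService.set_pipeline_schedule(org, deployment_id, "inactive")
        PipelineService.set_pipeline_schedule(org, deployment_id, "active")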
