Commit f0ead4b

Migrates Pipelines crawled during the assessment phase (#2778)
## Changes

Added PipelineMigrator to help with migration of DLT pipelines

### Linked issues

Adds #3098

### Functionality

- [ ] added relevant user documentation
- [x] added a new workflow

### Tests

- [ ] manually tested
- [x] added unit tests
- [x] added integration tests
- [ ] verified on staging environment (screenshot attached)
1 parent 3094fd2 commit f0ead4b

7 files changed: +488 -0 lines changed

src/databricks/labs/ucx/cli.py

Lines changed: 17 additions & 0 deletions
@@ -886,5 +886,22 @@ def enable_hms_federation(w: WorkspaceClient, _: Prompts, ctx: WorkspaceContext
     ctx.federation_enabler.enable()
 
 
+@ucx.command
+def migrate_dlt_pipelines(
+    w: WorkspaceClient,
+    ctx: WorkspaceContext | None = None,
+    run_as_collection: bool = False,
+    a: AccountClient | None = None,
+) -> None:
+    """Migrate DLT pipelines to UC"""
+    if ctx:
+        workspace_contexts = [ctx]
+    else:
+        workspace_contexts = _get_workspace_contexts(w, a, run_as_collection)
+
+    for workspace_context in workspace_contexts:
+        workspace_context.pipelines_migrator.migrate_pipelines()
+
+
 if __name__ == "__main__":
     ucx()
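
The new command mirrors the other collection-aware UCX commands: it resolves one or more workspace contexts and calls `pipelines_migrator.migrate_pipelines()` on each. A minimal sketch of driving it programmatically (it assumes an existing UCX installation; the WorkspaceContext import path and constructor call mirror how cli.py builds contexts and are not part of this diff):

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.cli import migrate_dlt_pipelines
from databricks.labs.ucx.contexts.workspace_cli import WorkspaceContext

w = WorkspaceClient()              # credentials resolved from the environment
ctx = WorkspaceContext(w)          # assumed constructor, as used by other ucx commands
migrate_dlt_pipelines(w, ctx=ctx)  # same effect as ctx.pipelines_migrator.migrate_pipelines()

From a terminal, the command is presumably exposed through the labs CLI as `databricks labs ucx migrate-dlt-pipelines`.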

src/databricks/labs/ucx/contexts/application.py

Lines changed: 17 additions & 0 deletions
@@ -12,6 +12,9 @@
 from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
 from databricks.labs.lsql.backends import SqlBackend
 
+from databricks.labs.ucx.assessment.jobs import JobsCrawler
+from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
+from databricks.labs.ucx.hive_metastore.pipelines_migrate import PipelinesMigrator
 from databricks.labs.ucx.recon.data_comparator import StandardDataComparator
 from databricks.labs.ucx.recon.data_profiler import StandardDataProfiler
 from databricks.labs.ucx.recon.metadata_retriever import DatabricksTableMetadataRetriever
@@ -266,6 +269,10 @@ def udf_ownership(self) -> UdfOwnership:
     def tables_crawler(self) -> TablesCrawler:
         return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)
 
+    @cached_property
+    def jobs_crawler(self) -> JobsCrawler:
+        return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
+
     @cached_property
     def table_ownership(self) -> TableOwnership:
         return TableOwnership(
@@ -337,6 +344,12 @@ def acl_migrator(self):
     def table_ownership_grant_loader(self) -> TableOwnershipGrantLoader:
         return TableOwnershipGrantLoader(self.tables_crawler, self.default_securable_ownership)
 
+    @cached_property
+    def pipelines_migrator(self) -> PipelinesMigrator:
+        return PipelinesMigrator(
+            self.workspace_client, self.pipelines_crawler, self.jobs_crawler, self.config.ucx_catalog
+        )
+
     @cached_property
     def migrate_grants(self) -> MigrateGrants:
         # owner grants have to come first
@@ -365,6 +378,10 @@ def mounts_crawler(self) -> MountsCrawler:
             self.config.enable_hms_federation,
         )
 
+    @cached_property
+    def pipelines_crawler(self) -> PipelinesCrawler:
+        return PipelinesCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
+
     @cached_property
     def azure_service_principal_crawler(self) -> AzureServicePrincipalCrawler:
         return AzureServicePrincipalCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
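
These additions follow the context's existing `@cached_property` wiring: each collaborator is built lazily, once per context, and `pipelines_migrator` reuses the `pipelines_crawler` and `jobs_crawler` instances from the same context. A self-contained sketch of that pattern (the class and attribute names below are illustrative, not the real UCX types):

from functools import cached_property


class ExampleContext:
    @cached_property
    def crawler(self) -> list:
        print("building crawler once")   # runs only on first access
        return []

    @cached_property
    def migrator(self) -> tuple:
        # reuses the crawler instance built (and cached) above
        return ("migrator", self.crawler)


ctx = ExampleContext()
assert ctx.migrator[1] is ctx.crawler    # one lazily built crawler per context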

src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py

Lines changed: 183 additions & 0 deletions
@@ -0,0 +1,183 @@
+import logging
+from functools import partial
+from typing import BinaryIO
+
+from databricks.labs.blueprint.parallel import Threads
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.errors import DatabricksError
+from databricks.sdk.service.jobs import PipelineTask, Task, JobSettings
+
+from databricks.labs.ucx.assessment.jobs import JobsCrawler
+from databricks.labs.ucx.assessment.pipelines import PipelineInfo, PipelinesCrawler
+
+logger = logging.getLogger(__name__)
+
+
+class PipelinesMigrator:
+    """
+    PipelinesMigrator is responsible for migrating pipelines from HMS to UC
+    It uses the DLT Migration API to migrate the pipelines and also updates the jobs associated with the pipelines if any
+    The class also provides an option to skip the pipelines that are already migrated or the ones that are explicitly skipped
+
+    :param ws: WorkspaceClient
+    :param pipelines_crawler: PipelinesCrawler
+    :param catalog_name: str
+    :param skip_pipeline_ids: list[str] | None
+    """
+
+    def __init__(
+        self,
+        ws: WorkspaceClient,
+        pipelines_crawler: PipelinesCrawler,
+        jobs_crawler: JobsCrawler,
+        catalog_name: str,
+        skip_pipeline_ids: list[str] | None = None,
+    ):
+        self._ws = ws
+        self._pipeline_crawler = pipelines_crawler
+        self._jobs_crawler = jobs_crawler
+        self._catalog_name = catalog_name
+        self._skip_pipeline_ids = skip_pipeline_ids or []
+        self._pipeline_job_tasks_mapping: dict[str, list[dict]] = {}
+
+    def _populate_pipeline_job_tasks_mapping(self) -> None:
+        """
+        Populates the pipeline_job_tasks_mapping dictionary with the pipeline_id as key and the list of jobs associated with the pipeline
+        """
+        jobs = self._jobs_crawler.snapshot()
+
+        for job in jobs:
+            if not job.job_id:
+                continue
+
+            job_details = self._ws.jobs.get(int(job.job_id))
+            if not job_details.settings or not job_details.settings.tasks:
+                continue
+
+            for task in job_details.settings.tasks:
+                if not task.pipeline_task:
+                    continue
+                pipeline_id = task.pipeline_task.pipeline_id
+                job_info = {"job_id": job.job_id, "task_key": task.task_key}
+                if pipeline_id not in self._pipeline_job_tasks_mapping:
+                    self._pipeline_job_tasks_mapping[pipeline_id] = [job_info]
+                else:
+                    self._pipeline_job_tasks_mapping[pipeline_id].append(job_info)
+                logger.info(f"Found job:{job.job_id} task:{task.task_key} associated with pipeline {pipeline_id}")
+
+    def _get_pipelines_to_migrate(self) -> list[PipelineInfo]:
+        """
+        Returns the list of pipelines in the current workspace
+        """
+        return list(self._pipeline_crawler.snapshot())
+
+    def migrate_pipelines(self) -> None:
+        """
+        Migrate the pipelines from HMS to UC. Public method to be called to start the pipeline migration process
+        """
+        self._populate_pipeline_job_tasks_mapping()
+        self._migrate_pipelines()
+
+    def _migrate_pipelines(self) -> list[partial[dict | bool | list | BinaryIO]]:
+        """
+        Create tasks to parallely migrate the pipelines
+        """
+        # get pipelines to migrate
+        pipelines_to_migrate = self._get_pipelines_to_migrate()
+        logger.info(f"Found {len(pipelines_to_migrate)} pipelines to migrate")
+
+        tasks = []
+        for pipeline in pipelines_to_migrate:
+            if pipeline.pipeline_id in self._skip_pipeline_ids:
+                logger.info(f"Skipping pipeline {pipeline.pipeline_id}")
+                continue
+            tasks.append(partial(self._migrate_pipeline, pipeline))
+        if not tasks:
+            return []
+        Threads.strict("migrate pipelines", tasks)
+        return tasks
+
+    def _migrate_pipeline(self, pipeline: PipelineInfo) -> dict | list | BinaryIO | bool:
+        """
+        Private method to clone the pipeline and handle the exceptions
+        """
+        try:
+            return self._clone_pipeline(pipeline)
+        except DatabricksError as e:
+            if "Cloning from Hive Metastore to Unity Catalog is currently not supported" in str(e):
+                logger.error(f"{e}: Please contact Databricks to enable DLT HMS to UC migration API on this workspace")
+                return False
+            logger.error(f"Failed to migrate pipeline {pipeline.pipeline_id}: {e}")
+            return False
+
+    def _clone_pipeline(self, pipeline: PipelineInfo) -> dict | list | BinaryIO:
+        """
+        This method calls the DLT Migration API to clone the pipeline
+        Stop and rename the old pipeline before cloning the new pipeline
+        Call the DLT Migration API to clone the pipeline
+        Update the jobs associated with the pipeline to point to the new pipeline
+        """
+        # Need to get the pipeline again to get the libraries
+        # else updating name fails with libraries not provided error
+        get_pipeline = self._ws.pipelines.get(pipeline.pipeline_id)
+        if get_pipeline.spec:
+            if get_pipeline.spec.catalog:
+                # Skip if the pipeline is already migrated to UC
+                logger.info(f"Pipeline {pipeline.pipeline_id} is already migrated to UC")
+                return []
+
+            # Stop HMS pipeline
+            self._ws.pipelines.stop(pipeline.pipeline_id)
+            # Rename old pipeline first
+            self._ws.pipelines.update(
+                pipeline.pipeline_id,
+                name=f"{pipeline.pipeline_name} [OLD]",
+                clusters=get_pipeline.spec.clusters if get_pipeline.spec.clusters else None,
+                storage=get_pipeline.spec.storage if get_pipeline.spec.storage else None,
+                continuous=get_pipeline.spec.continuous if get_pipeline.spec.continuous else None,
+                deployment=get_pipeline.spec.deployment if get_pipeline.spec.deployment else None,
+                target=get_pipeline.spec.target if get_pipeline.spec.target else None,
+                libraries=get_pipeline.spec.libraries if get_pipeline.spec.libraries else None,
+            )
+
+        # Clone pipeline
+        headers = {
+            'Accept': 'application/json',
+            'Content-Type': 'application/json',
+        }
+        body = {
+            'catalog': self._catalog_name,
+            'clone_mode': 'MIGRATE_TO_UC',
+            'configuration': {'pipelines.migration.ignoreExplicitPath': 'true'},
+            'name': f"{pipeline.pipeline_name}",
+        }
+        res = self._ws.api_client.do(
+            'POST', f'/api/2.0/pipelines/{pipeline.pipeline_id}/clone', body=body, headers=headers
+        )
+        assert isinstance(res, dict)
+        if 'pipeline_id' not in res:
+            logger.warning(f"Failed to clone pipeline {pipeline.pipeline_id}")
+            return res
+
+        # After successful clone, update jobs
+        # Currently there is no SDK method available to migrate the DLT pipelines
+        # We are directly using the DLT Migration API in the interim, once the SDK method is available, we can replace this
+        if pipeline.pipeline_id in self._pipeline_job_tasks_mapping:
+            for pipeline_job_task_mapping in self._pipeline_job_tasks_mapping[pipeline.pipeline_id]:
+                self._ws.jobs.update(
+                    pipeline_job_task_mapping['job_id'],
+                    new_settings=JobSettings(
+                        tasks=[
+                            Task(
+                                pipeline_task=PipelineTask(pipeline_id=str(res.get('pipeline_id'))),
+                                task_key=pipeline_job_task_mapping['task_key'],
+                            )
+                        ]
+                    ),
+                )
+                logger.info(f"Updated job {pipeline_job_task_mapping['job_id']} with new pipeline {res['pipeline_id']}")
+
+        # TODO:
+        # Check the error from UI
+        # BAD_REQUEST: Standard_D4pds_v6 is a Graviton instance and is not compatible with runtime dlt:14.1.21-delta-pipelines-dlt-release-2024.41-rc0-commit-f32d838-image-894c190.
+        return res
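
Taken together, migrate_pipelines() builds the job-to-pipeline mapping, clones each HMS pipeline into the target UC catalog through the /api/2.0/pipelines/{pipeline_id}/clone endpoint, and repoints any jobs that referenced the old pipeline. A minimal usage sketch that mirrors the wiring in contexts/application.py (the warehouse id, inventory schema, catalog name and pipeline id below are placeholders):

from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
from databricks.labs.ucx.hive_metastore.pipelines_migrate import PipelinesMigrator

ws = WorkspaceClient()
sql_backend = StatementExecutionBackend(ws, warehouse_id="<warehouse-id>")  # placeholder warehouse
migrator = PipelinesMigrator(
    ws,
    PipelinesCrawler(ws, sql_backend, "ucx"),   # "ucx" inventory schema is a placeholder
    JobsCrawler(ws, sql_backend, "ucx"),
    catalog_name="main",                        # target UC catalog is a placeholder
    skip_pipeline_ids=["1234-abcd-5678"],       # pipelines to leave untouched
)
migrator.migrate_pipelines()

The crawlers are the same ones used during the assessment phase, which is where the set of pipelines to migrate comes from, per the commit title.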

tests/integration/conftest.py

Lines changed: 6 additions & 0 deletions
@@ -580,6 +580,12 @@ def make_dashboard(self, **kwargs) -> Dashboard:
         self._dashboards.append(dashboard)
         return dashboard
 
+    def make_notebook(self, **kwargs):
+        return self._make_notebook(**kwargs)
+
+    def make_catalog(self, **kwargs):
+        return self._make_catalog(**kwargs)
+
     def make_linting_resources(self) -> None:
         """Make resources to lint."""
         self.make_job(content="spark.read.parquet('dbfs://mnt/notebook/')")
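
These passthroughs expose the underlying notebook and catalog factories on the test context, presumably so the new pipeline-migration integration tests can provision their own source notebook and target UC catalog. A hypothetical test sketch (the runtime_ctx fixture name is an assumption based on this conftest, not something this diff adds):

def test_pipeline_migration_fixtures(runtime_ctx):
    catalog = runtime_ctx.make_catalog()      # disposable UC catalog for the cloned pipeline
    notebook = runtime_ctx.make_notebook()    # disposable notebook to back a DLT pipeline
    assert catalog.name
    assert notebook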
