|
15 | 15 |
|
16 | 16 | import yaml |
17 | 17 | from databricks.sdk import WorkspaceClient |
18 | | -from databricks.sdk.errors import NotFound, OperationFailed |
| 18 | +from databricks.sdk.errors import InvalidParameterValue, NotFound, OperationFailed |
19 | 19 | from databricks.sdk.mixins.compute import SemVer |
20 | 20 | from databricks.sdk.service import compute, jobs |
21 | 21 | from databricks.sdk.service.sql import EndpointInfoWarehouseType, SpotInstancePolicy |
@@ -457,24 +457,35 @@ def _create_jobs(self): |
457 | 457 | settings = self._job_settings(step_name, remote_wheel) |
458 | 458 | if self._override_clusters: |
459 | 459 | settings = self._apply_cluster_overrides(settings, self._override_clusters, wheel_runner) |
460 | | - if step_name in self._state.jobs: |
461 | | - job_id = self._state.jobs[step_name] |
462 | | - logger.info(f"Updating configuration for step={step_name} job_id={job_id}") |
463 | | - self._ws.jobs.reset(job_id, jobs.JobSettings(**settings)) |
464 | | - else: |
465 | | - logger.info(f"Creating new job configuration for step={step_name}") |
466 | | - job_id = self._ws.jobs.create(**settings).job_id |
467 | | - self._state.jobs[step_name] = job_id |
| 460 | + self._deploy_workflow(step_name, settings) |
468 | 461 |
|
469 | 462 | for step_name, job_id in self._state.jobs.items(): |
470 | 463 | if step_name not in desired_steps: |
471 | | - logger.info(f"Removing job_id={job_id}, as it is no longer needed") |
472 | | - self._ws.jobs.delete(job_id) |
| 464 | + try: |
| 465 | + logger.info(f"Removing job_id={job_id}, as it is no longer needed") |
| 466 | + self._ws.jobs.delete(job_id) |
| 467 | + except InvalidParameterValue: |
| 468 | + logger.warning(f"step={step_name} does not exist anymore for some reason") |
| 469 | + continue |
473 | 470 |
|
474 | 471 | self._state.save() |
475 | 472 | self._create_readme() |
476 | 473 | self._create_debug(remote_wheel) |
477 | 474 |
|
| 475 | + def _deploy_workflow(self, step_name: str, settings): |
| 476 | + if step_name in self._state.jobs: |
| 477 | + try: |
| 478 | + job_id = self._state.jobs[step_name] |
| 479 | + logger.info(f"Updating configuration for step={step_name} job_id={job_id}") |
| 480 | + return self._ws.jobs.reset(job_id, jobs.JobSettings(**settings)) |
| 481 | + except InvalidParameterValue: |
| 482 | + del self._state.jobs[step_name] |
| 483 | + logger.warning(f"step={step_name} does not exist anymore for some reason") |
| 484 | + return self._deploy_workflow(step_name, settings) |
| 485 | + logger.info(f"Creating new job configuration for step={step_name}") |
| 486 | + job_id = self._ws.jobs.create(**settings).job_id |
| 487 | + self._state.jobs[step_name] = job_id |
| 488 | + |
478 | 489 | def _deployed_steps_pre_v06(self): |
479 | 490 | deployed_steps = {} |
480 | 491 | logger.debug(f"Fetching all jobs to determine already deployed steps for app={self._app}") |
|
0 commit comments