 """Platform management functions"""
 
 import logging
+from time import sleep
 
 import kubernetes
 from kubernetes.client.exceptions import ApiException
@@ -166,7 +167,7 @@ def _replace_statefulset_with_fetch_retry(
         raise
 
 
-def _patch_platform(config: UpgradeConfig, replicas: int) -> None:  # noqa: C901, PLR0912, PLR0915
+def _patch_platform(config: UpgradeConfig, replicas: int, revert: bool = False) -> None:  # noqa: C901, PLR0912, PLR0915
     """Patch Platform by stopping or starting all Platform's pods."""
     KubernetesConfigHandler(kube_config=config.kube_config.value)
 
@@ -257,6 +258,9 @@ def _patch_platform(config: UpgradeConfig, replicas: int) -> None: # noqa: C901
     for deployment in deployments:
         _ensure_desired_replicas(core_api=core_api, obj=deployment, replicas=replicas)
 
+    if revert:
+        restart_jobs_after_revert()
+
     for stateful_set in stateful_sets:
         _ensure_desired_replicas(core_api=core_api, obj=stateful_set, replicas=replicas)
 
@@ -285,9 +289,9 @@ def stop_platform(config: UpgradeConfig) -> None:
     _patch_platform(config=config, replicas=0)
 
 
-def restore_platform(config: UpgradeConfig) -> None:
+def restore_platform(config: UpgradeConfig, revert: bool = False) -> None:
     """Restore Platform by starting all Platform's pods."""
-    _patch_platform(config=config, replicas=1)
+    _patch_platform(config=config, replicas=1, revert=revert)
 
 
 def get_ordered_deployments(apps_api: kubernetes.client.AppsV1Api) -> list[kubernetes.client.V1Deployment]:
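
For context, a minimal caller sketch of the new flag is shown below. It is illustrative only and not part of this change; build_upgrade_config() is a hypothetical stand-in for however the surrounding tooling constructs its UpgradeConfig.

    config = build_upgrade_config()  # hypothetical helper, not part of this diff

    stop_platform(config=config)     # scale all Platform workloads down to 0 replicas
    # ... roll back manifests and data here ...
    # revert=True also re-creates the impt-etcd-auth and impt-kafka-provisioning Jobs
    restore_platform(config=config, revert=True)
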
@@ -479,3 +483,55 @@ def are_pods_present():
     except PodStillPresent as err:
         logger.exception(err)
         raise
+
+
+def restart_jobs_after_revert() -> None:
+    """Restart the jobs that must be re-run after a revert operation."""
+    for job_name in (
+        "impt-etcd-auth",
+        "impt-kafka-provisioning",
+    ):
+        _restart_job(job_name=job_name, namespace=PLATFORM_NAMESPACE)
+
+
+def _restart_job(job_name: str, namespace: str) -> None:
+    """Restart a Kubernetes Job by deleting it and re-creating a copy of it."""
+    logger.info(f"Restarting job: {job_name} in namespace: {namespace}")
+    with kubernetes.client.ApiClient() as client:
+        batch_api = kubernetes.client.BatchV1Api(client)
+        try:
+            # Get the existing Job manifest
+            job = batch_api.read_namespaced_job(name=job_name, namespace=namespace)
+            # Delete the existing Job; foreground propagation also removes its pods
+            batch_api.delete_namespaced_job(
+                name=job_name,
+                namespace=namespace,
+                body=kubernetes.client.V1DeleteOptions(propagation_policy="Foreground"),
+            )
+            # Wait (up to ~10 s) for the Job to be fully deleted
+            for _ in range(10):
+                try:
+                    batch_api.read_namespaced_job(name=job_name, namespace=namespace)
+                except ApiException as e:
+                    if e.status == 404:
+                        break
+                sleep(1)
+            # Remove server-populated fields and controller-managed metadata
+            # that must not be set on creation
+            job.metadata.creation_timestamp = None
+            job.metadata.resource_version = None
+            job.metadata.uid = None
+            job.status = None
+            if job.metadata.labels.get("controller-uid"):
+                del job.metadata.labels["controller-uid"]
+            if job.metadata.labels.get("batch.kubernetes.io/controller-uid"):
+                del job.metadata.labels["batch.kubernetes.io/controller-uid"]
+            if job.spec.template.metadata.labels.get("controller-uid"):
+                del job.spec.template.metadata.labels["controller-uid"]
+            if job.spec.template.metadata.labels.get("batch.kubernetes.io/controller-uid"):
+                del job.spec.template.metadata.labels["batch.kubernetes.io/controller-uid"]
+            job.spec.selector = None
+            batch_api.create_namespaced_job(namespace=namespace, body=job)
+        except ApiException as e:
+            logger.error(f"Exception when restarting job {job_name}: {e}")
+        except Exception as e:
+            logger.error(f"Unexpected error when restarting job {job_name}: {e}")
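
A rough verification sketch for the revert path is given below. It assumes kubeconfig access via kubernetes.config.load_kube_config(), reuses the two job names from above, and the helper name revert_jobs_recreated is made up for illustration; it is not part of this change.

    import kubernetes
    from kubernetes.client.exceptions import ApiException


    def revert_jobs_recreated(namespace: str) -> bool:
        """Return True if both re-created Jobs exist again in the given namespace."""
        kubernetes.config.load_kube_config()  # assumes a kubeconfig on the machine running the check
        batch_api = kubernetes.client.BatchV1Api()
        for name in ("impt-etcd-auth", "impt-kafka-provisioning"):
            try:
                batch_api.read_namespaced_job(name=name, namespace=namespace)
            except ApiException as err:
                if err.status == 404:
                    return False  # the Job was deleted but has not been re-created yet
                raise
        return True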