Skip to content

Commit 904b452

Browse files
author
Max Azatian
committed
rewrite: using kubernetes' watch
1 parent 611e75d commit 904b452

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

backend/app/services/kubernetes_service.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from fastapi import Depends, Request
1616
from kubernetes import client as k8s_client
1717
from kubernetes import config as k8s_config
18+
from kubernetes import watch
1819
from kubernetes.client.rest import ApiException
1920

2021

@@ -258,20 +259,27 @@ async def get_pod_logs(self, execution_id: str) -> tuple[dict, str]:
258259
async def _wait_for_pod_completion(self, pod_name: str) -> k8s_client.V1Pod:
259260
if not self.v1:
260261
raise KubernetesServiceError(_K8S_CLIENT_NOT_INITIALIZED_MSG)
261-
logger.info(f"Waiting for pod '{pod_name}' to complete...")
262-
for _ in range(self.POD_RETRY_ATTEMPTS):
263-
try:
264-
pod = await asyncio.to_thread(self.v1.read_namespaced_pod, pod_name, self.NAMESPACE)
265-
if pod.status and pod.status.phase in self.POD_SUCCESS_STATES:
266-
logger.info(f"Pod '{pod_name}' reached terminal phase: {pod.status.phase}")
267-
return pod
268-
except ApiException as e:
269-
if e.status == 404:
270-
logger.warning(f"Pod '{pod_name}' not found, retrying...")
271-
else:
272-
logger.error(f"API Error while waiting for pod '{pod_name}': {e.reason}")
273-
await asyncio.sleep(self.POD_RETRY_INTERVAL)
274-
raise KubernetesPodError(f"Timeout waiting for pod '{pod_name}' to complete.")
262+
263+
w = watch.Watch()
264+
return await asyncio.to_thread(self._watch_pod, w, pod_name)
265+
266+
def _watch_pod(self, w: watch.Watch, pod_name: str) -> k8s_client.V1Pod:
    """Follow a single pod's events until it reaches a terminal phase.

    This method blocks while it consumes the event stream; the async
    caller runs it through ``asyncio.to_thread``.

    Args:
        w: Watch object used to stream pod events. It is stopped before
            this method returns or raises.
        pod_name: Name of the pod to follow inside ``self.NAMESPACE``.

    Returns:
        The pod object carried by the first event whose phase is listed
        in ``self.POD_SUCCESS_STATES``.

    Raises:
        KubernetesPodError: If a DELETED event arrives for the pod, or
            the stream ends before a terminal phase is observed.
    """
    try:
        for event in w.stream(
            self.v1.list_namespaced_pod,
            namespace=self.NAMESPACE,
            # Restrict the list/watch call to the one pod of interest.
            field_selector=f"metadata.name={pod_name}",
            timeout_seconds=300,
        ):
            pod = event['object']

            if event['type'] == 'DELETED':
                raise KubernetesPodError(f"Pod '{pod_name}' was deleted")

            if pod.status and pod.status.phase in self.POD_SUCCESS_STATES:
                logger.info(f"Pod '{pod_name}' completed: {pod.status.phase}")
                return pod
    finally:
        # Always stop the watch, whether we leave the loop by return, by
        # raise, or by the stream ending, so it is never left running.
        w.stop()

    # The stream ended without the pod reaching a terminal phase.
    raise KubernetesPodError(f"Pod '{pod_name}' watch timeout")
275283

276284
async def _get_container_logs(self, pod_name: str, container_name: str) -> str:
277285
if not self.v1:

0 commit comments

Comments
 (0)