diff --git a/src/gradient/resources/knowledge_bases/indexing_jobs.py b/src/gradient/resources/knowledge_bases/indexing_jobs.py index 95898c2a..1998ead3 100644 --- a/src/gradient/resources/knowledge_bases/indexing_jobs.py +++ b/src/gradient/resources/knowledge_bases/indexing_jobs.py @@ -259,6 +259,60 @@ def update_cancel( cast_to=IndexingJobUpdateCancelResponse, ) + def wait_for_completion( + self, + uuid: str, + *, + timeout: float | None = 300.0, + poll_interval: float = 2.0, + raise_on_failure: bool = True, + ) -> IndexingJobRetrieveResponse: + """ + Poll the indexing job until it reaches a terminal state. + + Args: + uuid: The indexing job uuid to poll. + timeout: Maximum time in seconds to wait (None for no timeout). Defaults to 300s. + poll_interval: Seconds between polls. Defaults to 2.0s. + raise_on_failure: If True, raise a RuntimeError when the job reaches a failed/cancelled/error state. + + Returns: + The final `IndexingJobRetrieveResponse` when the job is in a terminal state. + + Raises: + TimeoutError: If the timeout is exceeded. + RuntimeError: If `raise_on_failure` is True and the job failed/cancelled/errored. + """ + import time + + if not uuid: + raise ValueError(f"Expected a non-empty value for `uuid` but received {uuid!r}") + + start = time.time() + while True: + resp = self.retrieve(uuid) + job = getattr(resp, "job", None) + phase = getattr(job, "phase", None) if job is not None else None + status = getattr(job, "status", None) if job is not None else None + + # Terminal phases + if phase == "BATCH_JOB_PHASE_SUCCEEDED" or status in ("INDEX_JOB_STATUS_COMPLETED",): + return resp + + # Failure phases + if phase in ("BATCH_JOB_PHASE_FAILED", "BATCH_JOB_PHASE_ERROR", "BATCH_JOB_PHASE_CANCELLED") or ( + status in ("INDEX_JOB_STATUS_FAILED",) + ): + if raise_on_failure: + raise RuntimeError(f"Indexing job {uuid} finished with failure phase={phase} status={status}") + return resp + + # Still running or pending + if timeout is not None and (time.time() - start) > timeout: + raise TimeoutError(f"Timed out waiting for indexing job {uuid} after {timeout} seconds") + + time.sleep(poll_interval) + class AsyncIndexingJobsResource(AsyncAPIResource): @cached_property @@ -490,6 +544,61 @@ async def update_cancel( cast_to=IndexingJobUpdateCancelResponse, ) + async def wait_for_completion( + self, + uuid: str, + *, + timeout: float | None = 300.0, + poll_interval: float = 2.0, + raise_on_failure: bool = True, + ) -> IndexingJobRetrieveResponse: + """ + Asynchronously poll the indexing job until it reaches a terminal state. + + Args: + uuid: The indexing job uuid to poll. + timeout: Maximum time in seconds to wait (None for no timeout). Defaults to 300s. + poll_interval: Seconds between polls. Defaults to 2.0s. + raise_on_failure: If True, raise a RuntimeError when the job reaches a failed/cancelled/error state. + + Returns: + The final `IndexingJobRetrieveResponse` when the job is in a terminal state. + + Raises: + TimeoutError: If the timeout is exceeded. + RuntimeError: If `raise_on_failure` is True and the job failed/cancelled/errored. + """ + import time + import asyncio + + if not uuid: + raise ValueError(f"Expected a non-empty value for `uuid` but received {uuid!r}") + + start = time.time() + while True: + resp = await self.retrieve(uuid) + job = getattr(resp, "job", None) + phase = getattr(job, "phase", None) if job is not None else None + status = getattr(job, "status", None) if job is not None else None + + # Terminal phases + if phase == "BATCH_JOB_PHASE_SUCCEEDED" or status in ("INDEX_JOB_STATUS_COMPLETED",): + return resp + + # Failure phases + if phase in ("BATCH_JOB_PHASE_FAILED", "BATCH_JOB_PHASE_ERROR", "BATCH_JOB_PHASE_CANCELLED") or ( + status in ("INDEX_JOB_STATUS_FAILED",) + ): + if raise_on_failure: + raise RuntimeError(f"Indexing job {uuid} finished with failure phase={phase} status={status}") + return resp + + # Still running or pending + if timeout is not None and (time.time() - start) > timeout: + raise TimeoutError(f"Timed out waiting for indexing job {uuid} after {timeout} seconds") + + await asyncio.sleep(poll_interval) + class IndexingJobsResourceWithRawResponse: def __init__(self, indexing_jobs: IndexingJobsResource) -> None: