|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
| 3 | +import asyncio |
3 | 4 | from typing import TYPE_CHECKING, Any |
4 | 5 |
|
5 | 6 | from prefect.client.orchestration import PrefectClient, get_client |
6 | 7 | from prefect.events.schemas.events import Event as PrefectEventModel |
7 | 8 | from pydantic import BaseModel, Field, TypeAdapter |
8 | 9 |
|
9 | 10 | from infrahub.core.constants import GLOBAL_BRANCH_NAME |
| 11 | +from infrahub.exceptions import ServiceUnavailableError |
10 | 12 | from infrahub.log import get_logger |
11 | 13 | from infrahub.utils import get_nested_dict |
12 | 14 |
|
@@ -192,8 +194,17 @@ async def query_events( |
192 | 194 | ) -> PrefectEventResponse: |
193 | 195 | body = {"limit": limit, "filter": filters.model_dump(mode="json", exclude_none=True), "offset": offset} |
194 | 196 |
|
195 | | - response = await client._client.post("/infrahub/events/filter", json=body) |
196 | | - response.raise_for_status() |
| 197 | + # Retry due to https://github.com/PrefectHQ/prefect/issues/16299 |
| 198 | + for _ in range(1, 5): |
| 199 | + response = await client._client.post("/infrahub/events/filter", json=body) |
| 200 | + if response.status_code == 200: |
| 201 | + break |
| 202 | + await asyncio.sleep(0.1) |
| 203 | + |
| 204 | + if response.status_code != 200: |
| 205 | + raise ServiceUnavailableError( |
| 206 | + message=f"Unable to query prefect due to invalid response from the server (status_code={response.status_code})" |
| 207 | + ) |
197 | 208 | data: dict[str, Any] = response.json() |
198 | 209 |
|
199 | 210 | return PrefectEventResponse( |
|
0 commit comments