|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
| 3 | +import asyncio |
3 | 4 | from typing import TYPE_CHECKING, Any |
4 | 5 |
|
5 | 6 | from prefect.client.orchestration import PrefectClient, get_client |
6 | 7 | from prefect.events.schemas.events import Event as PrefectEventModel |
7 | 8 | from pydantic import BaseModel, Field, TypeAdapter |
8 | 9 |
|
9 | 10 | from infrahub.core.constants import GLOBAL_BRANCH_NAME |
| 11 | +from infrahub.exceptions import ServiceUnavailableError |
10 | 12 | from infrahub.log import get_logger |
11 | 13 | from infrahub.utils import get_nested_dict |
12 | 14 |
|
@@ -216,8 +218,17 @@ async def query_events( |
216 | 218 | ) -> PrefectEventResponse: |
217 | 219 | body = {"limit": limit, "filter": filters.model_dump(mode="json", exclude_none=True), "offset": offset} |
218 | 220 |
|
219 | | - response = await client._client.post("/infrahub/events/filter", json=body) |
220 | | - response.raise_for_status() |
| 221 | + # Retry due to https://github.com/PrefectHQ/prefect/issues/16299 |
| 222 | + for _ in range(1, 5): |
| 223 | + response = await client._client.post("/infrahub/events/filter", json=body) |
| 224 | + if response.status_code == 200: |
| 225 | + break |
| 226 | + await asyncio.sleep(0.1) |
| 227 | + |
| 228 | + if response.status_code != 200: |
| 229 | + raise ServiceUnavailableError( |
| 230 | + message=f"Unable to query prefect due to invalid response from the server (status_code={response.status_code})" |
| 231 | + ) |
221 | 232 | data: dict[str, Any] = response.json() |
222 | 233 |
|
223 | 234 | return PrefectEventResponse( |
|
0 commit comments