Skip to content

Commit 9f2501f

Browse files
committed
fix(backend): use http adapter context for task worker prefect client
This avoids overhead due to SSL context initialization within Prefect client. Signed-off-by: Fatih Acar <[email protected]>
1 parent da27bb2 commit 9f2501f

File tree

5 files changed

+19
-4
lines changed

5 files changed

+19
-4
lines changed

backend/infrahub/services/adapters/http/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@
33
from typing import TYPE_CHECKING, Any
44

55
if TYPE_CHECKING:
6+
import ssl
7+
68
import httpx
79

810

911
class InfrahubHTTP:
12+
def verify_tls(self, verify: bool | None = None) -> bool | ssl.SSLContext:
13+
raise NotImplementedError()
14+
1015
async def get(
1116
self,
1217
url: str,

backend/infrahub/services/adapters/workflow/worker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from typing import TYPE_CHECKING, Any, overload
44

55
from prefect.client.schemas.objects import StateType
6+
from prefect.context import AsyncClientContext
67
from prefect.deployments import run_deployment
78

9+
from infrahub.services.adapters.http.httpx import HttpxAdapter
810
from infrahub.workers.utils import inject_context_parameter
911
from infrahub.workflows.initialization import setup_task_manager
1012
from infrahub.workflows.models import WorkflowInfo
@@ -19,6 +21,8 @@
1921

2022

2123
class WorkflowWorkerExecution(InfrahubWorkflow):
24+
_http_adapter = HttpxAdapter()
25+
2226
@staticmethod
2327
async def initialize(component_is_primary_server: bool) -> None:
2428
if component_is_primary_server:
@@ -79,5 +83,6 @@ async def submit_workflow(
7983
parameters = dict(parameters) if parameters is not None else {}
8084
inject_context_parameter(func=flow_func, parameters=parameters, context=context)
8185

82-
flow_run = await run_deployment(name=workflow.full_name, timeout=0, parameters=parameters or {}, tags=tags) # type: ignore[return-value, misc]
86+
async with AsyncClientContext(httpx_settings={"verify": self._http_adapter.verify_tls()}):
87+
flow_run = await run_deployment(name=workflow.full_name, timeout=0, parameters=parameters or {}, tags=tags) # type: ignore[return-value, misc]
8388
return WorkflowInfo.from_flow(flow_run=flow_run)

backend/infrahub/workers/dependencies.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from infrahub_sdk.config import Config
77
from infrahub_sdk.types import HTTPMethod
88

9-
from infrahub import config, services
9+
from infrahub import config
1010
from infrahub.components import ComponentType
1111
from infrahub.constants.environment import INSTALLATION_TYPE
1212
from infrahub.database import InfrahubDatabase, get_db

backend/infrahub/workers/infrahub_async.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from infrahub_sdk.exceptions import Error as SdkError
99
from prefect import settings as prefect_settings
1010
from prefect.client.schemas.objects import FlowRun
11+
from prefect.context import AsyncClientContext
1112
from prefect.flow_engine import run_flow_async
1213
from prefect.logging.handlers import APILogHandler
1314
from prefect.workers.base import BaseJobConfiguration, BaseVariables, BaseWorker, BaseWorkerResult
@@ -27,6 +28,7 @@
2728
get_cache,
2829
get_component,
2930
get_database,
31+
get_http,
3032
get_message_bus,
3133
get_workflow,
3234
set_component_type,
@@ -154,7 +156,9 @@ async def run(
154156
if task_status:
155157
task_status.started(True)
156158

157-
await run_flow_async(flow=flow_func, flow_run=flow_run, parameters=params, return_type="state")
159+
async with AsyncClientContext(httpx_settings={"verify": get_http().verify_tls()}) as ctx:
160+
ctx._httpx_settings = None # Hack to make all child task/flow runs use the same client
161+
await run_flow_async(flow=flow_func, flow_run=flow_run, parameters=params, return_type="state")
158162

159163
return InfrahubWorkerAsyncResult(status_code=0, identifier=str(flow_run.id))
160164

backend/infrahub/workflows/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from infrahub.core.constants import GLOBAL_BRANCH_NAME
1010
from infrahub.core.registry import registry
1111
from infrahub.tasks.registry import refresh_branches
12+
from infrahub.workers.dependencies import get_http
1213

1314
from .constants import TAG_NAMESPACE, WorkflowTag
1415

@@ -26,7 +27,7 @@ async def add_tags(
2627
namespace: bool = True,
2728
db_change: bool = False,
2829
) -> None:
29-
client = get_client(sync_client=False)
30+
client = get_client(httpx_settings={"verify": get_http().verify_tls()}, sync_client=False)
3031
current_flow_run_id = flow_run.id
3132
current_tags: list[str] = flow_run.tags
3233
branch_tags = (

0 commit comments

Comments
 (0)