|
3 | 3 | from typing import TYPE_CHECKING, Any, overload |
4 | 4 |
|
5 | 5 | from prefect.client.schemas.objects import StateType |
| 6 | +from prefect.context import AsyncClientContext |
6 | 7 | from prefect.deployments import run_deployment |
7 | 8 |
|
| 9 | +from infrahub.services.adapters.http.httpx import HttpxAdapter |
8 | 10 | from infrahub.workers.utils import inject_context_parameter |
9 | 11 | from infrahub.workflows.initialization import setup_task_manager |
10 | 12 | from infrahub.workflows.models import WorkflowInfo |
|
19 | 21 |
|
20 | 22 |
|
21 | 23 | class WorkflowWorkerExecution(InfrahubWorkflow): |
| 24 | + # This is required to grab a cached SSLContext from the HttpAdapter. |
| 25 | + # We cannot use the get_http() dependency since it introduces a circular dependency. |
| 26 | + # We could remove this later on by introducing a cached SSLContext outside of this adapter. |
| 27 | + _http_adapter = HttpxAdapter() |
| 28 | + |
22 | 29 | @staticmethod |
23 | 30 | async def initialize(component_is_primary_server: bool) -> None: |
24 | 31 | if component_is_primary_server: |
@@ -79,5 +86,6 @@ async def submit_workflow( |
79 | 86 | parameters = dict(parameters) if parameters is not None else {} |
80 | 87 | inject_context_parameter(func=flow_func, parameters=parameters, context=context) |
81 | 88 |
|
82 | | - flow_run = await run_deployment(name=workflow.full_name, timeout=0, parameters=parameters or {}, tags=tags) # type: ignore[return-value, misc] |
| 89 | + async with AsyncClientContext(httpx_settings={"verify": self._http_adapter.verify_tls()}): |
| 90 | + flow_run = await run_deployment(name=workflow.full_name, timeout=0, parameters=parameters or {}, tags=tags) # type: ignore[return-value, misc] |
83 | 91 | return WorkflowInfo.from_flow(flow_run=flow_run) |
0 commit comments