Merged
5 changes: 5 additions & 0 deletions backend/infrahub/services/adapters/http/__init__.py
@@ -3,10 +3,15 @@
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import ssl

    import httpx


class InfrahubHTTP:
    def verify_tls(self, verify: bool | None = None) -> bool | ssl.SSLContext:
        raise NotImplementedError()

    async def get(
        self,
        url: str,
10 changes: 9 additions & 1 deletion backend/infrahub/services/adapters/workflow/worker.py
@@ -3,8 +3,10 @@
from typing import TYPE_CHECKING, Any, overload

from prefect.client.schemas.objects import StateType
from prefect.context import AsyncClientContext
from prefect.deployments import run_deployment

from infrahub.services.adapters.http.httpx import HttpxAdapter
from infrahub.workers.utils import inject_context_parameter
from infrahub.workflows.initialization import setup_task_manager
from infrahub.workflows.models import WorkflowInfo
@@ -19,6 +21,11 @@


class WorkflowWorkerExecution(InfrahubWorkflow):
    # This is required to grab a cached SSLContext from the HttpAdapter.
    # We cannot use the get_http() dependency since it introduces a circular dependency.
    # We could remove this later on by introducing a cached SSLContext outside of this adapter.
    _http_adapter = HttpxAdapter()

    @staticmethod
    async def initialize(component_is_primary_server: bool) -> None:
        if component_is_primary_server:
@@ -79,5 +86,6 @@ async def submit_workflow(
        parameters = dict(parameters) if parameters is not None else {}
        inject_context_parameter(func=flow_func, parameters=parameters, context=context)

        flow_run = await run_deployment(name=workflow.full_name, timeout=0, parameters=parameters or {}, tags=tags) # type: ignore[return-value, misc]
        async with AsyncClientContext(httpx_settings={"verify": self._http_adapter.verify_tls()}):
            flow_run = await run_deployment(name=workflow.full_name, timeout=0, parameters=parameters or {}, tags=tags) # type: ignore[return-value, misc]
        return WorkflowInfo.from_flow(flow_run=flow_run)
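The class comment above relies on verify_tls() handing back a cached SSLContext so every Prefect client reuses the same TLS configuration. A rough sketch of how such caching could look follows; the tls_insecure/tls_ca_file fields and the functools.cached_property approach are assumptions for illustration, not the repository's actual HttpxAdapter:

import ssl
from functools import cached_property

from infrahub.services.adapters.http import InfrahubHTTP


class HttpxAdapter(InfrahubHTTP):
    # Hypothetical settings; the real adapter reads its TLS options from Infrahub's config.
    tls_insecure: bool = False
    tls_ca_file: str | None = None

    @cached_property
    def _ssl_context(self) -> ssl.SSLContext:
        # Building an SSLContext (loading CA bundles) is comparatively expensive,
        # so build it once per adapter instance and hand the same object to every client.
        return ssl.create_default_context(cafile=self.tls_ca_file)

    def verify_tls(self, verify: bool | None = None) -> bool | ssl.SSLContext:
        if verify is False or (verify is None and self.tls_insecure):
            return False  # disable certificate verification entirely
        return self._ssl_context  # reuse the cached context across clients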
6 changes: 5 additions & 1 deletion backend/infrahub/workers/infrahub_async.py
@@ -8,6 +8,7 @@
from infrahub_sdk.exceptions import Error as SdkError
from prefect import settings as prefect_settings
from prefect.client.schemas.objects import FlowRun
from prefect.context import AsyncClientContext
from prefect.flow_engine import run_flow_async
from prefect.logging.handlers import APILogHandler
from prefect.workers.base import BaseJobConfiguration, BaseVariables, BaseWorker, BaseWorkerResult
@@ -27,6 +28,7 @@
    get_cache,
    get_component,
    get_database,
    get_http,
    get_message_bus,
    get_workflow,
    set_component_type,
@@ -154,7 +156,9 @@ async def run(
        if task_status:
            task_status.started(True)

        await run_flow_async(flow=flow_func, flow_run=flow_run, parameters=params, return_type="state")
        async with AsyncClientContext(httpx_settings={"verify": get_http().verify_tls()}) as ctx:
            ctx._httpx_settings = None # Hack to make all child task/flow runs use the same client
            await run_flow_async(flow=flow_func, flow_run=flow_run, parameters=params, return_type="state")

Comment on lines +159 to 162
Contributor

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Avoid DI-dependent get_http() and add hardening around the private attribute

  • get_http() relies on FastAPI Depends; outside DI it may return a Depends object, causing AttributeError on .verify_tls(). Use a class-level HttpxAdapter to fetch the cached SSLContext.
  • Mutating ctx._httpx_settings touches a private attribute; guard with hasattr to avoid breakage across Prefect versions.

Apply within this block:

-        async with AsyncClientContext(httpx_settings={"verify": get_http().verify_tls()}) as ctx:
-            ctx._httpx_settings = None  # Hack to make all child task/flow runs use the same client
+        async with AsyncClientContext(httpx_settings={"verify": self._http_adapter.verify_tls()}) as ctx:
+            if hasattr(ctx, "_httpx_settings"):
+                ctx._httpx_settings = None  # Maintain shared client only if attribute exists
             await run_flow_async(flow=flow_func, flow_run=flow_run, parameters=params, return_type="state")

Add these supporting changes outside the snippet:

# imports
from infrahub.services.adapters.http.httpx import HttpxAdapter
# in class InfrahubWorkerAsync
_http_adapter = HttpxAdapter()  # reuse cached SSLContext across runs

If you prefer to keep get_http(), ensure it returns a concrete InfrahubHTTP in this runtime; otherwise this will fail.
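For illustration only (hypothetical names, not code from this repository), this is the failure mode when a Depends-based accessor is called outside FastAPI's dependency injection — the default is never resolved, so the Depends marker itself comes back:

from fastapi import Depends


class FakeHTTP:
    def verify_tls(self) -> bool:
        return True


def get_http(http: FakeHTTP = Depends(FakeHTTP)) -> FakeHTTP:
    # Inside a FastAPI route, `http` is resolved to a concrete FakeHTTP instance.
    return http


http = get_http()  # called directly: the Depends default is returned as-is
http.verify_tls()  # AttributeError: 'Depends' object has no attribute 'verify_tls'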


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Show get_http definition and where it's used without DI
rg -n -C2 'def get_http\(' backend/infrahub/workers/dependencies.py || true
rg -n -C3 '\bget_http\s*\(\s*\)' -g '!**/tests/**' backend | sed -n '1,200p'

Length of output: 3418


🏁 Script executed:

#!/bin/bash
set -euo pipefail
head -n 200 backend/infrahub/workers/infrahub_async.py
rg -n 'HttpxAdapter' -n backend/infrahub/workers/infrahub_async.py || true
rg -n '_http_adapter' -n backend/infrahub/workers/infrahub_async.py || true

Length of output: 8203


🏁 Script executed:

#!/bin/bash
set -euo pipefail
head -n 100 backend/infrahub/services/adapters/workflow/worker.py
rg -n "HttpxAdapter" -n backend/infrahub/services/adapters/workflow/worker.py

Length of output: 3706


Replace DI-dependent get_http() with class-level HttpxAdapter and guard private attribute

get_http() uses FastAPI Depends and will return a Depends object outside DI, breaking .verify_tls(). Use a dedicated HttpxAdapter instance and only mutate the private attribute if it exists:

--- a/backend/infrahub/workers/infrahub_async.py
+++ b/backend/infrahub/workers/infrahub_async.py
@@ async def run(self, flow_run: FlowRun, configuration: BaseJobConfiguration, task_status: TaskStatus | None = None,) -> BaseWorkerResult:
-        async with AsyncClientContext(httpx_settings={"verify": get_http().verify_tls()}) as ctx:
-            ctx._httpx_settings = None  # Hack to make all child task/flow runs use the same client
+        async with AsyncClientContext(httpx_settings={"verify": self._http_adapter.verify_tls()}) as ctx:
+            if hasattr(ctx, "_httpx_settings"):
+                ctx._httpx_settings = None  # Maintain shared client only if supported
             await run_flow_async(flow=flow_func, flow_run=flow_run, parameters=params, return_type="state")

Add at the top of the file:

from infrahub.services.adapters.http.httpx import HttpxAdapter

And inside class InfrahubWorkerAsync:

_http_adapter = HttpxAdapter()
🤖 Prompt for AI Agents
In backend/infrahub/workers/infrahub_async.py around lines 159-162, replace the
DI-dependent get_http() call (which returns a Depends outside DI) with a
class-level HttpxAdapter instance and guard mutation of the private attribute:
add "from infrahub.services.adapters.http.httpx import HttpxAdapter" at the top
of the file, declare a class attribute _http_adapter = HttpxAdapter() inside
InfrahubWorkerAsync, use _http_adapter.verify_tls() when creating
AsyncClientContext, and only set ctx._httpx_settings = None if hasattr(ctx,
"_httpx_settings") to avoid unguarded mutation.

        return InfrahubWorkerAsyncResult(status_code=0, identifier=str(flow_run.id))

3 changes: 2 additions & 1 deletion backend/infrahub/workflows/utils.py
@@ -9,6 +9,7 @@
from infrahub.core.constants import GLOBAL_BRANCH_NAME
from infrahub.core.registry import registry
from infrahub.tasks.registry import refresh_branches
from infrahub.workers.dependencies import get_http
Contributor

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Avoid using FastAPI Depends-based get_http() here; also use an async context for get_client

  • get_http() returns a FastAPI Depends wrapper outside DI, so get_http().verify_tls() may raise at runtime. Use a module-level HttpxAdapter instead to obtain a cached SSLContext.
  • Wrap get_client in an async context to avoid leaking HTTP clients.

As per coding guidelines

Apply this diff here:

- from infrahub.workers.dependencies import get_http
+ from infrahub.services.adapters.http.httpx import HttpxAdapter
-    client = get_client(httpx_settings={"verify": get_http().verify_tls()}, sync_client=False)
+    async with get_client(
+        httpx_settings={"verify": _HTTP_ADAPTER.verify_tls()}, sync_client=False
+    ) as client:

And add this at module scope:

# at top-level
_HTTP_ADAPTER = HttpxAdapter()  # caches SSLContext internally

And adjust the call site:

-    await client.update_flow_run(current_flow_run_id, tags=list(new_tags))
+        await client.update_flow_run(current_flow_run_id, tags=list(new_tags))

Verification script to confirm DI usage of get_http:

Also applies to: 30-30


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "Definition of get_http:"
rg -n -C2 'def get_http\(' backend/infrahub/workers/dependencies.py || true
echo
echo "Imports of Depends in dependencies module:"
rg -n -C2 '\bDepends\(' backend/infrahub/workers/dependencies.py || true
echo
echo "Call sites of get_http() outside FastAPI routes:"
rg -n -C3 '\bget_http\s*\(\s*\)' -g '!**/tests/**' backend | sed -n '1,200p'

Length of output: 4978


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "Inspect HttpxAdapter definition:"
rg -n "class HttpxAdapter" -C3 backend/infrahub/services/adapters/http/httpx.py || true
echo
echo "Inspect verify_tls in HttpxAdapter:"
rg -n "def verify_tls" -C3 backend/infrahub/services/adapters/http/httpx.py || true
echo
echo "Context around imports and usage in utils.py:"
rg -n -C5 "get_http" -C5 backend/infrahub/workflows/utils.py || true

Length of output: 1387


Replace get_http with a module-level HttpxAdapter and wrap the client in an async context

  • At top of backend/infrahub/workflows/utils.py, remove:
    from infrahub.workers.dependencies import get_http
    and add:
    from infrahub.services.adapters.http.httpx import HttpxAdapter
    
    _HTTP_ADAPTER = HttpxAdapter()  # caches SSLContext internally
  • Change the client instantiation:
    - client = get_client(httpx_settings={"verify": get_http().verify_tls()}, sync_client=False)
    + async with get_client(
    +     httpx_settings={"verify": _HTTP_ADAPTER.verify_tls()}, sync_client=False
    + ) as client:
🤖 Prompt for AI Agents
In backend/infrahub/workflows/utils.py around line 12, remove the import of
get_http and instead import HttpxAdapter from
infrahub.services.adapters.http.httpx and create a module-level adapter instance
named _HTTP_ADAPTER = HttpxAdapter() (it caches SSLContext internally); then
update any client instantiation to obtain the client from the adapter and use it
within an async context manager (e.g., use the adapter's async client
acquisition so you do "async with _HTTP_ADAPTER.client() as client" where the
client is needed) to ensure proper async lifecycle and reuse of the adapter.


from .constants import TAG_NAMESPACE, WorkflowTag

@@ -26,7 +27,7 @@ async def add_tags(
    namespace: bool = True,
    db_change: bool = False,
) -> None:
    client = get_client(sync_client=False)
    client = get_client(httpx_settings={"verify": get_http().verify_tls()}, sync_client=False)
Contributor

It's a bit confusing in the codebase that we use get_client() where we sometimes want an InfrahubClient and in other locations we want a Prefect client. Did you just update this call as a hotspot where you needed this the most? I'm wondering if we should have a helper function to get the Prefect client so we can update these settings in one location. I think we're good to go for this PR, but we should consider the approach for all the other calls.

Contributor Author

Did you just update this call as a hotspot where you needed this the most?

Exactly, add_tags() is called within almost all workflows.
I agree it's confusing to have get_client for both InfrahubClient and PrefectClient...
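For illustration, one shape such a shared helper could take (the name get_prefect_client and where it lives are assumptions sketched here, not something this PR introduces):

from prefect.client.orchestration import PrefectClient, get_client

from infrahub.services.adapters.http import InfrahubHTTP


def get_prefect_client(http: InfrahubHTTP) -> PrefectClient:
    # Single place to wire TLS (and any future httpx settings) into the Prefect client.
    return get_client(httpx_settings={"verify": http.verify_tls()}, sync_client=False)


# Usage at a call site such as add_tags(), where `http` stands in for however the
# caller obtains its InfrahubHTTP adapter:
#     async with get_prefect_client(http=http) as client:
#         await client.update_flow_run(current_flow_run_id, tags=list(new_tags))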

    current_flow_run_id = flow_run.id
    current_tags: list[str] = flow_run.tags
    branch_tags = (
12 changes: 11 additions & 1 deletion backend/tests/adapters/http.py
@@ -1,15 +1,25 @@
from typing import Any
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import httpx

from infrahub.services.adapters.http import InfrahubHTTP

if TYPE_CHECKING:
    import ssl

    import httpx


class MemoryHTTP(InfrahubHTTP):
    def __init__(self) -> None:
        self._get_response: dict[str, httpx.Response] = {}
        self._post_response: dict[str, httpx.Response] = {}

    def verify_tls(self, verify: bool | None = None) -> bool | ssl.SSLContext:
        return False

    async def get(
        self,
        url: str,