Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion app/api/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from fastapi import APIRouter

from app.api.naming import path_prefix, tag
from app.api.routes import health, image_queries, ping
from app.api.routes import edge_config, edge_detector_readiness, health, image_queries, ping

IMAGE_QUERIES = "image-queries"
IMAGE_QUERIES_PREFIX = path_prefix(IMAGE_QUERIES)
Expand All @@ -23,3 +23,19 @@

health_router = APIRouter()
health_router.include_router(health.router, prefix=HEALTH_PREFIX, tags=[HEALTH_TAG])

# Router for reading/replacing the active edge endpoint configuration
# (handlers live in app/api/routes/edge_config.py).
EDGE_CONFIG = "edge-config"
EDGE_CONFIG_PREFIX = path_prefix(EDGE_CONFIG)
EDGE_CONFIG_TAG = tag(EDGE_CONFIG)

edge_config_router = APIRouter()
edge_config_router.include_router(edge_config.router, prefix=EDGE_CONFIG_PREFIX, tags=[EDGE_CONFIG_TAG])

# Router reporting per-detector inference-pod readiness
# (handlers live in app/api/routes/edge_detector_readiness.py).
EDGE_DETECTOR_READINESS = "edge-detector-readiness"
EDGE_DETECTOR_READINESS_PREFIX = path_prefix(EDGE_DETECTOR_READINESS)
EDGE_DETECTOR_READINESS_TAG = tag(EDGE_DETECTOR_READINESS)

edge_detector_readiness_router = APIRouter()
edge_detector_readiness_router.include_router(
    edge_detector_readiness.router, prefix=EDGE_DETECTOR_READINESS_PREFIX, tags=[EDGE_DETECTOR_READINESS_TAG]
)
28 changes: 28 additions & 0 deletions app/api/routes/edge_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging

from fastapi import APIRouter, Body, Depends
from groundlight.edge import EdgeEndpointConfig

from app.core.app_state import AppState, get_app_state
from app.core.edge_config_loader import EdgeConfigManager, reconcile_config

logger = logging.getLogger(__name__)

router = APIRouter()


@router.get("")
async def get_edge_config():
    """Return the currently active edge endpoint configuration, serialized as a payload."""
    active_config = EdgeConfigManager.active()
    return active_config.to_payload()


@router.put("")
async def set_edge_config(
    body: dict = Body(...),
    app_state: AppState = Depends(get_app_state),
):
    """Replace the active edge endpoint configuration.

    Parses the request body into an EdgeEndpointConfig, reconciles detector
    state against the database, and echoes the applied configuration back.
    """
    proposed_config = EdgeEndpointConfig.from_payload(body)
    reconcile_config(proposed_config, app_state.db_manager)
    return proposed_config.to_payload()
30 changes: 30 additions & 0 deletions app/api/routes/edge_detector_readiness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import logging

from fastapi import APIRouter

from app.core.edge_config_loader import EdgeConfigManager
from app.core.edge_inference import is_edge_inference_ready
from app.core.naming import get_edge_inference_service_name

logger = logging.getLogger(__name__)

router = APIRouter()


@router.get("")
async def get_edge_detector_readiness():
    """Report readiness for every detector in the active config.

    A detector is "ready" only when both its primary inference pod and its
    OODD pod respond to health checks. Returns {detector_id: {"ready": bool}}.
    """
    active_config = EdgeConfigManager.active()
    readiness = {}
    for detector in active_config.detectors:
        det_id = detector.detector_id
        if not det_id:
            # Skip placeholder entries with an empty detector ID.
            continue
        primary_ready = is_edge_inference_ready(get_edge_inference_service_name(det_id) + ":8000")
        oodd_ready = is_edge_inference_ready(get_edge_inference_service_name(det_id, is_oodd=True) + ":8000")
        readiness[det_id] = {"ready": primary_ready and oodd_ready}
    return readiness
10 changes: 6 additions & 4 deletions app/api/routes/image_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
get_groundlight_sdk_instance,
refresh_detector_metadata_if_needed,
)
from app.core.edge_inference import get_edge_inference_model_name
from app.core.edge_config_loader import EdgeConfigManager
from app.core.naming import get_edge_inference_model_name
from app.core.utils import create_iq, generate_iq_id, generate_metadata_dict, generate_request_id
from app.escalation_queue.models import SubmitImageQueryParams
from app.escalation_queue.queue_utils import safe_escalate_with_queue_write, write_escalation_to_queue
Expand Down Expand Up @@ -106,7 +107,8 @@ async def post_image_query( # noqa: PLR0913, PLR0915, PLR0912
request_id = request.headers.get("x-request-id") or generate_request_id()

require_human_review = human_review == "ALWAYS"
detector_inference_config = app_state.edge_inference_manager.detector_inference_configs.get(detector_id)
edge_config = EdgeConfigManager.active()
detector_inference_config = EdgeConfigManager.detector_config(edge_config, detector_id)
return_edge_prediction = (
detector_inference_config.always_return_edge_prediction if detector_inference_config is not None else False
)
Expand Down Expand Up @@ -201,7 +203,7 @@ async def post_image_query( # noqa: PLR0913, PLR0915, PLR0912
return image_query

if is_confident_enough: # Audit confident edge predictions at the specified rate
if random.random() < app_state.edge_config.global_config.confident_audit_rate:
if random.random() < edge_config.global_config.confident_audit_rate:
logger.debug(
f"Auditing confident edge prediction with confidence {ml_confidence} for detector {detector_id=}."
)
Expand Down Expand Up @@ -232,7 +234,7 @@ async def post_image_query( # noqa: PLR0913, PLR0915, PLR0912
# Escalate after returning edge prediction if escalation is enabled and we have low confidence.
if not is_confident_enough:
# Only escalate if we haven't escalated on this detector too recently.
if app_state.edge_inference_manager.escalation_cooldown_complete(detector_id=detector_id):
if app_state.edge_inference_manager.escalation_cooldown_complete(detector_id, edge_config):
logger.debug(
f"Escalating to cloud due to low confidence: {ml_confidence} < thresh={confidence_threshold}"
)
Expand Down
7 changes: 1 addition & 6 deletions app/core/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from app.escalation_queue.queue_writer import QueueWriter

from .database import DatabaseManager
from .edge_config_loader import get_detector_inference_configs, load_edge_config
from .edge_inference import EdgeInferenceManager
from .utils import TimestampedCache, safe_call_sdk

Expand Down Expand Up @@ -96,15 +95,11 @@ def get_detector_metadata(detector_id: str, gl: Groundlight) -> Detector:

class AppState:
def __init__(self):
self.edge_config = load_edge_config()
# We only launch a separate OODD inference pod if we are not using the minimal image.
# Pipelines used in the minimal image include OODD inference and confidence adjustment,
# so they do not need to be adjusted separately.
self.separate_oodd_inference = not USE_MINIMAL_IMAGE
detector_inference_configs = get_detector_inference_configs(root_edge_config=self.edge_config)
self.edge_inference_manager = EdgeInferenceManager(
detector_inference_configs=detector_inference_configs, separate_oodd_inference=self.separate_oodd_inference
)
self.edge_inference_manager = EdgeInferenceManager(separate_oodd_inference=self.separate_oodd_inference)
self.db_manager = DatabaseManager()
self.is_ready = False
self.queue_writer = QueueWriter()
Expand Down
40 changes: 30 additions & 10 deletions app/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,13 @@ def create_or_update_inference_deployment_record(self, deployment: Dict[str, str

def _handle_existing_detector(self, deployment: Dict[str, str]) -> None:
    """
    Handle a deployment whose model_name already has a record in the database.

    Unconditionally applies all incoming fields so that flags such as
    pending_deletion and deployment_created are reset when a detector is re-added.

    :param deployment: A dictionary containing the deployment details.
    """
    model_name = deployment["model_name"]
    logger.debug(f"Model name {model_name} already exists in the database.")
    self.update_inference_deployment_record(model_name=model_name, fields_to_update=deployment)

def update_inference_deployment_record(self, model_name: str, fields_to_update: Dict[str, Any]):
"""
Expand Down Expand Up @@ -118,6 +112,32 @@ def get_inference_deployment_records(self, **kwargs) -> Sequence[InferenceDeploy
query_results = session.execute(query)
return query_results.scalars().all()

def mark_detector_pending_deletion(self, detector_id: str) -> None:
    """Flag every DB record belonging to detector_id as pending deletion.

    Logs an error (without raising) when the detector has no records at all.
    """
    with self.session_maker() as session:
        records = session.execute(select(InferenceDeployment).filter_by(detector_id=detector_id)).scalars().all()
        if not records:
            logger.error(f"No DB records found for detector {detector_id} when marking for deletion.")
            return
        for record in records:
            record.pending_deletion = True
        session.commit()

def get_pending_deletions(self) -> list[str]:
    """Return the distinct detector_ids currently flagged as pending deletion."""
    pending_query = select(InferenceDeployment.detector_id).filter_by(pending_deletion=True).distinct()
    with self.session_maker() as session:
        rows = session.execute(pending_query).scalars().all()
    return list(rows)

def delete_inference_deployment_records(self, detector_id: str) -> None:
    """Permanently delete every DB record belonging to detector_id."""
    with self.session_maker() as session:
        matching = session.execute(select(InferenceDeployment).filter_by(detector_id=detector_id)).scalars().all()
        for record in matching:
            session.delete(record)
        session.commit()

def create_tables(self) -> None:
"""Create the database tables, if they don't already exist."""
with self._engine.begin() as connection:
Expand Down
146 changes: 109 additions & 37 deletions app/core/edge_config_loader.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,128 @@
import logging
import os
from typing import Dict

import yaml
from groundlight.edge import EdgeEndpointConfig, InferenceConfig

from .file_paths import DEFAULT_EDGE_CONFIG_PATH
from .database import DatabaseManager
from .file_paths import ACTIVE_EDGE_CONFIG_PATH, HELM_EDGE_CONFIG_PATH
from .naming import get_edge_inference_model_name

logger = logging.getLogger(__name__)

GROUNDLIGHT_API_TOKEN = os.environ.get("GROUNDLIGHT_API_TOKEN", "")

def load_edge_config() -> EdgeEndpointConfig:
"""
Reads the edge config from the EDGE_CONFIG environment variable if it exists.
If EDGE_CONFIG is not set, reads the default edge config file.
"""
yaml_config = os.environ.get("EDGE_CONFIG", "").strip()
if yaml_config:
return EdgeEndpointConfig.from_yaml(yaml_str=yaml_config)

logger.info(
f"EDGE_CONFIG environment variable not set. Checking default Edge Config path: {DEFAULT_EDGE_CONFIG_PATH}."
)
return EdgeEndpointConfig.from_yaml(filename=DEFAULT_EDGE_CONFIG_PATH)
class EdgeConfigManager:
    """Manages the lifecycle of the edge endpoint configuration: startup loading, saving, and
    mtime-cached reading of the active config file on PVC."""

    # In-memory cache of the last successfully loaded config; served by `active()`
    # whenever the on-disk file is unchanged or missing.
    _cached_config: EdgeEndpointConfig = EdgeEndpointConfig()
    # mtime of the active config file at last read; 0.0 forces a read on first access.
    _cached_mtime: float = 0.0

def get_detector_inference_configs(
root_edge_config: EdgeEndpointConfig,
) -> dict[str, InferenceConfig] | None:
"""
Produces a dict mapping detector IDs to their associated `InferenceConfig`.
Returns None if there are no detectors in the config file.
"""
# Mapping of config names to InferenceConfig objects
edge_inference_configs: dict[str, InferenceConfig] = root_edge_config.edge_inference_configs
@classmethod
def load_startup_config(cls) -> EdgeEndpointConfig:
    """Load the edge config at startup.

    Sources checked in order:
    1. EDGE_CONFIG env var (Docker tests, non-Helm setups)
    2. Helm-mounted ConfigMap (always wins when present)
    3. Active config on PVC (a previous set_edge_config survives restarts)
    4. Pydantic defaults
    """
    env_yaml = os.environ.get("EDGE_CONFIG", "").strip()
    if env_yaml:
        return EdgeEndpointConfig.from_yaml(yaml_str=env_yaml)
    # First existing file wins: Helm ConfigMap takes precedence over the PVC copy.
    for candidate in (HELM_EDGE_CONFIG_PATH, ACTIVE_EDGE_CONFIG_PATH):
        if os.path.exists(candidate):
            return EdgeEndpointConfig.from_yaml(filename=candidate)
    return EdgeEndpointConfig()

@classmethod
def save(cls, config: EdgeEndpointConfig) -> None:
    """Atomically write the active config to disk.

    `active()` re-reads this file whenever its mtime changes, so the write must be
    atomic: dump to a sibling temp file first, then `os.replace` it into place so a
    concurrent reader never sees a partially written YAML document.
    """
    os.makedirs(os.path.dirname(ACTIVE_EDGE_CONFIG_PATH), exist_ok=True)
    tmp_path = ACTIVE_EDGE_CONFIG_PATH + ".tmp"
    with open(tmp_path, "w") as f:
        yaml.dump(config.to_payload(), f, default_flow_style=False)
    # Atomic on POSIX: readers see either the old file or the complete new one.
    os.replace(tmp_path, ACTIVE_EDGE_CONFIG_PATH)

@classmethod
def active(cls) -> EdgeEndpointConfig:
    """Return the current active config, re-reading from disk only when the file changes.

    If the file is missing, logs an error and serves the last cached config.
    The cache (`_cached_config`/`_cached_mtime`) is updated only after a successful
    parse: the original advanced `_cached_mtime` before parsing, so a parse failure
    (e.g. a half-written file) left later calls silently returning the stale config
    with no retry until the file's mtime changed again.
    """
    try:
        current_mtime = os.path.getmtime(ACTIVE_EDGE_CONFIG_PATH)
    except FileNotFoundError:
        logger.error("Active config file not found at %s", ACTIVE_EDGE_CONFIG_PATH, exc_info=True)
        return cls._cached_config
    if current_mtime != cls._cached_mtime:
        # Parse first; only commit both cache fields once the read succeeds.
        fresh_config = EdgeEndpointConfig.from_yaml(filename=ACTIVE_EDGE_CONFIG_PATH)
        cls._cached_config = fresh_config
        cls._cached_mtime = current_mtime
    return cls._cached_config

@staticmethod
def detector_configs(config: EdgeEndpointConfig) -> dict[str, InferenceConfig]:
    """Map each configured detector ID to its InferenceConfig, skipping empty IDs."""
    return {
        detector.detector_id: config.edge_inference_configs[detector.edge_inference_config]
        for detector in config.detectors
        if detector.detector_id
    }

# Filter out detectors whose IDs are empty strings.
detectors = [detector for detector in root_edge_config.detectors if detector.detector_id != ""]
@staticmethod
def detector_config(config: EdgeEndpointConfig, detector_id: str) -> InferenceConfig | None:
    """Return the InferenceConfig for one detector, or None if it is not configured."""
    matching = next((d for d in config.detectors if d.detector_id == detector_id), None)
    if matching is None:
        return None
    return config.edge_inference_configs[matching.edge_inference_config]

detector_to_inference_config: dict[str, InferenceConfig] | None = None
if detectors:
detector_to_inference_config = {
detector.detector_id: edge_inference_configs[detector.edge_inference_config] for detector in detectors
}

return detector_to_inference_config
def get_active_detector_ids(db_manager: DatabaseManager) -> set[str]:
    """Return detector IDs with at least one DB record not flagged pending_deletion."""
    active_ids: set[str] = set()
    for record in db_manager.get_inference_deployment_records():
        if not record.pending_deletion:
            active_ids.add(record.detector_id)
    return active_ids


def get_detector_edge_configs_by_id() -> Dict[str, InferenceConfig]:
def apply_detector_changes(removed: set[str], added: set[str], db_manager: DatabaseManager) -> None:
    """Apply a detector diff to the database.

    Removed detectors are flagged pending_deletion; added detectors get fresh
    deployment records (one primary, one OODD) keyed by model name.
    """
    for detector_id in removed:
        logger.info(f"Marking detector {detector_id} for deletion")
        db_manager.mark_detector_pending_deletion(detector_id)

    for detector_id in added:
        logger.info(f"Creating deployment record for new detector {detector_id}")
        # Each detector needs both a primary and an OODD deployment record.
        for is_oodd in (False, True):
            record = {
                "model_name": get_edge_inference_model_name(detector_id, is_oodd=is_oodd),
                "detector_id": detector_id,
                "api_token": GROUNDLIGHT_API_TOKEN,
                "deployment_created": False,
                "pending_deletion": False,
            }
            db_manager.create_or_update_inference_deployment_record(deployment=record)


def compute_detector_diff(current_detector_ids: set[str], new_config: EdgeEndpointConfig) -> tuple[set[str], set[str]]:
    """Diff the currently-deployed detector IDs against those requested by a new config.

    Empty detector IDs in the config are ignored.
    Returns (removed_ids, added_ids).
    """
    requested = {detector.detector_id for detector in new_config.detectors if detector.detector_id}
    removed = current_detector_ids - requested
    added = requested - current_detector_ids
    return removed, added


def reconcile_config(new_config: EdgeEndpointConfig, db_manager: DatabaseManager) -> None:
    """Make the provided config the active one.

    Diffs it against current DB state, applies detector additions/removals,
    and writes the new config to disk.
    """
    active_ids = get_active_detector_ids(db_manager)
    removed, added = compute_detector_diff(active_ids, new_config)

    apply_detector_changes(removed, added, db_manager)
    EdgeConfigManager.save(new_config)
    logger.info(
        f"Config reconciled: {len(removed)} detector(s) removed, {len(added)} detector(s) added. "
        f"Removed detectors: {removed} | Added detectors: {added}"
    )
Loading
Loading