diff --git a/app/api/api.py b/app/api/api.py index 86c372555..86bf114dd 100644 --- a/app/api/api.py +++ b/app/api/api.py @@ -1,7 +1,7 @@ from fastapi import APIRouter from app.api.naming import path_prefix, tag -from app.api.routes import health, image_queries, ping +from app.api.routes import edge_config, edge_detector_readiness, health, image_queries, ping IMAGE_QUERIES = "image-queries" IMAGE_QUERIES_PREFIX = path_prefix(IMAGE_QUERIES) @@ -23,3 +23,19 @@ health_router = APIRouter() health_router.include_router(health.router, prefix=HEALTH_PREFIX, tags=[HEALTH_TAG]) + +EDGE_CONFIG = "edge-config" +EDGE_CONFIG_PREFIX = path_prefix(EDGE_CONFIG) +EDGE_CONFIG_TAG = tag(EDGE_CONFIG) + +edge_config_router = APIRouter() +edge_config_router.include_router(edge_config.router, prefix=EDGE_CONFIG_PREFIX, tags=[EDGE_CONFIG_TAG]) + +EDGE_DETECTOR_READINESS = "edge-detector-readiness" +EDGE_DETECTOR_READINESS_PREFIX = path_prefix(EDGE_DETECTOR_READINESS) +EDGE_DETECTOR_READINESS_TAG = tag(EDGE_DETECTOR_READINESS) + +edge_detector_readiness_router = APIRouter() +edge_detector_readiness_router.include_router( + edge_detector_readiness.router, prefix=EDGE_DETECTOR_READINESS_PREFIX, tags=[EDGE_DETECTOR_READINESS_TAG] +) diff --git a/app/api/routes/edge_config.py b/app/api/routes/edge_config.py new file mode 100644 index 000000000..9afca86fc --- /dev/null +++ b/app/api/routes/edge_config.py @@ -0,0 +1,28 @@ +import logging + +from fastapi import APIRouter, Body, Depends +from groundlight.edge import EdgeEndpointConfig + +from app.core.app_state import AppState, get_app_state +from app.core.edge_config_loader import EdgeConfigManager, reconcile_config + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.get("") +async def get_edge_config(): + """Returns the currently active edge endpoint configuration.""" + return EdgeConfigManager.active().to_payload() + + +@router.put("") +async def set_edge_config( + body: dict = Body(...), + app_state: AppState = 
Depends(get_app_state), +): + """Replaces the active edge endpoint configuration with the provided configuration.""" + new_config = EdgeEndpointConfig.from_payload(body) + reconcile_config(new_config, app_state.db_manager) + return new_config.to_payload() diff --git a/app/api/routes/edge_detector_readiness.py b/app/api/routes/edge_detector_readiness.py new file mode 100644 index 000000000..3cb45880d --- /dev/null +++ b/app/api/routes/edge_detector_readiness.py @@ -0,0 +1,30 @@ +import logging + +from fastapi import APIRouter + +from app.core.edge_config_loader import EdgeConfigManager +from app.core.edge_inference import is_edge_inference_ready +from app.core.naming import get_edge_inference_service_name + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.get("") +async def get_edge_detector_readiness(): + """Return readiness status for each configured detector. + + Checks whether the inference pod (and OODD pod) for each detector + is responding to health checks. 
+ """ + config = EdgeConfigManager.active() + detector_ids = [d.detector_id for d in config.detectors if d.detector_id] + + result = {} + for detector_id in detector_ids: + primary_ready = is_edge_inference_ready(get_edge_inference_service_name(detector_id) + ":8000") + oodd_ready = is_edge_inference_ready(get_edge_inference_service_name(detector_id, is_oodd=True) + ":8000") + result[detector_id] = {"ready": primary_ready and oodd_ready} + + return result diff --git a/app/api/routes/image_queries.py b/app/api/routes/image_queries.py index af636d2b5..5302f30fd 100644 --- a/app/api/routes/image_queries.py +++ b/app/api/routes/image_queries.py @@ -13,7 +13,8 @@ get_groundlight_sdk_instance, refresh_detector_metadata_if_needed, ) -from app.core.edge_inference import get_edge_inference_model_name +from app.core.edge_config_loader import EdgeConfigManager +from app.core.naming import get_edge_inference_model_name from app.core.utils import create_iq, generate_iq_id, generate_metadata_dict, generate_request_id from app.escalation_queue.models import SubmitImageQueryParams from app.escalation_queue.queue_utils import safe_escalate_with_queue_write, write_escalation_to_queue @@ -106,7 +107,8 @@ async def post_image_query( # noqa: PLR0913, PLR0915, PLR0912 request_id = request.headers.get("x-request-id") or generate_request_id() require_human_review = human_review == "ALWAYS" - detector_inference_config = app_state.edge_inference_manager.detector_inference_configs.get(detector_id) + edge_config = EdgeConfigManager.active() + detector_inference_config = EdgeConfigManager.detector_config(edge_config, detector_id) return_edge_prediction = ( detector_inference_config.always_return_edge_prediction if detector_inference_config is not None else False ) @@ -201,7 +203,7 @@ async def post_image_query( # noqa: PLR0913, PLR0915, PLR0912 return image_query if is_confident_enough: # Audit confident edge predictions at the specified rate - if random.random() < 
app_state.edge_config.global_config.confident_audit_rate: + if random.random() < edge_config.global_config.confident_audit_rate: logger.debug( f"Auditing confident edge prediction with confidence {ml_confidence} for detector {detector_id=}." ) @@ -232,7 +234,7 @@ async def post_image_query( # noqa: PLR0913, PLR0915, PLR0912 # Escalate after returning edge prediction if escalation is enabled and we have low confidence. if not is_confident_enough: # Only escalate if we haven't escalated on this detector too recently. - if app_state.edge_inference_manager.escalation_cooldown_complete(detector_id=detector_id): + if app_state.edge_inference_manager.escalation_cooldown_complete(detector_id, edge_config): logger.debug( f"Escalating to cloud due to low confidence: {ml_confidence} < thresh={confidence_threshold}" ) diff --git a/app/core/app_state.py b/app/core/app_state.py index 90292f4c8..1c4b9efd5 100644 --- a/app/core/app_state.py +++ b/app/core/app_state.py @@ -12,7 +12,6 @@ from app.escalation_queue.queue_writer import QueueWriter from .database import DatabaseManager -from .edge_config_loader import get_detector_inference_configs, load_edge_config from .edge_inference import EdgeInferenceManager from .utils import TimestampedCache, safe_call_sdk @@ -96,15 +95,11 @@ def get_detector_metadata(detector_id: str, gl: Groundlight) -> Detector: class AppState: def __init__(self): - self.edge_config = load_edge_config() # We only launch a separate OODD inference pod if we are not using the minimal image. # Pipelines used in the minimal image include OODD inference and confidence adjustment, # so they do not need to be adjusted separately. 
self.separate_oodd_inference = not USE_MINIMAL_IMAGE - detector_inference_configs = get_detector_inference_configs(root_edge_config=self.edge_config) - self.edge_inference_manager = EdgeInferenceManager( - detector_inference_configs=detector_inference_configs, separate_oodd_inference=self.separate_oodd_inference - ) + self.edge_inference_manager = EdgeInferenceManager(separate_oodd_inference=self.separate_oodd_inference) self.db_manager = DatabaseManager() self.is_ready = False self.queue_writer = QueueWriter() diff --git a/app/core/database.py b/app/core/database.py index f3b60dbbe..30d0cefbd 100644 --- a/app/core/database.py +++ b/app/core/database.py @@ -75,19 +75,13 @@ def create_or_update_inference_deployment_record(self, deployment: Dict[str, str def _handle_existing_detector(self, deployment: Dict[str, str]) -> None: """ - Handles the case where a detector with the same ID already exists in the database. - If the API token has changed, it updates the record with the new API token. + Handles the case where a detector with the same model_name already exists in the database. + Always applies the incoming fields so that flags like pending_deletion and deployment_created + are reset when a detector is re-added. :param deployment: A dictionary containing the deployment details. 
""" logger.debug(f"Model name {deployment['model_name']} already exists in the database.") - detectors = self.get_inference_deployment_records(model_name=deployment["model_name"]) - if len(detectors) != 1: - raise AssertionError("Expected exactly one detector to be returned.") - - existing_api_token = detectors[0].api_token - if existing_api_token != deployment["api_token"]: # type: ignore - logger.info(f"Updating API token for model name {deployment['model_name']}.") - self.update_inference_deployment_record(model_name=deployment["model_name"], fields_to_update=deployment) + self.update_inference_deployment_record(model_name=deployment["model_name"], fields_to_update=deployment) def update_inference_deployment_record(self, model_name: str, fields_to_update: Dict[str, Any]): """ @@ -118,6 +112,32 @@ def get_inference_deployment_records(self, **kwargs) -> Sequence[InferenceDeploy query_results = session.execute(query) return query_results.scalars().all() + def mark_detector_pending_deletion(self, detector_id: str) -> None: + """Mark all records for a detector as pending deletion.""" + with self.session_maker() as session: + query = select(InferenceDeployment).filter_by(detector_id=detector_id) + existing = session.execute(query).scalars().all() + if existing: + for record in existing: + record.pending_deletion = True + session.commit() + else: + logger.error(f"No DB records found for detector {detector_id} when marking for deletion.") + + def get_pending_deletions(self) -> list[str]: + """Return distinct detector_ids that are pending deletion.""" + with self.session_maker() as session: + query = select(InferenceDeployment.detector_id).filter_by(pending_deletion=True).distinct() + return list(session.execute(query).scalars().all()) + + def delete_inference_deployment_records(self, detector_id: str) -> None: + """Delete all records for a given detector_id.""" + with self.session_maker() as session: + query = 
select(InferenceDeployment).filter_by(detector_id=detector_id) + for record in session.execute(query).scalars().all(): + session.delete(record) + session.commit() + def create_tables(self) -> None: """Create the database tables, if they don't already exist.""" with self._engine.begin() as connection: diff --git a/app/core/edge_config_loader.py b/app/core/edge_config_loader.py index 5f54e7717..ccf5e7d3d 100644 --- a/app/core/edge_config_loader.py +++ b/app/core/edge_config_loader.py @@ -1,56 +1,128 @@ import logging import os -from typing import Dict +import yaml from groundlight.edge import EdgeEndpointConfig, InferenceConfig -from .file_paths import DEFAULT_EDGE_CONFIG_PATH +from .database import DatabaseManager +from .file_paths import ACTIVE_EDGE_CONFIG_PATH, HELM_EDGE_CONFIG_PATH +from .naming import get_edge_inference_model_name logger = logging.getLogger(__name__) +GROUNDLIGHT_API_TOKEN = os.environ.get("GROUNDLIGHT_API_TOKEN", "") -def load_edge_config() -> EdgeEndpointConfig: - """ - Reads the edge config from the EDGE_CONFIG environment variable if it exists. - If EDGE_CONFIG is not set, reads the default edge config file. - """ - yaml_config = os.environ.get("EDGE_CONFIG", "").strip() - if yaml_config: - return EdgeEndpointConfig.from_yaml(yaml_str=yaml_config) - logger.info( - f"EDGE_CONFIG environment variable not set. Checking default Edge Config path: {DEFAULT_EDGE_CONFIG_PATH}." - ) - return EdgeEndpointConfig.from_yaml(filename=DEFAULT_EDGE_CONFIG_PATH) +class EdgeConfigManager: + """Manages the lifecycle of the edge endpoint configuration: startup loading, saving, and + mtime-cached reading of the active config file on PVC.""" + _cached_config: EdgeEndpointConfig = EdgeEndpointConfig() + _cached_mtime: float = 0.0 -def get_detector_inference_configs( - root_edge_config: EdgeEndpointConfig, -) -> dict[str, InferenceConfig] | None: - """ - Produces a dict mapping detector IDs to their associated `InferenceConfig`. 
- Returns None if there are no detectors in the config file. - """ - # Mapping of config names to InferenceConfig objects - edge_inference_configs: dict[str, InferenceConfig] = root_edge_config.edge_inference_configs + @classmethod + def load_startup_config(cls) -> EdgeEndpointConfig: + """Load edge config at startup. + + Sources checked in order: + 1. EDGE_CONFIG env var (Docker tests, non-Helm setups) + 2. Helm-mounted ConfigMap (always wins when present) + 3. Active config on PVC (previous set_edge_config survives restarts) + 4. Pydantic defaults + """ + yaml_config = os.environ.get("EDGE_CONFIG", "").strip() + if yaml_config: + return EdgeEndpointConfig.from_yaml(yaml_str=yaml_config) + if os.path.exists(HELM_EDGE_CONFIG_PATH): + return EdgeEndpointConfig.from_yaml(filename=HELM_EDGE_CONFIG_PATH) + if os.path.exists(ACTIVE_EDGE_CONFIG_PATH): + return EdgeEndpointConfig.from_yaml(filename=ACTIVE_EDGE_CONFIG_PATH) + return EdgeEndpointConfig() + + @classmethod + def save(cls, config: EdgeEndpointConfig) -> None: + """Write the active config to disk.""" + os.makedirs(os.path.dirname(ACTIVE_EDGE_CONFIG_PATH), exist_ok=True) + with open(ACTIVE_EDGE_CONFIG_PATH, "w") as f: + yaml.dump(config.to_payload(), f, default_flow_style=False) + + @classmethod + def active(cls) -> EdgeEndpointConfig: + """Return the current active config, re-reading from disk only when the file changes.""" + try: + mtime = os.path.getmtime(ACTIVE_EDGE_CONFIG_PATH) + except FileNotFoundError: + logger.error("Active config file not found at %s", ACTIVE_EDGE_CONFIG_PATH, exc_info=True) + return cls._cached_config + if mtime != cls._cached_mtime: + cls._cached_mtime = mtime + cls._cached_config = EdgeEndpointConfig.from_yaml(filename=ACTIVE_EDGE_CONFIG_PATH) + return cls._cached_config + + @staticmethod + def detector_configs(config: EdgeEndpointConfig) -> dict[str, InferenceConfig]: + """Return a mapping of detector IDs to their InferenceConfig.""" + detectors = [d for d in config.detectors if 
d.detector_id] + if not detectors: + return {} + return {d.detector_id: config.edge_inference_configs[d.edge_inference_config] for d in detectors} - # Filter out detectors whose IDs are empty strings. - detectors = [detector for detector in root_edge_config.detectors if detector.detector_id != ""] + @staticmethod + def detector_config(config: EdgeEndpointConfig, detector_id: str) -> InferenceConfig | None: + """Return the InferenceConfig for a single detector, or None.""" + for d in config.detectors: + if d.detector_id == detector_id: + return config.edge_inference_configs[d.edge_inference_config] + return None - detector_to_inference_config: dict[str, InferenceConfig] | None = None - if detectors: - detector_to_inference_config = { - detector.detector_id: edge_inference_configs[detector.edge_inference_config] for detector in detectors - } - return detector_to_inference_config +def get_active_detector_ids(db_manager: DatabaseManager) -> set[str]: + """Return detector IDs from the DB that are not pending deletion.""" + all_records = db_manager.get_inference_deployment_records() + return {r.detector_id for r in all_records if not r.pending_deletion} -def get_detector_edge_configs_by_id() -> Dict[str, InferenceConfig]: +def apply_detector_changes(removed: set[str], added: set[str], db_manager: DatabaseManager) -> None: + """Mark removed detectors for deletion and create DB records for added ones.""" + for detector_id in removed: + logger.info(f"Marking detector {detector_id} for deletion") + db_manager.mark_detector_pending_deletion(detector_id) + + for detector_id in added: + logger.info(f"Creating deployment record for new detector {detector_id}") + for is_oodd in [False, True]: + model_name = get_edge_inference_model_name(detector_id, is_oodd=is_oodd) + db_manager.create_or_update_inference_deployment_record( + deployment={ + "model_name": model_name, + "detector_id": detector_id, + "api_token": GROUNDLIGHT_API_TOKEN, + "deployment_created": False, + 
"pending_deletion": False, + } + ) + + +def compute_detector_diff(current_detector_ids: set[str], new_config: EdgeEndpointConfig) -> tuple[set[str], set[str]]: + """Compute which detectors to remove and add. + + Returns (removed_ids, added_ids). + """ + desired = {d.detector_id for d in new_config.detectors if d.detector_id} + return current_detector_ids - desired, desired - current_detector_ids + + +def reconcile_config(new_config: EdgeEndpointConfig, db_manager: DatabaseManager) -> None: """ - Convenience helper that loads the edge config and returns detector-level inference configs, - defaulting to an empty dict when none are defined. + Compute the diff between a provided config and the DB state. Apply the new config. Write the new + config to disk. """ - root_config = load_edge_config() - detector_configs = get_detector_inference_configs(root_config) - return detector_configs or {} + current = get_active_detector_ids(db_manager) + removed, added = compute_detector_diff(current, new_config) + + apply_detector_changes(removed, added, db_manager) + EdgeConfigManager.save(new_config) + logger.info( + f"Config reconciled: {len(removed)} detector(s) removed, {len(added)} detector(s) added. 
" + f"Removed detectors: {removed} | Added detectors: {added}" + ) diff --git a/app/core/edge_inference.py b/app/core/edge_inference.py index 92bec941a..cdd7a31a5 100644 --- a/app/core/edge_inference.py +++ b/app/core/edge_inference.py @@ -10,11 +10,18 @@ import yaml from cachetools import TTLCache, cached from fastapi import HTTPException, status -from groundlight.edge import InferenceConfig +from groundlight.edge import EdgeEndpointConfig, InferenceConfig from jinja2 import Template from model import ModeEnum +from app.core.edge_config_loader import EdgeConfigManager from app.core.file_paths import MODEL_REPOSITORY_PATH +from app.core.naming import ( + get_detector_models_dir, + get_edge_inference_service_name, + get_oodd_model_dir, + get_primary_edge_model_dir, +) from app.core.speedmon import SpeedMonitor from app.core.utils import ModelInfoBase, ModelInfoWithBinary, parse_model_info @@ -29,7 +36,7 @@ def is_edge_inference_ready(inference_client_url: str) -> bool: model_ready_url = f"http://{inference_client_url}/health/ready" try: - response = requests.get(model_ready_url) + response = requests.get(model_ready_url, timeout=0.5) return response.status_code == status.HTTP_200_OK except requests.exceptions.RequestException as e: logger.warning(f"Failed to connect to {model_ready_url}: {e}") @@ -229,105 +236,29 @@ class EdgeInferenceManager: def __init__( self, - detector_inference_configs: dict[str, InferenceConfig] | None, verbose: bool = False, separate_oodd_inference: bool = True, ) -> None: - """ - Initializes the edge inference manager. 
- Args: - detector_inference_configs: Dictionary of detector IDs to InferenceConfig objects - verbose: Whether to print verbose logs from the inference server client - separate_oodd_inference: Whether to run inference separately for the OODD model - """ self.verbose = verbose - self.detector_inference_configs, self.inference_client_urls, self.oodd_inference_client_urls = {}, {}, {} self.speedmon = SpeedMonitor() self.separate_oodd_inference = separate_oodd_inference - - if detector_inference_configs: - self.detector_inference_configs = detector_inference_configs - self.inference_client_urls = { - detector_id: get_edge_inference_service_name(detector_id) + ":8000" - for detector_id in self.detector_inference_configs.keys() - if self.detector_configured_for_edge_inference(detector_id) - } - if separate_oodd_inference: - self.oodd_inference_client_urls = { - detector_id: get_edge_inference_service_name(detector_id, is_oodd=True) + ":8000" - for detector_id in self.detector_inference_configs.keys() - if self.detector_configured_for_edge_inference(detector_id) - } - # Last time we escalated to cloud for each detector - self.last_escalation_times = {detector_id: None for detector_id in self.detector_inference_configs.keys()} - # Minimum time between escalations for each detector - self.min_times_between_escalations = { - detector_id: detector_inference_config.min_time_between_escalations - for detector_id, detector_inference_config in self.detector_inference_configs.items() - } - - def update_inference_config(self, detector_id: str, api_token: str) -> None: - """ - Adds a new detector to the inference config at runtime. This is useful when new - detectors are added to the database and we want to create an inference deployment for them. 
- Args: - detector_id: ID of the detector on which to run local edge inference - api_token: API token required to fetch inference models - - """ - if detector_id not in self.detector_inference_configs.keys(): - self.detector_inference_configs[detector_id] = InferenceConfig( - name="runtime_detector_config", enabled=True, api_token=api_token - ) - self.inference_client_urls[detector_id] = get_edge_inference_service_name(detector_id) + ":8000" - if self.separate_oodd_inference: - logger.info(f"Performing separate OODD inference, updating OODD inference URL for {detector_id}") - self.oodd_inference_client_urls[detector_id] = ( - get_edge_inference_service_name(detector_id, is_oodd=True) + ":8000" - ) - logger.info(f"Set up edge inference for {detector_id}") - - def detector_configured_for_edge_inference(self, detector_id: str) -> bool: - """ - Checks if the detector is configured to run local inference. - Args: - detector_id: ID of the detector on which to run local edge inference - Returns: - True if the detector is configured to run local inference, False otherwise - """ - if not self.detector_inference_configs: - return False - - return ( - detector_id in self.detector_inference_configs.keys() - and self.detector_inference_configs[detector_id].enabled - ) + self.last_escalation_times: dict[str, float | None] = {} def inference_is_available(self, detector_id: str) -> bool: - """ - Queries the inference server to see if everything is ready to perform inference. 
- Args: - detector_id: ID of the detector on which to run local edge inference - Returns: - True if edge inference for the specified detector is available, False otherwise - """ - try: - inference_client_url = self.inference_client_urls[detector_id] - oodd_inference_client_url = ( - self.oodd_inference_client_urls[detector_id] if self.separate_oodd_inference else None - ) - except KeyError: - logger.info(f"Failed to look up inference clients for {detector_id}") - return False + """Check whether inference pods for this detector are ready to serve.""" + primary_url = get_edge_inference_service_name(detector_id) + ":8000" + oodd_url = ( + get_edge_inference_service_name(detector_id, is_oodd=True) + ":8000" + if self.separate_oodd_inference + else None + ) - # primary inference client is ready, and if we're doing separate OODD inference, the OODD - # inference client is also ready - inference_clients_are_ready = is_edge_inference_ready(inference_client_url) and ( - not self.separate_oodd_inference or is_edge_inference_ready(oodd_inference_client_url) + ready = is_edge_inference_ready(primary_url) and ( + not self.separate_oodd_inference or is_edge_inference_ready(oodd_url) ) - if not inference_clients_are_ready: + if not ready: logger.debug( - f"Edge inference server and/or OODD inference server is not ready. {inference_client_url=}, {oodd_inference_client_url=}" + f"Edge inference server and/or OODD inference server is not ready. {primary_url=}, {oodd_url=}" ) return False return True @@ -349,9 +280,9 @@ def run_inference(self, detector_id: str, image_bytes: bytes, content_type: str, logger.info(f"Submitting image to edge inference service. 
{detector_id=}") start_time = time.perf_counter() - primary_url = self.inference_client_urls[detector_id] + primary_url = get_edge_inference_service_name(detector_id) + ":8000" if self.separate_oodd_inference: - oodd_url = self.oodd_inference_client_urls[detector_id] + oodd_url = get_edge_inference_service_name(detector_id, is_oodd=True) + ":8000" with ThreadPoolExecutor(max_workers=2) as executor: f_primary = executor.submit(submit_image_for_inference, primary_url, image_bytes, content_type) f_oodd = executor.submit(submit_image_for_inference, oodd_url, image_bytes, content_type) @@ -380,11 +311,8 @@ def update_models_if_available(self, detector_id: str) -> bool: """ logger.debug(f"Checking if there are new models available for {detector_id}") - api_token = ( - self.detector_inference_configs[detector_id].api_token - if self.detector_configured_for_edge_inference(detector_id) - else None - ) + det_config = EdgeConfigManager.detector_config(EdgeConfigManager.active(), detector_id) + api_token = det_config.api_token if det_config and det_config.enabled else None # fallback to env var if we don't have a token in the config api_token = api_token or os.environ.get("GROUNDLIGHT_API_TOKEN", None) @@ -417,20 +345,24 @@ def update_models_if_available(self, detector_id: str) -> bool: ) return True - def escalation_cooldown_complete(self, detector_id: str) -> bool: + def escalation_cooldown_complete(self, detector_id: str, edge_config: EdgeEndpointConfig) -> bool: """ Check if the time since the last escalation is long enough ago that we should escalate again. The minimum time between escalations for a detector is set by the `min_time_between_escalations` field in the - detector's config. If the field is not set, we use a default of 2 seconds. + detector's config. If the field is not set, we use the default defined in EdgeEndpointConfig. Args: detector_id: ID of the detector to check + edge_config: The active edge endpoint configuration. 
Returns: True if there hasn't been an escalation on this detector in the last `min_time_between_escalations` seconds, False otherwise. """ - min_time_between_escalations = self.min_times_between_escalations.get(detector_id, 2) - last_escalation_time = self.last_escalation_times[detector_id] + det_config = EdgeConfigManager.detector_config(edge_config, detector_id) + min_time_between_escalations = ( + det_config.min_time_between_escalations if det_config else InferenceConfig().min_time_between_escalations + ) + last_escalation_time = self.last_escalation_times.get(detector_id) if last_escalation_time is None or (time.time() - last_escalation_time) > min_time_between_escalations: self.last_escalation_times[detector_id] = time.time() @@ -731,33 +663,3 @@ def delete_model_version(model_dir: str, model_version: int) -> None: logger.info(f"Deleting model version {model_version} for {model_dir}") if os.path.exists(model_version_dir): shutil.rmtree(model_version_dir) - - -def get_edge_inference_service_name(detector_id: str, is_oodd: bool = False) -> str: - """ - Kubernetes service/deployment names have a strict naming convention. - They have to be alphanumeric, lower cased, and can only contain dashes. - We just use `inferencemodel-{'oodd' or 'primary'}-` as the deployment name and - `inference-service-{'oodd' or 'primary'}-` as the service name. 
- """ - return f"inference-service-{'oodd' if is_oodd else 'primary'}-{detector_id.replace('_', '-').lower()}" - - -def get_edge_inference_deployment_name(detector_id: str, is_oodd: bool = False) -> str: - return f"inferencemodel-{'oodd' if is_oodd else 'primary'}-{detector_id.replace('_', '-').lower()}" - - -def get_edge_inference_model_name(detector_id: str, is_oodd: bool = False) -> str: - return os.path.join(detector_id, "primary" if not is_oodd else "oodd") - - -def get_detector_models_dir(repository_root: str, detector_id: str) -> str: - return os.path.join(repository_root, detector_id) - - -def get_primary_edge_model_dir(repository_root: str, detector_id: str) -> str: - return os.path.join(get_detector_models_dir(repository_root, detector_id), "primary") - - -def get_oodd_model_dir(repository_root: str, detector_id: str) -> str: - return os.path.join(get_detector_models_dir(repository_root, detector_id), "oodd") diff --git a/app/core/file_paths.py b/app/core/file_paths.py index 455269ff0..fb8254c7f 100644 --- a/app/core/file_paths.py +++ b/app/core/file_paths.py @@ -1,4 +1,5 @@ -DEFAULT_EDGE_CONFIG_PATH = "/etc/groundlight/edge-config/edge-config.yaml" +HELM_EDGE_CONFIG_PATH = "/etc/groundlight/edge-config/edge-config.yaml" +ACTIVE_EDGE_CONFIG_PATH = "/opt/groundlight/edge/config/active-edge-config.yaml" INFERENCE_DEPLOYMENT_TEMPLATE_PATH = "/etc/groundlight/inference-deployment/inference_deployment_template.yaml" # A file with the namespace to be operating within diff --git a/app/core/kubernetes_management.py b/app/core/kubernetes_management.py index c8bdb6cd7..54054a67a 100644 --- a/app/core/kubernetes_management.py +++ b/app/core/kubernetes_management.py @@ -8,13 +8,9 @@ from kubernetes import config from kubernetes.client import V1Deployment -from .edge_inference import ( - get_current_model_version, - get_edge_inference_deployment_name, - get_edge_inference_model_name, - get_edge_inference_service_name, -) +from .edge_inference import 
get_current_model_version
 from .file_paths import INFERENCE_DEPLOYMENT_TEMPLATE_PATH, KUBERNETES_NAMESPACE_PATH, MODEL_REPOSITORY_PATH
+from .naming import get_edge_inference_deployment_name, get_edge_inference_model_name, get_edge_inference_service_name
 
 logger = logging.getLogger(__name__)
 
@@ -220,6 +216,40 @@ def update_inference_deployment(self, detector_id: str, is_oodd: bool = False) -
         )
         return True
 
+    def delete_inference_deployment(self, detector_id: str, is_oodd: bool = False) -> None:
+        """Delete the K8s Deployment and Service for a detector. 404s are ignored."""
+        deployment_name = get_edge_inference_deployment_name(detector_id, is_oodd)
+        service_name = get_edge_inference_service_name(detector_id, is_oodd)
+
+        for name, delete_fn in [
+            (deployment_name, lambda n: self._app_kube_client.delete_namespaced_deployment(n, self._target_namespace)),
+            (service_name, lambda n: self._core_kube_client.delete_namespaced_service(n, self._target_namespace)),
+        ]:
+            try:
+                delete_fn(name)
+                logger.info(f"Deleted {name} in namespace {self._target_namespace}")
+            except kube_client.rest.ApiException as e:
+                if e.status == status.HTTP_404_NOT_FOUND:
+                    logger.debug(f"{name} not found in namespace {self._target_namespace}, skipping deletion")
+                else:
+                    raise
+
+    def is_inference_deployment_fully_deleted(self, detector_id: str, is_oodd: bool = False) -> bool:
+        """Return True only when the Deployment is gone AND zero pods remain (RAM/VRAM freed)."""
+        deployment_name = get_edge_inference_deployment_name(detector_id, is_oodd)
+        if self.get_inference_deployment(deployment_name) is not None:
+            return False
+
+        instance_label = f"instance-{get_edge_inference_model_name(detector_id, is_oodd).replace('/', '-')}"
+        label_str = f"app=inference-server,instance={instance_label}"
+        pod_list = self._core_kube_client.list_namespaced_pod(
+            namespace=self._target_namespace, label_selector=label_str
+        )
+        if len(pod_list.items) > 0:
+            logger.debug(f"Deployment {deployment_name} is gone but {len(pod_list.items)} pod(s) still running")
+            return False
+        return True
+
     def is_inference_deployment_rollout_complete(self, deployment_name: str) -> bool:
         """
         Checks if the rollout of the inference deployment for a given deployment name is complete.
diff --git a/app/core/models.py b/app/core/models.py
index f6589e69c..855bf317b 100644
--- a/app/core/models.py
+++ b/app/core/models.py
@@ -41,6 +41,12 @@ class InferenceDeployment(Base):
         nullable=True,
         comment="Name of the kubernetes deployment for the inference server.",
     )
+    pending_deletion = Column(
+        Boolean,
+        default=False,
+        nullable=False,
+        comment="When True, the model-updater will delete this detector's K8s resources and then remove this row.",
+    )
     created_at = Column(
         DateTime, nullable=True, default=datetime.datetime.utcnow, comment="Timestamp of record creation"
     )
diff --git a/app/core/naming.py b/app/core/naming.py
new file mode 100644
index 000000000..0a1a7364d
--- /dev/null
+++ b/app/core/naming.py
@@ -0,0 +1,28 @@
+"""Naming conventions for edge inference resources (K8s deployments, services, model paths)."""
+
+import os
+
+
+def get_edge_inference_service_name(detector_id: str, is_oodd: bool = False) -> str:
+    """Kubernetes service names must be alphanumeric, lower-cased, and can only contain dashes."""
+    return f"inference-service-{'oodd' if is_oodd else 'primary'}-{detector_id.replace('_', '-').lower()}"
+
+
+def get_edge_inference_deployment_name(detector_id: str, is_oodd: bool = False) -> str:
+    return f"inferencemodel-{'oodd' if is_oodd else 'primary'}-{detector_id.replace('_', '-').lower()}"
+
+
+def get_edge_inference_model_name(detector_id: str, is_oodd: bool = False) -> str:
+    return os.path.join(detector_id, "primary" if not is_oodd else "oodd")
+
+
+def get_detector_models_dir(repository_root: str, detector_id: str) -> str:
+    return os.path.join(repository_root, detector_id)
+
+
+def get_primary_edge_model_dir(repository_root: str, detector_id: str) -> str:
+    return os.path.join(get_detector_models_dir(repository_root, detector_id), "primary")
+
+
+def get_oodd_model_dir(repository_root: str, detector_id: str) -> str:
+    return os.path.join(get_detector_models_dir(repository_root, detector_id), "oodd")
diff --git a/app/main.py b/app/main.py
index b15e538b5..922825880 100644
--- a/app/main.py
+++ b/app/main.py
@@ -8,41 +8,25 @@
 import logging
 import os
 
-from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from fastapi import FastAPI
 
-from app.api.api import api_router, health_router, ping_router
+from app.api.api import api_router, edge_config_router, edge_detector_readiness_router, health_router, ping_router
 from app.api.naming import API_BASE_PATH
 from app.core.app_state import AppState
+from app.core.edge_config_loader import EdgeConfigManager, reconcile_config
 
 LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
-DEPLOY_DETECTOR_LEVEL_INFERENCE = bool(int(os.environ.get("DEPLOY_DETECTOR_LEVEL_INFERENCE", 0)))
 
 logging.basicConfig(
     level=LOG_LEVEL, format="%(asctime)s.%(msecs)03d %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
 )
-# The asyncio executor is too verbose at INFO level, so we set it to WARNING
-if LOG_LEVEL == "INFO":
-    logging.getLogger("apscheduler.executors.default").setLevel(logging.WARNING)
 
 app = FastAPI(title="edge-endpoint")
 
 app.include_router(router=api_router, prefix=API_BASE_PATH)
 app.include_router(router=ping_router)
 app.include_router(router=health_router)
-
-scheduler = AsyncIOScheduler()
-
-
-def update_inference_config(app_state: AppState) -> None:
-    """Update the App's edge-inference config by querying the database for new detectors."""
-    logging.debug("Querying database for updated inference deployment records...")
-    detectors = app_state.db_manager.get_inference_deployment_records(deployment_created=True)
-    if detectors:
-        for detector_record in detectors:
-            app_state.edge_inference_manager.update_inference_config(
-                detector_id=detector_record.detector_id,  # type: ignore
-                api_token=detector_record.api_token,  # type: ignore
-            )
+app.include_router(router=edge_config_router)
+app.include_router(router=edge_detector_readiness_router)
 
 
 @app.on_event("startup")
@@ -52,12 +36,9 @@ async def startup_event():
     app.state.app_state = AppState()
     app.state.app_state.db_manager.reset_database()
 
-    logging.info(f"edge_config={app.state.app_state.edge_config}")
-
-    if DEPLOY_DETECTOR_LEVEL_INFERENCE:
-        # Add job to periodically update the inference config
-        scheduler.add_job(update_inference_config, "interval", seconds=30, args=[app.state.app_state])
-        scheduler.start()
+    config = EdgeConfigManager.load_startup_config()
+    reconcile_config(config, app.state.app_state.db_manager)
+    logging.info(f"edge_config={config}")
 
     app.state.app_state.is_ready = True
     logging.info("Application is ready to serve requests.")
@@ -68,5 +49,3 @@ async def shutdown_event():
     """Lifecycle event that is triggered when the application is shutting down."""
     app.state.app_state.is_ready = False
     app.state.app_state.db_manager.shutdown()
-    if DEPLOY_DETECTOR_LEVEL_INFERENCE:
-        scheduler.shutdown()
diff --git a/app/metrics/system_metrics.py b/app/metrics/system_metrics.py
index f0f86f48b..3e5fe511b 100644
--- a/app/metrics/system_metrics.py
+++ b/app/metrics/system_metrics.py
@@ -9,9 +9,10 @@
 from groundlight.edge import InferenceConfig
 from kubernetes import client, config
 
-from app.core.edge_config_loader import get_detector_edge_configs_by_id
-from app.core.edge_inference import get_current_pipeline_config, get_predictor_metadata, get_primary_edge_model_dir
+from app.core.edge_config_loader import EdgeConfigManager
+from app.core.edge_inference import get_current_pipeline_config, get_predictor_metadata
 from app.core.file_paths import MODEL_REPOSITORY_PATH
+from app.core.naming import get_primary_edge_model_dir
 
 logger = logging.getLogger(__name__)
 
@@ -304,7 +305,7 @@ def get_detector_details() -> str:
         if det_id and _get_annotation(pod, "groundlight.dev/model-name") == f"{det_id}/primary":
             pods_by_detector.setdefault(det_id, []).append(pod)
 
-    detector_edge_configs = get_detector_edge_configs_by_id()
+    detector_edge_configs = EdgeConfigManager.detector_configs(EdgeConfigManager.active())
 
     detector_details: dict[str, dict] = {}
     for dep in deployments.items:
diff --git a/app/model_updater/update_models.py b/app/model_updater/update_models.py
index 79521bf38..c77603e35 100644
--- a/app/model_updater/update_models.py
+++ b/app/model_updater/update_models.py
@@ -2,17 +2,11 @@
 import os
 import time
 
-from groundlight.edge import EdgeEndpointConfig
-
 from app.core.database import DatabaseManager
-from app.core.edge_config_loader import get_detector_inference_configs, load_edge_config
-from app.core.edge_inference import (
-    EdgeInferenceManager,
-    delete_old_model_versions,
-    get_edge_inference_deployment_name,
-    get_edge_inference_model_name,
-)
+from app.core.edge_config_loader import EdgeConfigManager
+from app.core.edge_inference import EdgeInferenceManager, delete_old_model_versions
 from app.core.kubernetes_management import InferenceDeploymentManager
+from app.core.naming import get_edge_inference_deployment_name, get_edge_inference_model_name
 
 LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
 logging.basicConfig(
@@ -140,7 +134,6 @@ def manage_update_models(
     edge_inference_manager: EdgeInferenceManager,
     deployment_manager: InferenceDeploymentManager,
     db_manager: DatabaseManager,
-    refresh_rate: float,
     separate_oodd_inference: bool,
 ) -> None:
     """
@@ -154,13 +147,12 @@ def manage_update_models(
     found in the database. Found detectors will be added to the queue of detectors that need an
     inference deployment.
 
-    NOTE: The periodicity of this task is controlled by the refresh_rate parameter.
-    It is settable in the edge config file (defaults to 2 minutes).
+    NOTE: The periodicity of this task is controlled by refresh_rate in the active edge config
+    file. The value is re-read each cycle so it can be changed at runtime.
 
     :param edge_inference_manager: the edge inference manager object.
     :param deployment_manager: the inference deployment manager object.
     :param db_manager: the database manager object.
-    :param refresh_rate: the time interval (in seconds) between model update calls.
     :param separate_oodd_inference: whether to run inference separately for an OODD model.
     """
     deploy_detector_level_inference = bool(int(os.environ.get("DEPLOY_DETECTOR_LEVEL_INFERENCE", 0)))
@@ -169,9 +161,42 @@ def manage_update_models(
         return
 
     while True:
+        # Process pending deletions before any creation, freeing up resources before creating new pods
+        pending_deletions = frozenset(db_manager.get_pending_deletions())
+        if pending_deletions:
+            logger.info(f"Processing deletion of {len(pending_deletions)} detector(s): {pending_deletions}")
+            for detector_id in pending_deletions:
+                deployment_manager.delete_inference_deployment(detector_id)
+                if separate_oodd_inference:
+                    deployment_manager.delete_inference_deployment(detector_id, is_oodd=True)
+
+            # Poll until all pods are fully terminated
+            poll_start = time.time()
+            while time.time() - poll_start < TEN_MINUTES:
+                all_gone = all(
+                    deployment_manager.is_inference_deployment_fully_deleted(did)
+                    and (
+                        not separate_oodd_inference
+                        or deployment_manager.is_inference_deployment_fully_deleted(did, is_oodd=True)
+                    )
+                    for did in pending_deletions
+                )
+                if all_gone:
+                    break
+                time.sleep(5)
+            else:
+                logger.error(f"Timed out waiting for detector pods to terminate: {pending_deletions}")
+
+            for detector_id in pending_deletions:
+                db_manager.delete_inference_deployment_records(detector_id)
+            logger.info(f"Finished deleting {len(pending_deletions)} detector(s)")
+
+        # Check for model updates and apply model updates
         start = time.time()
-        logger.debug("Starting model update check for existing inference deployments.")
-        for detector_id in edge_inference_manager.detector_inference_configs.keys():
+        deployed_records = db_manager.get_inference_deployment_records(pending_deletion=False)
+        deployed_detector_ids = {r.detector_id for r in deployed_records}
+        logger.debug(f"Starting model update check for {len(deployed_detector_ids)} detector(s).")
+        for detector_id in deployed_detector_ids:
             try:
                 logger.debug(f"Checking new models and inference deployments for detector_id: {detector_id}")
                 _check_new_models_and_inference_deployments(
@@ -186,32 +211,15 @@ def manage_update_models(
                 logger.info(f"Failed to update model for detector_id: {detector_id}. Error: {e}", exc_info=True)
 
         elapsed_s = time.time() - start
+        refresh_rate = EdgeConfigManager.active().global_config.refresh_rate
         logger.debug(f"Model update check completed in {elapsed_s:.2f} seconds.")
         if elapsed_s < refresh_rate:
             sleep_duration = refresh_rate - elapsed_s
             logger.debug(f"Sleeping for {sleep_duration:.2f} seconds before next update cycle.")
             time.sleep(sleep_duration)
 
-        # Fetch detector IDs that need to be deployed from the database and add them to the config
-        logger.debug("Fetching undeployed detector IDs from the database.")
-        undeployed_inference_deployments = db_manager.get_inference_deployment_records(deployment_created=False)
-        if undeployed_inference_deployments:
-            unique_detectors = {record.detector_id: record.api_token for record in undeployed_inference_deployments}
-            logger.info(
-                f"Found {len(undeployed_inference_deployments)} undeployed inference deployment(s) for {len(unique_detectors)} detector(s). Updating inference config."
-            )
-            for detector_id, api_token in unique_detectors.items():
-                logger.debug(f"Updating inference config for detector_id: {detector_id}")
-                edge_inference_manager.update_inference_config(
-                    detector_id=detector_id,
-                    api_token=api_token,
-                )
-        else:
-            logger.debug("No undeployed inference deployments found.")
-
         # Update the status of the inference deployments in the database
         deployment_records = db_manager.get_inference_deployment_records()
-        # using a set to only get unique detector_ids
         deployed_detector_ids = set(record.detector_id for record in deployment_records)
         for detector_id in deployed_detector_ids:
             primary_deployment_name = get_edge_inference_deployment_name(detector_id)
@@ -235,14 +243,8 @@
 
 if __name__ == "__main__":
     logger.info("Starting model updater.")
-    edge_config: EdgeEndpointConfig = load_edge_config()
-    logger.info(f"{edge_config=}")
-
-    refresh_rate = edge_config.global_config.refresh_rate
-    detector_inference_configs = get_detector_inference_configs(root_edge_config=edge_config)
-
     logger.info("Creating edge inference manager, deployment manager, and database manager.")
-    edge_inference_manager = EdgeInferenceManager(detector_inference_configs=detector_inference_configs, verbose=True)
+    edge_inference_manager = EdgeInferenceManager(verbose=True)
     deployment_manager = InferenceDeploymentManager()
 
     # We will delegate creation of database tables to the edge-endpoint container.
@@ -253,6 +255,5 @@ def manage_update_models(
         edge_inference_manager=edge_inference_manager,
         deployment_manager=deployment_manager,
         db_manager=db_manager,
-        refresh_rate=refresh_rate,
         separate_oodd_inference=not USE_MINIMAL_IMAGE,
     )
diff --git a/deploy/helm/groundlight-edge-endpoint/files/default-edge-config.yaml b/deploy/helm/groundlight-edge-endpoint/files/default-edge-config.yaml
deleted file mode 100644
index b9d7bb369..000000000
--- a/deploy/helm/groundlight-edge-endpoint/files/default-edge-config.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-# For configuring detectors on the edge endpoint. See CONFIGURING-DETECTORS.md for more information.
-
-global_config: # These settings affect the overall behavior of the edge endpoint.
-  refresh_rate: 60 # How often to attempt to fetch updated ML models (in seconds). If not set, defaults to 60.
-
-edge_inference_configs: # These configs define detector-specific behavior and can be applied to detectors below.
-  default: # Return the edge model's prediction if sufficiently confident; otherwise, escalate to the cloud.
-    enabled: true
-    always_return_edge_prediction: false
-    disable_cloud_escalation: false
-
-  edge_answers_with_escalation: # Always return the edge model's predictions, but still escalate to cloud if unconfident.
-    enabled: true
-    always_return_edge_prediction: true
-    disable_cloud_escalation: false
-    min_time_between_escalations: 2.0
-
-  no_cloud: # Always return the edge model's prediction and never escalate to the cloud.
-    enabled: true
-    always_return_edge_prediction: true
-    disable_cloud_escalation: true
-
-  disabled: # Don't accept image queries on the edge endpoint for the associated detector.
-    enabled: false
-
-detectors: # Each entry here defines the edge configuration for a detector.
-  # To configure a detector, add an entry for it below with its detector ID and the name of the edge inference config
-  # you want to use. You can use or modify one of the existing edge inference configs listed above, or define your own.
-  - detector_id: ""
-    edge_inference_config: "default"
diff --git a/deploy/helm/groundlight-edge-endpoint/templates/_helpers.tpl b/deploy/helm/groundlight-edge-endpoint/templates/_helpers.tpl
index 7204c646a..867a4d528 100644
--- a/deploy/helm/groundlight-edge-endpoint/templates/_helpers.tpl
+++ b/deploy/helm/groundlight-edge-endpoint/templates/_helpers.tpl
@@ -112,15 +112,16 @@ Never
 {{- end -}}
 
 {{/*
-  Get the edge-config.yaml file. If the user supplies one via `--set-file configFile=...yaml`
-  then use that. Otherwise, use the default version in the `files/` directory. We define this
-  as a function so that we can use it as a nonce to restart the pod when the config changes.
+  Get the edge config. If the user supplies one via `--set-file configFile=...yaml`,
+  use that. Otherwise, fall back to an empty config; the EdgeEndpointConfig pydantic
+  model in the python-sdk provides all defaults. This helper is also used as a nonce
+  to restart the pod when the config changes.
*/}}
 {{- define "groundlight-edge-endpoint.edgeConfig" -}}
 {{- if .Values.configFile }}
 {{- .Values.configFile }}
 {{- else }}
-{{- .Files.Get "files/default-edge-config.yaml" }}
+{}
 {{- end }}
 {{- end }}
diff --git a/deploy/helm/groundlight-edge-endpoint/templates/edge-deployment.yaml b/deploy/helm/groundlight-edge-endpoint/templates/edge-deployment.yaml
index c7d02a9ec..a91ee693f 100644
--- a/deploy/helm/groundlight-edge-endpoint/templates/edge-deployment.yaml
+++ b/deploy/helm/groundlight-edge-endpoint/templates/edge-deployment.yaml
@@ -148,6 +148,8 @@ spec:
               subPath: dummy-nginx.conf
             - name: edge-endpoint-persistent-volume
               mountPath: /opt/groundlight/edge/sqlite
+            - name: edge-endpoint-persistent-volume
+              mountPath: /opt/groundlight/edge/config
             - name: device-info-volume
               mountPath: /opt/groundlight/device
             - name: escalation-queue-volume
@@ -184,6 +186,8 @@ spec:
               mountPath: /opt/groundlight/device
             - name: edge-endpoint-persistent-volume
               mountPath: /opt/groundlight/edge/serving/model-repo
+            - name: edge-endpoint-persistent-volume
+              mountPath: /opt/groundlight/edge/config
             - name: escalation-queue-volume
               mountPath: /opt/groundlight/queue
@@ -254,6 +258,8 @@ spec:
 
             - name: edge-endpoint-persistent-volume
               mountPath: /opt/groundlight/edge/serving/model-repo
+            - name: edge-endpoint-persistent-volume
+              mountPath: /opt/groundlight/edge/config
             - name: device-info-volume
               mountPath: /opt/groundlight/device
diff --git a/test/core/test_edge_config_loader.py b/test/core/test_edge_config_loader.py
new file mode 100644
index 000000000..6338d543e
--- /dev/null
+++ b/test/core/test_edge_config_loader.py
@@ -0,0 +1,97 @@
+import pytest
+from groundlight.edge import DetectorConfig, EdgeEndpointConfig
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+from app.core.database import DatabaseManager
+from app.core.edge_config_loader import apply_detector_changes, compute_detector_diff
+
+
+def _config_with_detectors(*detector_ids: str) -> EdgeEndpointConfig:
+ """Helper to build an EdgeEndpointConfig with the given detector IDs.""" + config = EdgeEndpointConfig() + for did in detector_ids: + config.detectors.append(DetectorConfig(detector_id=did, edge_inference_config="default")) + return config + + +def test_compute_detector_diff_no_changes(): + removed, added = compute_detector_diff({"det_A", "det_B"}, _config_with_detectors("det_A", "det_B")) + assert removed == set() + assert added == set() + + +def test_compute_detector_diff_all_added(): + removed, added = compute_detector_diff(set(), _config_with_detectors("det_A", "det_B")) + assert removed == set() + assert added == {"det_A", "det_B"} + + +def test_compute_detector_diff_all_removed(): + removed, added = compute_detector_diff({"det_A", "det_B"}, _config_with_detectors()) + assert removed == {"det_A", "det_B"} + assert added == set() + + +def test_compute_detector_diff_mixed(): + removed, added = compute_detector_diff( + {"det_A", "det_B", "det_C"}, + _config_with_detectors("det_B", "det_D"), + ) + assert removed == {"det_A", "det_C"} + assert added == {"det_D"} + + +def test_compute_detector_diff_empty_both(): + removed, added = compute_detector_diff(set(), _config_with_detectors()) + assert removed == set() + assert added == set() + + +def test_compute_detector_diff_filters_empty_detector_ids(): + """Detectors with empty-string IDs in the config should be ignored.""" + removed, added = compute_detector_diff(set(), _config_with_detectors("det_A", "")) + assert added == {"det_A"} + + +@pytest.fixture(scope="module") +def db_manager(): + db_manager = DatabaseManager(verbose=False) + engine = create_engine("sqlite:///:memory:", echo=False) + db_manager._engine = engine + db_manager.session_maker = sessionmaker(bind=engine) + db_manager.create_tables() + yield db_manager + db_manager.shutdown() + + +@pytest.fixture(autouse=True) +def reset_db(db_manager): + db_manager.reset_database() + + +def test_apply_detector_changes_adds_records(db_manager): + 
apply_detector_changes(removed=set(), added={"det_X", "det_Y"}, db_manager=db_manager) + + records = db_manager.get_inference_deployment_records() + detector_ids = {r.detector_id for r in records} + assert detector_ids == {"det_X", "det_Y"} + for r in records: + assert not r.deployment_created + assert not r.pending_deletion + + +def test_apply_detector_changes_marks_removal(db_manager): + apply_detector_changes(removed=set(), added={"det_A"}, db_manager=db_manager) + apply_detector_changes(removed={"det_A"}, added=set(), db_manager=db_manager) + + records = db_manager.get_inference_deployment_records(detector_id="det_A") + assert len(records) > 0 + for r in records: + assert r.pending_deletion + + +def test_apply_detector_changes_noop_when_empty(db_manager): + apply_detector_changes(removed=set(), added=set(), db_manager=db_manager) + records = db_manager.get_inference_deployment_records() + assert len(records) == 0 diff --git a/test/edge_inference/test_edge_inference_manager.py b/test/edge_inference/test_edge_inference_manager.py index 6e388cbeb..2807a421f 100644 --- a/test/edge_inference/test_edge_inference_manager.py +++ b/test/edge_inference/test_edge_inference_manager.py @@ -4,10 +4,10 @@ import pytest import yaml -from groundlight.edge import InferenceConfig from model import ModeEnum -from app.core.edge_inference import EdgeInferenceManager, get_edge_inference_service_name +from app.core.edge_inference import EdgeInferenceManager +from app.core.naming import get_edge_inference_service_name from app.core.utils import ModelInfoBase, ModelInfoNoBinary, ModelInfoWithBinary @@ -98,12 +98,6 @@ def oodd_model_info_no_binary() -> ModelInfoNoBinary: ) -@pytest.fixture -def detector_inference_config(): - """Build a detector inference config map for manager tests.""" - return {"test_detector": InferenceConfig(name="test_config")} - - class TestEdgeInferenceManager: def test_update_model_with_binary(self, edge_model_info_with_binary, oodd_model_info_with_binary): with 
tempfile.TemporaryDirectory() as temp_dir: @@ -111,7 +105,7 @@ def test_update_model_with_binary(self, edge_model_info_with_binary, oodd_model_ with mock.patch("app.core.edge_inference.get_object_using_presigned_url") as mock_get_from_s3: mock_get_from_s3.return_value = b"test_model" mock_fetch.return_value = (edge_model_info_with_binary, oodd_model_info_with_binary) - edge_manager = EdgeInferenceManager(detector_inference_configs=None) + edge_manager = EdgeInferenceManager() edge_manager.MODEL_REPOSITORY = temp_dir # type: ignore detector_id = "test_detector" edge_manager.update_models_if_available(detector_id) @@ -145,7 +139,7 @@ def test_update_model_no_binary(self, edge_model_info_no_binary, oodd_model_info with tempfile.TemporaryDirectory() as temp_dir: with mock.patch("app.core.edge_inference.fetch_model_info") as mock_fetch: mock_fetch.return_value = (edge_model_info_no_binary, oodd_model_info_no_binary) - edge_manager = EdgeInferenceManager(detector_inference_configs=None) + edge_manager = EdgeInferenceManager() edge_manager.MODEL_REPOSITORY = temp_dir # type: ignore detector_id = "test_detector" edge_manager.update_models_if_available(detector_id) @@ -169,7 +163,7 @@ def test_update_model_no_binary(self, edge_model_info_no_binary, oodd_model_info assert not os.path.exists(os.path.join(temp_dir, detector_id, "primary", "3")) assert not os.path.exists(os.path.join(temp_dir, detector_id, "oodd", "3")) - def test_run_inference_with_oodd(self, detector_inference_config): + def test_run_inference_with_oodd(self): mock_response = { "multi_predictions": None, "predictions": {"confidences": [0.54], "labels": [0]}, @@ -179,7 +173,7 @@ def test_run_inference_with_oodd(self, detector_inference_config): with mock.patch("app.core.edge_inference.submit_image_for_inference") as mock_submit: mock_submit.return_value = mock_response # separate_oodd_inference is True by default - edge_manager = EdgeInferenceManager(detector_inference_configs=detector_inference_config) + 
edge_manager = EdgeInferenceManager() edge_manager.run_inference("test_detector", b"test_image", "image/jpeg", mode=ModeEnum.BINARY) primary_inference_client_url = get_edge_inference_service_name("test_detector") + ":8000" oodd_inference_client_url = get_edge_inference_service_name("test_detector", is_oodd=True) + ":8000" @@ -193,7 +187,7 @@ def test_run_inference_with_oodd(self, detector_inference_config): assert primary_call in calls assert oodd_call in calls - def test_run_inference_without_oodd(self, detector_inference_config): + def test_run_inference_without_oodd(self): mock_response = { "multi_predictions": None, "predictions": {"confidences": [0.54], "labels": [0]}, @@ -202,9 +196,7 @@ def test_run_inference_without_oodd(self, detector_inference_config): with mock.patch("app.core.edge_inference.submit_image_for_inference") as mock_submit: mock_submit.return_value = mock_response - edge_manager = EdgeInferenceManager( - detector_inference_configs=detector_inference_config, separate_oodd_inference=False - ) + edge_manager = EdgeInferenceManager(separate_oodd_inference=False) edge_manager.run_inference("test_detector", b"test_image", "image/jpeg", mode=ModeEnum.BINARY) primary_inference_client_url = get_edge_inference_service_name("test_detector") + ":8000" diff --git a/test/integration/test-with-k3s-helm.sh b/test/integration/test-with-k3s-helm.sh index 2700cf708..db0abadcf 100755 --- a/test/integration/test-with-k3s-helm.sh +++ b/test/integration/test-with-k3s-helm.sh @@ -36,9 +36,13 @@ export REFRESH_RATE=60 # not actually different than the default, but we may wan # but first, save the template to a temporary file EDGE_CONFIG_FILE="/tmp/edge-config.$$.yaml" -cp deploy/helm/groundlight-edge-endpoint/files/default-edge-config.yaml $EDGE_CONFIG_FILE -sed -i "s/detector_id: \"\"/detector_id: \"$DETECTOR_ID\"/" $EDGE_CONFIG_FILE -sed -i "s/refresh_rate: 60/refresh_rate: $REFRESH_RATE/" $EDGE_CONFIG_FILE +cat > $EDGE_CONFIG_FILE <