Skip to content

Commit b3f35a2

Browse files
committed
adding detector readiness check
1 parent 157ea54 commit b3f35a2

File tree

1 file changed

+62
-5
lines changed

1 file changed

+62
-5
lines changed

src/groundlight/experimental_api.py

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"""
99

1010
import json
11+
import time
1112
from io import BufferedReader, BytesIO
1213
from pathlib import Path
1314
from typing import Any, Dict, List, Optional, Union
@@ -819,18 +820,74 @@ def make_generic_api_request( # noqa: PLR0913 # pylint: disable=too-many-argume
819820
_preload_content=False, # This returns the urllib3 response rather than trying any type of processing
820821
)
821822

823+
def _edge_base_url(self) -> str:
824+
"""Return the scheme+host+port of the configured endpoint, without the /device-api path."""
825+
from urllib.parse import urlparse, urlunparse
826+
827+
parsed = urlparse(self.configuration.host)
828+
return urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
829+
822830
def get_edge_config(self) -> EdgeEndpointConfig:
823831
"""Retrieve the active edge endpoint configuration.
824832
825833
Only works when the client is pointed at an edge endpoint
826834
(via GROUNDLIGHT_ENDPOINT or the endpoint constructor arg).
827835
"""
828-
from urllib.parse import urlparse, urlunparse
829-
830-
parsed = urlparse(self.configuration.host)
831-
base_url = urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
832-
url = f"{base_url}/edge-config"
836+
url = f"{self._edge_base_url()}/edge-config"
833837
headers = self.get_raw_headers()
834838
response = requests.get(url, headers=headers, verify=self.configuration.verify_ssl)
835839
response.raise_for_status()
836840
return EdgeEndpointConfig.from_payload(response.json())
841+
842+
def get_edge_detector_readiness(self) -> dict[str, bool]:
843+
"""Check which configured detectors have inference pods ready to serve.
844+
845+
Only works when the client is pointed at an edge endpoint.
846+
847+
:return: Dict mapping detector_id to readiness (True/False).
848+
"""
849+
url = f"{self._edge_base_url()}/edge-detector-readiness"
850+
headers = self.get_raw_headers()
851+
response = requests.get(url, headers=headers, verify=self.configuration.verify_ssl)
852+
response.raise_for_status()
853+
return {det_id: info["ready"] for det_id, info in response.json().items()}
854+
855+
def set_edge_config(
856+
self,
857+
config: EdgeEndpointConfig,
858+
mode: str = "REPLACE",
859+
timeout_sec: float = 300,
860+
poll_interval_sec: float = 1,
861+
) -> EdgeEndpointConfig:
862+
"""Send a new edge endpoint configuration and wait until all detectors are ready.
863+
864+
Only works when the client is pointed at an edge endpoint.
865+
866+
:param config: The new configuration to apply.
867+
:param mode: Currently only "REPLACE" is supported.
868+
:param timeout_sec: Max seconds to wait for all detectors to become ready.
869+
:param poll_interval_sec: How often to poll readiness while waiting.
870+
:return: The applied configuration as reported by the edge endpoint.
871+
"""
872+
if mode != "REPLACE":
873+
raise ValueError(f"Unsupported mode: {mode!r}. Currently only 'REPLACE' is supported.")
874+
875+
url = f"{self._edge_base_url()}/edge-config"
876+
headers = self.get_raw_headers()
877+
response = requests.put(
878+
url, json=config.to_payload(), headers=headers, verify=self.configuration.verify_ssl
879+
)
880+
response.raise_for_status()
881+
882+
desired_ids = {d.detector_id for d in config.detectors if d.detector_id}
883+
deadline = time.time() + timeout_sec
884+
while time.time() < deadline:
885+
readiness = self.get_edge_detector_readiness()
886+
if desired_ids and all(readiness.get(did, False) for did in desired_ids):
887+
return self.get_edge_config()
888+
time.sleep(poll_interval_sec)
889+
890+
raise TimeoutError(
891+
f"Edge detectors were not all ready within {timeout_sec}s. "
892+
"The edge endpoint may still be converging."
893+
)

0 commit comments

Comments
 (0)