Skip to content

Commit 82e1800

Browse files
committed
Add /stats and /latest_frame endpoints for device-manager
Adds two new HTTP endpoints behind the ENABLE_STREAM_API flag. (1) GET /stats returns camera_fps and inference_fps, each averaged across all active pipelines, together with stream_count; it reuses the existing list_pipelines/get_status IPC, so no new commands are needed. (2) GET /inference_pipelines/{pipeline_id}/latest_frame returns the most recent frame as a base64-encoded JPEG with metadata; it adds a new LATEST_FRAME IPC command that peeks at the buffer non-destructively.
1 parent af014f7 commit 82e1800

File tree

6 files changed

+541
-0
lines changed

6 files changed

+541
-0
lines changed

inference/core/interfaces/http/http_api.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@
231231
InferencePipelineStatusResponse,
232232
InitializeWebRTCPipelineResponse,
233233
InitializeWebRTCResponse,
234+
LatestFrameResponse,
234235
ListPipelinesResponse,
235236
)
236237
from inference.core.interfaces.stream_manager.api.stream_manager_client import (
@@ -1893,6 +1894,64 @@ async def consume(
18931894
excluded_fields=request.excluded_fields,
18941895
)
18951896

1897+
@app.get(
    "/stats",
    summary="Aggregated pipeline statistics",
)
@with_route_exceptions_async
async def get_stats():
    """Return average camera FPS, average inference FPS and pipeline count.

    The pipeline list and each pipeline's status report are fetched through
    ``self.stream_manager_client``. Collection is best-effort: a failure to
    list pipelines, or to read one pipeline's status, is logged and skipped
    so the endpoint still answers with whatever could be gathered.

    Returns:
        dict with keys ``camera_fps`` and ``inference_fps`` (mean of the
        positive values found, or ``None`` when none were found) and
        ``stream_count`` (number of pipelines reported by the manager).
    """
    # Function-scope import keeps this handler self-contained.
    import logging

    log = logging.getLogger(__name__)
    stream_count = 0
    pipeline_ids = []
    camera_fps_values = []
    inference_fps_values = []
    if self.stream_manager_client is not None:
        try:
            pipelines_resp = await self.stream_manager_client.list_pipelines()
            listed_ids = pipelines_resp.pipelines
            stream_count = len(listed_ids)
            pipeline_ids = listed_ids
        except Exception:
            log.warning("Could not list pipelines for /stats", exc_info=True)
    for pid in pipeline_ids:
        # Handle failures per pipeline so one broken or just-terminated
        # pipeline does not discard the figures of all the others.
        try:
            status_resp = await self.stream_manager_client.get_status(pid)
            report = status_resp.report
            throughput = report.get("inference_throughput", 0.0)
            if throughput and throughput > 0:
                inference_fps_values.append(throughput)
            # `or []` also covers the key being present with a None value.
            for src in report.get("sources_metadata") or []:
                props = src.get("source_properties") or {}
                fps = props.get("fps")
                if fps and fps > 0:
                    camera_fps_values.append(fps)
        except Exception:
            log.warning(
                "Could not read status of pipeline %s for /stats",
                pid,
                exc_info=True,
            )
    return {
        "camera_fps": (
            sum(camera_fps_values) / len(camera_fps_values)
            if camera_fps_values
            else None
        ),
        "inference_fps": (
            sum(inference_fps_values) / len(inference_fps_values)
            if inference_fps_values
            else None
        ),
        "stream_count": stream_count,
    }
1941+
1942+
@app.get(
    "/inference_pipelines/{pipeline_id}/latest_frame",
    response_model=LatestFrameResponse,
    summary="[EXPERIMENTAL] Get latest frame from InferencePipeline",
)
@with_route_exceptions_async
async def latest_frame(
    pipeline_id: str,
) -> LatestFrameResponse:
    # Thin HTTP wrapper: forwards the request to the stream manager client
    # and hands its response back unchanged.
    client = self.stream_manager_client
    frame_response = await client.get_latest_frame(pipeline_id=pipeline_id)
    return frame_response
1954+
18961955
class ModelInitState:
18971956
"""Class to track model initialization state."""
18981957

inference/core/interfaces/stream_manager/api/entities.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ class ConsumePipelineResponse(CommandResponse):
3737
frames_metadata: List[FrameMetadata]
3838

3939

40+
class LatestFrameResponse(CommandResponse):
    # Payload returned for the LATEST_FRAME command. All four fields are
    # optional and default to None.
    frame_data: Optional[str] = Field(
        description="Base64-encoded JPEG image",
        default=None,
    )
    frame_id: Optional[int] = None
    frame_timestamp: Optional[datetime] = None
    source_id: Optional[int] = None
47+
48+
4049
class InitializeWebRTCPipelineResponse(CommandResponse):
4150
sdp: str
4251
type: str

inference/core/interfaces/stream_manager/api/stream_manager_client.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
FrameMetadata,
1414
InferencePipelineStatusResponse,
1515
InitializeWebRTCPipelineResponse,
16+
LatestFrameResponse,
1617
ListPipelinesResponse,
1718
)
1819
from inference.core.interfaces.stream_manager.api.errors import (
@@ -200,6 +201,26 @@ async def consume_pipeline_result(
200201
],
201202
)
202203

204+
async def get_latest_frame(self, pipeline_id: str) -> LatestFrameResponse:
    """Request the latest frame of a pipeline from the stream manager.

    Sends a ``CommandType.LATEST_FRAME`` command for ``pipeline_id`` through
    ``_handle_command`` and repacks the reply into a ``LatestFrameResponse``.
    Frame fields missing from the reply are passed on as ``None``.
    """
    response = await self._handle_command(
        command={
            TYPE_KEY: CommandType.LATEST_FRAME,
            PIPELINE_ID_KEY: pipeline_id,
        }
    )
    payload = response[RESPONSE_KEY]
    return LatestFrameResponse(
        status=payload[STATUS_KEY],
        context=CommandContext(
            request_id=response.get(REQUEST_ID_KEY),
            pipeline_id=response.get(PIPELINE_ID_KEY),
        ),
        frame_data=payload.get("frame_data"),
        frame_id=payload.get("frame_id"),
        frame_timestamp=payload.get("frame_timestamp"),
        source_id=payload.get("source_id"),
    )
223+
203224
async def _handle_command(self, command: dict) -> dict:
204225
response = await send_command(
205226
host=self._host,

inference/core/interfaces/stream_manager/manager_app/entities.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class CommandType(str, Enum):
4949
TERMINATE = "terminate"
5050
LIST_PIPELINES = "list_pipelines"
5151
CONSUME_RESULT = "consume_result"
52+
LATEST_FRAME = "latest_frame"
5253

5354

5455
class VideoConfiguration(BaseModel):

inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import base64
23
import json
34
import os
45
import signal
@@ -148,6 +149,8 @@ def _handle_command(self, request_id: str, payload: dict) -> None:
148149
return self._get_pipeline_status(request_id=request_id)
149150
if command_type is CommandType.CONSUME_RESULT:
150151
return self._consume_results(request_id=request_id, payload=payload)
152+
if command_type is CommandType.LATEST_FRAME:
153+
return self._handle_latest_frame(request_id=request_id)
151154
raise NotImplementedError(
152155
f"Command type `{command_type}` cannot be handled"
153156
)
@@ -636,6 +639,56 @@ def _consume_results(self, request_id: str, payload: dict) -> None:
636639
error_type=ErrorType.OPERATION_ERROR,
637640
)
638641

642+
def _handle_latest_frame(self, request_id: str) -> None:
    """Reply with the newest buffered frame as a base64-encoded JPEG.

    Peeks at the last entry of the buffer sink without removing it and
    picks the last non-None frame of that entry. When there is no sink, the
    buffer is empty, or the entry holds no frame, a SUCCESS response with
    all frame fields set to None is queued. Any exception (including a
    failed JPEG encode) is reported through ``_handle_error`` as an
    OPERATION_ERROR.

    Args:
        request_id: identifier used to pair the queued response with the
            originating request.
    """
    try:
        frame = None
        if self._buffer_sink is not None and not self._buffer_sink.empty():
            # Peek at the last item in the buffer (non-destructive).
            _, frames = self._buffer_sink._buffer[-1]
            # Last non-None frame of that entry, if any.
            frame = next((f for f in reversed(frames) if f is not None), None)
        if frame is None:
            response_payload = {
                STATUS_KEY: OperationStatus.SUCCESS,
                "frame_data": None,
                "frame_id": None,
                "frame_timestamp": None,
                "source_id": None,
            }
        else:
            encoded_ok, jpeg_bytes = cv.imencode(
                ".jpg", frame.image, [cv.IMWRITE_JPEG_QUALITY, 70]
            )
            # imencode signals failure through its first return value;
            # never report SUCCESS with an unusable buffer.
            if not encoded_ok:
                raise RuntimeError("JPEG encoding of the latest frame failed")
            frame_b64 = base64.b64encode(jpeg_bytes.tobytes()).decode("ascii")
            response_payload = {
                STATUS_KEY: OperationStatus.SUCCESS,
                "frame_data": frame_b64,
                "frame_id": frame.frame_id,
                "frame_timestamp": frame.frame_timestamp.isoformat(),
                "source_id": frame.source_id,
            }
        self._responses_queue.put((request_id, response_payload))
    except Exception as error:
        self._handle_error(
            request_id=request_id,
            error=error,
            public_error_message="Unexpected error retrieving latest frame.",
            error_type=ErrorType.OPERATION_ERROR,
        )
691+
639692
def _handle_error(
640693
self,
641694
request_id: str,

0 commit comments

Comments
 (0)