Skip to content

Commit 97c6b08

Browse files
committed
observability: add tenant analytics dashboard
1 parent c8405db commit 97c6b08

22 files changed

+1003
-54
lines changed

src/nimbus/common/metrics.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,11 @@ def register(self, metric: object) -> object:
7676
def render(self) -> str:
    """Render every registered metric's exposition text, newline-terminated."""
    rendered = [metric.render() for metric in self._metrics.values()]
    return "\n".join(rendered) + "\n"
7878

79+
def get_metric(self, name: str) -> object:
    """Return the metric registered under *name*; raise KeyError when absent."""
    registry = self._metrics
    return registry[name]
81+
82+
def get(self, name: str) -> object | None:
83+
return self._metrics.get(name)
84+
7985

8086
GLOBAL_REGISTRY = MetricsRegistry()

src/nimbus/common/observability.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
1111
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
1212
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
13-
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
13+
try: # pragma: no cover - optional dependency
14+
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
15+
except ModuleNotFoundError: # pragma: no cover - fallback when extra not installed
16+
HTTPXClientInstrumentor = None # type: ignore[assignment]
1417
from opentelemetry.sdk.resources import Resource
1518
from opentelemetry.sdk.trace import TracerProvider
1619
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
@@ -111,7 +114,7 @@ def configure_tracing(
111114
_tracer_configured = True
112115

113116
global _httpx_instrumented
114-
if not _httpx_instrumented:
117+
if not _httpx_instrumented and HTTPXClientInstrumentor is not None:
115118
HTTPXClientInstrumentor().instrument()
116119
_httpx_instrumented = True
117120

src/nimbus/control_plane/app.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from . import db
5151
from .github import GitHubAppClient
5252
from .jobs import QUEUE_KEY, enqueue_job, lease_job, lease_job_with_fence
53+
from .observability import build_org_overview
5354
from ..common.security import mint_cache_token
5455
from ..common.observability import configure_logging, configure_tracing, instrument_fastapi_app
5556
from ..common.networking import (
@@ -1207,12 +1208,13 @@ async def job_status(
12071208
@app.get("/api/jobs/recent", response_model=list[JobRecord])
async def recent_jobs(
    limit: int = 50,
    org_id: Optional[int] = None,
    _: str = Depends(verify_agent_token),
    session: AsyncSession = Depends(get_session),
) -> list[JobRecord]:
    """Return the most recently updated jobs, optionally scoped to one org.

    The caller-supplied limit is clamped to the inclusive range [1, 200].
    """
    REQUEST_COUNTER.inc()
    page_size = min(max(limit, 1), 200)
    rows = await db.list_recent_jobs(session, limit=page_size, org_id=org_id)
    return [JobRecord.model_validate(row) for row in rows]
12171219

12181220
@app.get("/api/status", status_code=status.HTTP_200_OK)
@@ -1229,6 +1231,34 @@ async def service_status(
12291231
"jobs_by_status": counts,
12301232
}
12311233

1234+
@app.get("/api/observability/orgs")
async def observability_orgs_endpoint(
    limit: int = 50,
    hours_back: Optional[int] = None,
    _: str = Depends(verify_admin_token),
    session: AsyncSession = Depends(get_session),
) -> list[dict]:
    """Admin-only tenant analytics: one summary dict per org.

    Each summary combines per-status job counts, last activity, agents seen
    in the trailing 24h, and up to five recent failures (see
    build_org_overview for the merge).
    """
    REQUEST_COUNTER.inc()
    limit = max(1, min(limit, 200))
    org_ids = await db.distinct_org_ids(session, hours_back=hours_back)
    # limit is always >= 1 after clamping and slicing past the end is a
    # no-op, so the previous `if limit and len(org_ids) > limit` guard was
    # redundant — slice unconditionally.
    org_ids = org_ids[:limit]
    status_rows = await db.org_job_status_counts(session, hours_back=hours_back)
    last_activity = await db.org_last_activity(session, hours_back=hours_back)
    # "Active" agents are always measured over the trailing 24h, independent
    # of the hours_back window applied to the other aggregates.
    agents = await db.org_active_agents(session, hours_back=24)
    # NOTE(review): one query per org (N+1); tolerable while limit <= 200,
    # worth batching if tenant counts grow.
    failures: dict[int, list[dict]] = {}
    for org_id in org_ids:
        failures[org_id] = await db.list_recent_failures(session, org_id, limit=5)
    return build_org_overview(
        org_ids,
        status_rows=status_rows,
        last_activity=last_activity,
        active_agents=agents,
        failures=failures,
    )
1261+
12321262
@app.post("/api/agents/token", response_model=AgentTokenResponse)
12331263
async def mint_agent_token_endpoint(
12341264
request_body: AgentTokenMintRequest,

src/nimbus/control_plane/db.py

Lines changed: 135 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,19 @@ def session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
138138
return async_sessionmaker(engine, expire_on_commit=False)
139139

140140

141+
def _normalise_job_rows(rows: Iterable[dict]) -> list[dict]:
    """Copy raw job rows into JSON-friendly dicts.

    - "repo_private" is persisted as the string "true"/"false"; coerce to bool.
    - Timestamp columns are serialised to ISO-8601 strings when present.

    Each input row is copied, never mutated.
    """
    normalised: list[dict] = []
    for row in rows:
        payload = dict(row)  # defensive copy; callers may reuse their mappings
        # Direct comparison replaces the redundant
        # `True if ... == "true" else False` conditional.
        payload["repo_private"] = payload.get("repo_private") == "true"
        for key in ("queued_at", "leased_at", "completed_at", "updated_at"):
            value = payload.get(key)
            if isinstance(value, datetime):
                payload[key] = value.isoformat()
        normalised.append(payload)
    return normalised
152+
153+
141154
async def record_job_queued(session: AsyncSession, assignment: JobAssignment) -> None:
142155
now = datetime.now(timezone.utc)
143156
repo = assignment.repository
@@ -209,30 +222,143 @@ async def record_status_update(session: AsyncSession, update_payload: JobStatusU
209222

210223

211224
async def list_recent_jobs(
    session: AsyncSession, limit: int = 50, org_id: Optional[int] = None
) -> Iterable[dict]:
    """Fetch the most recently updated jobs, newest first.

    When org_id is given, only that tenant's jobs are returned. Rows are
    normalised to JSON-friendly dicts via _normalise_job_rows.
    """
    query = (
        select(jobs_table)
        .order_by(jobs_table.c.updated_at.desc())
        .limit(limit)
    )
    if org_id is not None:
        query = query.where(jobs_table.c.org_id == org_id)
    result = await session.execute(query)
    return _normalise_job_rows([dict(row) for row in result.mappings()])
228237

229238

230-
async def job_status_counts(
    session: AsyncSession,
    *,
    org_id: Optional[int] = None,
) -> dict[str, int]:
    """Count jobs grouped by status, optionally restricted to one org."""
    query = select(jobs_table.c.status, func.count().label("count")).group_by(
        jobs_table.c.status
    )
    if org_id is not None:
        query = query.where(jobs_table.c.org_id == org_id)
    result = await session.execute(query)
    return {row.status: row.count for row in result}
234249

235250

251+
async def distinct_org_ids(
252+
session: AsyncSession,
253+
*,
254+
hours_back: Optional[int] = None,
255+
) -> list[int]:
256+
stmt = select(jobs_table.c.org_id).distinct().where(jobs_table.c.org_id.isnot(None))
257+
if hours_back is not None:
258+
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours_back)
259+
stmt = stmt.where(jobs_table.c.updated_at >= cutoff)
260+
result = await session.execute(stmt)
261+
org_ids = [int(row.org_id) for row in result if row.org_id is not None]
262+
return sorted(set(org_ids))
263+
264+
265+
async def org_job_status_counts(
    session: AsyncSession,
    *,
    org_id: Optional[int] = None,
    hours_back: Optional[int] = None,
) -> list[dict]:
    """Per-org, per-status job counts as {org_id, status, count} dicts.

    Rows whose org_id is NULL cannot be attributed to a tenant and are skipped.
    """
    query = select(
        jobs_table.c.org_id,
        jobs_table.c.status,
        func.count().label("count"),
    )
    if org_id is not None:
        query = query.where(jobs_table.c.org_id == org_id)
    if hours_back is not None:
        since = datetime.now(timezone.utc) - timedelta(hours=hours_back)
        query = query.where(jobs_table.c.updated_at >= since)
    query = query.group_by(jobs_table.c.org_id, jobs_table.c.status)
    result = await session.execute(query)
    counts: list[dict] = []
    for row in result:
        if row.org_id is None:
            continue  # untenanted rows are not reportable
        counts.append({"org_id": int(row.org_id), "status": row.status, "count": row.count})
    return counts
292+
293+
294+
async def org_last_activity(
    session: AsyncSession,
    *,
    org_id: Optional[int] = None,
    hours_back: Optional[int] = None,
) -> dict[int, datetime]:
    """Map each org id to the latest updated_at timestamp among its jobs."""
    query = select(
        jobs_table.c.org_id,
        func.max(jobs_table.c.updated_at).label("last_updated"),
    )
    if org_id is not None:
        query = query.where(jobs_table.c.org_id == org_id)
    if hours_back is not None:
        since = datetime.now(timezone.utc) - timedelta(hours=hours_back)
        query = query.where(jobs_table.c.updated_at >= since)
    result = await session.execute(query.group_by(jobs_table.c.org_id))
    return {
        int(row.org_id): row.last_updated
        for row in result
        if row.org_id is not None
    }
317+
318+
319+
async def org_active_agents(
    session: AsyncSession,
    *,
    org_id: Optional[int] = None,
    hours_back: int = 24,
) -> dict[int, set[str]]:
    """Map each org id to the set of agent ids active within the window.

    An agent counts as active when one of its jobs was updated inside the
    trailing hours_back window (default 24h).
    """
    since = datetime.now(timezone.utc) - timedelta(hours=hours_back)
    query = select(jobs_table.c.org_id, jobs_table.c.agent_id).where(
        jobs_table.c.agent_id.isnot(None),
        jobs_table.c.updated_at >= since,
    )
    if org_id is not None:
        query = query.where(jobs_table.c.org_id == org_id)
    result = await session.execute(query)
    agents_by_org: dict[int, set[str]] = {}
    for row in result:
        # Skip rows that cannot be attributed to an org/agent pair.
        if row.org_id is None or not row.agent_id:
            continue
        agents_by_org.setdefault(int(row.org_id), set()).add(str(row.agent_id))
    return agents_by_org
340+
341+
342+
async def list_recent_failures(
    session: AsyncSession,
    org_id: int,
    *,
    limit: int = 10,
) -> list[dict]:
    """Return the newest failed/cancelled jobs for one org, normalised."""
    query = (
        select(jobs_table)
        .where(
            jobs_table.c.org_id == org_id,
            jobs_table.c.status.in_(["failed", "cancelled"]),
        )
        .order_by(jobs_table.c.updated_at.desc())
        .limit(limit)
    )
    result = await session.execute(query)
    return _normalise_job_rows([dict(row) for row in result.mappings()])
360+
361+
236362
async def get_job(session: AsyncSession, job_id: int) -> Optional[dict]:
237363
stmt = select(jobs_table).where(jobs_table.c.job_id == job_id).limit(1)
238364
result = await session.execute(stmt)
src/nimbus/control_plane/observability.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""Tenant observability aggregations for the Nimbus control plane."""
2+
3+
from __future__ import annotations
4+
5+
6+
7+
from datetime import datetime
8+
from typing import Iterable
9+
10+
11+
def build_org_overview(
    org_ids: Iterable[int],
    *,
    status_rows: list[dict],
    last_activity: dict[int, datetime],
    active_agents: dict[int, set[str]],
    failures: dict[int, list[dict]],
) -> list[dict]:
    """Merge per-org aggregates into one summary dict per requested org.

    Orgs missing from an aggregate get empty defaults ({} / None / [] / []).
    Agent ids are returned sorted for stable output.
    """
    counts_by_org: dict[int, dict[str, int]] = {}
    for row in status_rows:
        org = int(row["org_id"])
        counts_by_org.setdefault(org, {})[row["status"]] = row["count"]

    return [
        {
            "org_id": int(org_id),
            "status_counts": counts_by_org.get(org_id, {}),
            "last_activity": last_activity.get(org_id),
            "active_agents": sorted(active_agents.get(org_id, set())),
            "recent_failures": failures.get(org_id, []),
        }
        for org_id in org_ids
    ]

tests/conftest.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import sys
2+
from types import ModuleType, SimpleNamespace
3+
4+
5+
def _ensure_module(name: str, **attrs) -> None:
    """Register a stub module under *name* unless one is already importable.

    Existing sys.modules entries (real or previously stubbed) always win.
    """
    if name in sys.modules:
        return
    stub = ModuleType(name)
    for attr_name, attr_value in attrs.items():
        setattr(stub, attr_name, attr_value)
    sys.modules[name] = stub
12+
13+
14+
class _SigstoreVerify(ModuleType):
    """Stub for sigstore.verify: any attribute access fails loudly.

    Raising ModuleNotFoundError makes code paths that actually need the
    optional sigstore dependency fail with a clear message instead of
    silently receiving a useless stub attribute.
    """

    def __getattr__(self, name):
        raise ModuleNotFoundError(f"optional sigstore.verify dependency not installed: {name}")
17+
18+
19+
# ---- optional third-party dependency stubs --------------------------------
# boto3 / botocore: just enough surface for code that constructs clients.
_ensure_module(
    "boto3",
    client=lambda *_args, **_kwargs: SimpleNamespace(),
    session=SimpleNamespace(Session=lambda: SimpleNamespace(client=lambda *_a, **_k: SimpleNamespace())),
)
_ensure_module(
    "botocore.exceptions",
    ClientError=type(
        "ClientError",
        (Exception,),
        {"__init__": lambda self, error_response=None, operation_name=None: Exception.__init__(self)},
    ),
)
# pyroute2: netlink helpers; stubs only need to exist, not to work.
_ensure_module(
    "pyroute2",
    IPRoute=SimpleNamespace,
    NetNS=SimpleNamespace,
    NetlinkError=Exception,
    netns=SimpleNamespace,
)

# sigstore: the package's `verify` attribute raises a descriptive
# ModuleNotFoundError on any attribute access, while the "sigstore.verify"
# sys.modules entry carries the names tests import directly.
# (The original code also built a `sigstore_module = ModuleType("sigstore")`
# and set its `verify` attribute, but never registered it — _ensure_module
# creates a fresh module — so that dead assignment is removed here.)
# NOTE(review): sys.modules["sigstore.verify"] and the package attribute are
# two different objects — confirm no test relies on them being identical.
sigstore_verify = _SigstoreVerify("sigstore.verify")
_ensure_module("sigstore", verify=sigstore_verify)
_ensure_module(
    "sigstore.verify",
    VerificationMaterials=object,
    verifier=lambda *args, **kwargs: SimpleNamespace(verify=lambda *a, **kw: None),
)

# SAML optional dependency stubs
saml_module = ModuleType("nimbus.control_plane.saml")
saml_module.SamlSettings = lambda **_kwargs: SimpleNamespace(**_kwargs)  # type: ignore[attr-defined]


class _DummySamlAuthenticator:
    """Minimal stand-in for the real SAML authenticator."""

    def __init__(self, *_args, **_kwargs):
        pass

    def generate_session_token(self, *_args, **_kwargs):
        # Mirrors the real token object's to_dict() contract.
        return SimpleNamespace(to_dict=lambda: {"token": "stub"})


saml_module.SamlAuthenticator = _DummySamlAuthenticator  # type: ignore[attr-defined]
saml_module.SamlValidationError = Exception  # type: ignore[attr-defined]
sys.modules.setdefault("nimbus.control_plane.saml", saml_module)

0 commit comments

Comments
 (0)