Skip to content

Commit a9ecbac

Browse files
authored
feat: add opentelemetry metrics support for worker backend (#72)
1 parent 37ba83b commit a9ecbac

File tree

8 files changed

+612
-13
lines changed

8 files changed

+612
-13
lines changed

worker/pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,14 @@ dependencies = [
3333
"pydantic~=2.9",
3434
"uvicorn~=0.32",
3535
"PyYAML~=6.0",
36+
"opentelemetry-api~=1.32",
37+
"opentelemetry-sdk~=1.32",
38+
"opentelemetry-exporter-otlp~=1.32",
3639
]
3740

3841
[project.optional-dependencies]
3942
dev = ["black", "check-manifest", "ruff"]
40-
test = ["coverage", "pytest", "pytest-cov"]
43+
test = ["coverage", "pytest", "pytest-cov==6.0.0"]
4144

4245
[project.urls]
4346
"Homepage" = "https://netboxlabs.com/"

worker/tests/policy/test_runner.py

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from apscheduler.triggers.date import DateTrigger
99

1010
from worker.backend import Backend
11-
from worker.models import Config, DiodeConfig, Policy, Status
11+
from worker.models import Config, DiodeConfig, Metadata, Policy, Status
1212
from worker.policy.runner import PolicyRunner
1313

1414

@@ -151,7 +151,8 @@ def test_run_ingestion_errors(
151151
mock_backend.run.assert_called_once_with(policy_runner.name, sample_policy)
152152
mock_diode_client.ingest.assert_called_once_with(mock_backend.run.return_value)
153153
assert (
154-
"ERROR ingestion failed for test_policy : ['error1', 'error2']" in caplog.text
154+
"Policy test_policy: Ingestion failed with errors: ['error1', 'error2']"
155+
in caplog.text
155156
)
156157

157158

@@ -186,3 +187,108 @@ def test_stop_policy_runner(policy_runner):
186187
# Ensure scheduler shutdown is called and status is updated
187188
mock_shutdown.assert_called_once()
188189
assert policy_runner.status == Status.FINISHED
190+
191+
192+
def test_metrics_during_policy_lifecycle(
193+
policy_runner, sample_policy, mock_diode_client, mock_backend
194+
):
195+
"""Test that metrics are properly updated during the policy lifecycle."""
196+
# Create mock metrics
197+
mock_active_policies = MagicMock()
198+
mock_policy_executions = MagicMock()
199+
mock_backend_execution_success = MagicMock()
200+
mock_backend_execution_failure = MagicMock()
201+
mock_backend_execution_latency = MagicMock()
202+
203+
# Map of metric names to mock objects
204+
mock_metrics = {
205+
"active_policies": mock_active_policies,
206+
"policy_executions": mock_policy_executions,
207+
"backend_execution_success": mock_backend_execution_success,
208+
"backend_execution_failure": mock_backend_execution_failure,
209+
"backend_execution_latency": mock_backend_execution_latency,
210+
}
211+
212+
policy_runner.name = "test_policy"
213+
policy_runner.metadata = Metadata(
214+
name="my_backend",
215+
app_name="test_app",
216+
app_version="1.0",
217+
)
218+
219+
# Setup mock for get_metric function
220+
def mock_get_metric(name):
221+
return mock_metrics.get(name)
222+
223+
with patch("worker.policy.runner.get_metric", side_effect=mock_get_metric):
224+
225+
mock_diode_client.ingest.return_value.errors = []
226+
227+
policy_runner.run(mock_diode_client, mock_backend, sample_policy)
228+
229+
mock_backend.run.assert_called_once_with(policy_runner.name, sample_policy)
230+
mock_diode_client.ingest.assert_called_once_with(mock_backend.run.return_value)
231+
232+
mock_policy_executions.add.assert_called_once_with(1, {"policy": "test_policy"})
233+
mock_backend_execution_success.add.assert_called_once_with(
234+
1,
235+
{
236+
"policy": "test_policy",
237+
"backend": "my_backend",
238+
"app_name": "test_app",
239+
"app_version": "1.0",
240+
},
241+
)
242+
243+
# Test stop - should decrement active_policies
244+
with patch.object(policy_runner.scheduler, "shutdown") as mock_shutdown:
245+
policy_runner.stop()
246+
mock_shutdown.assert_called_once()
247+
mock_active_policies.add.assert_called_with(-1, {"policy": "test_policy"})
248+
249+
250+
def test_metrics_during_failed_discovery(
251+
policy_runner, sample_policy, mock_diode_client, mock_backend
252+
):
253+
"""Test that metrics are properly updated when discovery fails."""
254+
mock_backend_execution_failure = MagicMock()
255+
mock_backend_execution_latency = MagicMock()
256+
257+
mock_metrics = {
258+
"backend_execution_failure": mock_backend_execution_failure,
259+
"backend_execution_latency": mock_backend_execution_latency,
260+
}
261+
262+
policy_runner.name = "test_policy"
263+
policy_runner.metadata = Metadata(
264+
name="my_backend",
265+
app_name="test_app",
266+
app_version="1.0",
267+
)
268+
269+
def mock_get_metric(name):
270+
return mock_metrics.get(name)
271+
272+
# Simulate backend throwing an exception
273+
mock_backend.run.side_effect = Exception("Backend error")
274+
275+
with patch("worker.policy.runner.get_metric", side_effect=mock_get_metric):
276+
mock_diode_client = MagicMock(name="MockDiodeClient")
277+
policy_runner.run(mock_diode_client, sample_diode_config, sample_policy)
278+
# Verify failure metric was called
279+
mock_backend_execution_failure.add.assert_called_once_with(
280+
1,
281+
{
282+
"policy": "test_policy",
283+
"backend": "my_backend",
284+
"app_name": "test_app",
285+
"app_version": "1.0",
286+
},
287+
)
288+
289+
# Verify backend execution latency recorded with failure status
290+
mock_backend_execution_latency.record.assert_called_once()
291+
latency_args = mock_backend_execution_latency.record.call_args[0][0]
292+
latency_kwargs = mock_backend_execution_latency.record.call_args[0][1]
293+
assert latency_args > 0
294+
assert latency_kwargs["backend"] == "my_backend"

worker/tests/test_metrics.py

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
#!/usr/bin/env python
2+
# Copyright 2025 NetBox Labs Inc
3+
"""NetBox Labs - Metrics Unit Tests."""
4+
5+
from unittest.mock import MagicMock, patch
6+
7+
import pytest
8+
9+
from worker.metrics import get_metric, setup_metrics_export
10+
11+
12+
@pytest.fixture
13+
def mock_opentelemetry():
14+
"""Mock the OpenTelemetry SDK imports and components."""
15+
with patch("worker.metrics.OTLPMetricExporter") as mock_exporter, patch(
16+
"worker.metrics.PeriodicExportingMetricReader"
17+
) as mock_reader, patch("opentelemetry.sdk.metrics.Meter") as mock_meter, patch(
18+
"worker.metrics.MeterProvider"
19+
) as mock_provider:
20+
# Setup return values
21+
mock_provider.return_value.get_meter.return_value = mock_meter
22+
23+
yield {
24+
"exporter": mock_exporter,
25+
"reader": mock_reader,
26+
"meter": mock_meter,
27+
"provider": mock_provider,
28+
}
29+
30+
31+
@pytest.fixture
32+
def reset_metrics_cache():
33+
"""Reset the metrics cache before test."""
34+
with patch("worker.metrics._metrics_cache", {}), \
35+
patch("worker.metrics._metric_factories", {}), \
36+
patch("worker.metrics._metrics_enabled", True):
37+
yield
38+
39+
40+
def test_setup_metrics_export(mock_opentelemetry):
41+
"""Test that metrics export setup creates the correct OpenTelemetry components."""
42+
endpoint = "http://localhost:4317"
43+
export_period = 30
44+
45+
setup_metrics_export(endpoint, export_period)
46+
47+
# Verify exporter was created with correct endpoint
48+
mock_opentelemetry["exporter"].assert_called_once()
49+
args, kwargs = mock_opentelemetry["exporter"].call_args
50+
assert kwargs["endpoint"] == endpoint
51+
52+
# Verify reader was created with correct exporter and export interval
53+
mock_opentelemetry["reader"].assert_called_once()
54+
args, kwargs = mock_opentelemetry["reader"].call_args
55+
assert kwargs["export_interval_millis"] == export_period * 1000
56+
57+
# Verify meter provider was configured
58+
mock_opentelemetry["provider"].assert_called_once()
59+
60+
# Verify meter was created
61+
mock_opentelemetry["provider"].return_value.get_meter.assert_called_once_with(
62+
"device-discovery", "0.0.0"
63+
)
64+
65+
66+
def test_setup_metrics_export_no_endpoint(mock_opentelemetry, reset_metrics_cache):
67+
"""Test that metrics export setup is properly disabled when no endpoint is provided."""
68+
with patch("worker.metrics.logger") as mock_logger:
69+
# Call with None endpoint
70+
setup_metrics_export(None, 30)
71+
72+
# Verify logger message
73+
mock_logger.info.assert_called_once_with(
74+
"No metrics endpoint provided, metrics collection is disabled"
75+
)
76+
77+
# Verify no OpenTelemetry components were created
78+
mock_opentelemetry["exporter"].assert_not_called()
79+
mock_opentelemetry["reader"].assert_not_called()
80+
mock_opentelemetry["provider"].assert_not_called()
81+
82+
# Verify get_metric returns None after setup with no endpoint
83+
metric = get_metric("api_requests")
84+
assert metric is None
85+
86+
87+
def test_get_metric_returns_counter(reset_metrics_cache):
88+
"""Test that get_metric returns a counter for counter-type metrics."""
89+
mock_counter = MagicMock()
90+
mock_meter = MagicMock()
91+
mock_meter.create_counter.return_value = mock_counter
92+
93+
with patch("worker.metrics._meter", mock_meter):
94+
# Test accessing a counter metric
95+
metric = get_metric("api_requests")
96+
97+
# Verify counter was created with correct name and description
98+
mock_meter.create_counter.assert_called_once()
99+
args, kwargs = mock_meter.create_counter.call_args
100+
assert kwargs["name"] == "api_requests"
101+
assert "description" in kwargs
102+
103+
# Should return the mock counter
104+
assert metric == mock_counter
105+
106+
107+
def test_get_metric_returns_histogram(reset_metrics_cache):
108+
"""Test that get_metric returns a histogram for latency-type metrics."""
109+
mock_histogram = MagicMock()
110+
mock_meter = MagicMock()
111+
mock_meter.create_histogram.return_value = mock_histogram
112+
113+
with patch("worker.metrics._meter", mock_meter):
114+
# Test accessing a histogram metric
115+
metric = get_metric("api_response_latency")
116+
117+
# Verify histogram was created with correct name and description
118+
mock_meter.create_histogram.assert_called_once()
119+
args, kwargs = mock_meter.create_histogram.call_args
120+
assert kwargs["name"] == "api_response_latency"
121+
assert "description" in kwargs
122+
123+
# Should return the mock histogram
124+
assert metric == mock_histogram
125+
126+
127+
def test_get_metric_returns_none_when_not_initialized():
128+
"""Test that get_metric returns None when metrics are not initialized."""
129+
with patch("worker.metrics._meter", None):
130+
metric = get_metric("api_requests")
131+
assert metric is None
132+
133+
134+
def test_get_metric_creates_metric_only_once(reset_metrics_cache):
135+
"""Test that get_metric only creates a metric once and returns cached value."""
136+
mock_counter = MagicMock()
137+
mock_meter = MagicMock()
138+
mock_meter.create_counter.return_value = mock_counter
139+
140+
with patch("worker.metrics._meter", mock_meter), patch(
141+
"worker.metrics._metrics_cache", {}
142+
):
143+
144+
# First call should create the metric
145+
metric1 = get_metric("api_requests")
146+
assert metric1 == mock_counter
147+
mock_meter.create_counter.assert_called_once()
148+
149+
# Reset the mock to check if it's called again
150+
mock_meter.create_counter.reset_mock()
151+
152+
# Second call should return cached metric without creating it again
153+
metric2 = get_metric("api_requests")
154+
assert metric2 == mock_counter
155+
mock_meter.create_counter.assert_not_called()
156+
157+
158+
def test_all_expected_metrics_exist(reset_metrics_cache):
159+
"""Test that all expected metrics can be retrieved."""
160+
expected_metrics = [
161+
"api_requests",
162+
"api_response_latency",
163+
"active_policies",
164+
"policy_executions",
165+
"backend_execution_success",
166+
"backend_execution_failure",
167+
"backend_execution_latency",
168+
]
169+
170+
mock_meter = MagicMock()
171+
mock_meter.create_counter.return_value = MagicMock()
172+
mock_meter.create_histogram.return_value = MagicMock()
173+
174+
with patch("worker.metrics._meter", mock_meter), patch(
175+
"worker.metrics._metrics_cache", {}
176+
):
177+
178+
for metric_name in expected_metrics:
179+
metric = get_metric(metric_name)
180+
assert metric is not None, f"Expected metric {metric_name} to exist"
181+
182+
183+
def test_setup_metrics_export_meter_provider_error(mock_opentelemetry, reset_metrics_cache):
184+
"""Test handling of errors when setting the meter provider."""
185+
endpoint = "http://localhost:4317"
186+
export_period = 30
187+
188+
# Mock set_meter_provider to raise an exception
189+
with patch("worker.metrics.otlp_metrics.set_meter_provider",
190+
side_effect=Exception("Provider error")), \
191+
patch("worker.metrics.logger") as mock_logger:
192+
193+
# Call function
194+
setup_metrics_export(endpoint, export_period)
195+
196+
# Verify components were created but meter provider wasn't set
197+
mock_opentelemetry["exporter"].assert_called_once()
198+
mock_opentelemetry["reader"].assert_called_once()
199+
mock_opentelemetry["provider"].assert_called_once()
200+
201+
# Verify warning was logged
202+
mock_logger.warning.assert_called_once()
203+
warning_message = mock_logger.warning.call_args[0][0]
204+
assert "Could not set meter provider" in warning_message
205+
206+
# Verify meter was not created
207+
mock_opentelemetry["provider"].return_value.get_meter.assert_not_called()
208+
209+
# Verify metrics are not enabled and get_metric returns None
210+
metric = get_metric("api_requests")
211+
assert metric is None

worker/tests/test_server.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Copyright 2025 NetBox Labs Inc
33
"""NetBox Labs - Server Unit Tests."""
44

5-
from unittest.mock import patch
5+
from unittest.mock import MagicMock, patch
66

77
import pytest
88
import yaml
@@ -139,11 +139,25 @@ def test_read_status(mock_version_semver):
139139
mock_version_semver: Mocked version_semver function.
140140
141141
"""
142-
response = client.get("/api/v1/status")
143-
mock_version_semver.assert_called_once()
144-
assert response.status_code == 200
145-
assert response.json()["version"] == "1.0.0"
146-
assert "up_time_seconds" in response.json()
142+
mock_api_requests = MagicMock()
143+
mock_api_response_latency = MagicMock()
144+
145+
mock_metrics = {
146+
"api_requests": mock_api_requests,
147+
"api_response_latency": mock_api_response_latency,
148+
}
149+
150+
def mock_get_metric(name):
151+
return mock_metrics.get(name)
152+
153+
with patch("worker.server.get_metric", side_effect=mock_get_metric):
154+
response = client.get("/api/v1/status")
155+
mock_version_semver.assert_called_once()
156+
assert response.status_code == 200
157+
assert response.json()["version"] == "1.0.0"
158+
assert "up_time_seconds" in response.json()
159+
assert mock_api_requests.add.call_count == 1
160+
assert mock_api_response_latency.record.call_count == 1
147161

148162

149163
def test_read_capabilities():

0 commit comments

Comments
 (0)