Skip to content

Commit a6c349f

Browse files
Merge pull request #427 from nodestream-proj/MetricsPivot
Optionally adding time intervals as a reporter frequency option. Adding accumulator metrics.
2 parents d32b3cd + a1e885c commit a6c349f

File tree

7 files changed

+308
-15
lines changed

7 files changed

+308
-15
lines changed

nodestream/cli/commands/run.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ class Run(NodestreamCommand):
5757
description="Storage backend to use for checkpointing",
5858
flag=False,
5959
),
60+
option(
61+
"metrics-interval-in-seconds",
62+
description="Time interval to report metrics in seconds",
63+
default=None,
64+
flag=False,
65+
),
6066
*PROMETHEUS_OPTIONS,
6167
]
6268

nodestream/cli/operations/run_pipeline.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,14 @@ def create_progress_reporter(
122122
self, command: NodestreamCommand, pipeline_name: str
123123
) -> PipelineProgressReporter:
124124
indicator = self.get_progress_indicator(command, pipeline_name)
125+
metrics_interval_in_seconds = (
126+
float(command.option("metrics-interval-in-seconds"))
127+
if command.option("metrics-interval-in-seconds")
128+
else None
129+
)
125130
return PipelineProgressReporter(
126131
reporting_frequency=int(command.option("reporting-frequency")),
132+
metrics_interval_in_seconds=metrics_interval_in_seconds,
127133
callback=indicator.progress_callback,
128134
on_start_callback=indicator.on_start,
129135
on_finish_callback=indicator.on_finish,

nodestream/metrics.py

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
class Metric:
1515
name: str
1616
description: Optional[str] = None
17+
accumulate: bool = False
1718

1819
def increment_on(self, handler: "MetricHandler", value: Number = 1):
1920
"""Increment this metric on the given handler."""
@@ -27,6 +28,18 @@ def register(self, handler: "MetricHandler"):
2728
handler.increment(self, 0)
2829
return self
2930

31+
def __str__(self):
32+
return self.name
33+
34+
def __hash__(self):
35+
return hash(self.name)
36+
37+
def __eq__(self, other):
38+
return self.name == other.name
39+
40+
def __ne__(self, other):
41+
return self.name != other.name
42+
3043

3144
class MetricHandler(ABC):
3245
"""Recieves metrics and handles them in some way.
@@ -74,21 +87,28 @@ def tick(self):
7487

7588

7689
# Core metrics
77-
RECORDS = Metric("records", "Number of records processed")
78-
NON_FATAL_ERRORS = Metric("non_fatal_errors", "Number of non-fatal errors")
79-
FATAL_ERRORS = Metric("fatal_errors", "Number of fatal errors")
80-
NODES_UPSERTED = Metric("nodes_upserted", "Number of nodes upserted to the graph")
90+
RECORDS = Metric("records", "Number of records processed", accumulate=True)
91+
NON_FATAL_ERRORS = Metric(
92+
"non_fatal_errors", "Number of non-fatal errors", accumulate=True
93+
)
94+
FATAL_ERRORS = Metric("fatal_errors", "Number of fatal errors", accumulate=True)
95+
NODES_UPSERTED = Metric(
96+
"nodes_upserted", "Number of nodes upserted to the graph", accumulate=True
97+
)
8198
RELATIONSHIPS_UPSERTED = Metric(
82-
"relationships_upserted", "Number of relationships upserted to the graph"
99+
"relationships_upserted",
100+
"Number of relationships upserted to the graph",
101+
accumulate=True,
83102
)
84103
TIME_TO_LIVE_OPERATIONS = Metric(
85-
"time_to_live_operations", "Number of time-to-live operations executed"
104+
"time_to_live_operations",
105+
"Number of time-to-live operations executed",
106+
accumulate=True,
86107
)
87108
INGEST_HOOKS_EXECUTED = Metric(
88-
"ingest_hooks_executed", "Number of ingest hooks executed to the graph"
89-
)
90-
BUFFERED_RECORDS = Metric(
91-
"buffered_records", "Number of records buffered between steps"
109+
"ingest_hooks_executed",
110+
"Number of ingest hooks executed to the graph",
111+
accumulate=True,
92112
)
93113
STEPS_RUNNING = Metric(
94114
"steps_running", "Number of steps currently running in the pipeline"
@@ -184,6 +204,8 @@ def tick(self):
184204
class ConsoleMetricHandler(MetricHandler):
185205
"""A metric handler that prints metrics to the console."""
186206

207+
# TODO: Tie incremental values to the metrics rather than the handler.
208+
187209
def __init__(self, command: Command):
188210
self.metrics: dict[Metric, Number] = {}
189211
self.command = command
@@ -194,8 +216,17 @@ def increment(self, metric: Metric, value: Number):
194216
def decrement(self, metric: Metric, value: Number):
195217
self.metrics[metric] = self.metrics.get(metric, 0) - value
196218

219+
def discharge(self) -> dict[Metric, Number]:
220+
metrics = {}
221+
for metric, value in self.metrics.items():
222+
metrics[metric.name] = value
223+
if metric.accumulate:
224+
self.metrics[metric] = 0
225+
return metrics
226+
197227
def render(self):
198-
stats = ((k.name, str(v)) for k, v in self.metrics.items() if v > 0)
228+
metrics = self.discharge()
229+
stats = ((k, str(v)) for k, v in metrics.items())
199230
table = self.command.table(STATS_TABLE_COLS, stats)
200231
table.render()
201232

@@ -219,9 +250,20 @@ def increment(self, metric: Metric, value: Number):
219250
def decrement(self, metric: Metric, value: Number):
220251
self.metrics[metric] = self.metrics.get(metric, 0) - value
221252

253+
def discharge(self) -> dict[Metric, Number]:
254+
metrics = {}
255+
for metric, value in self.metrics.items():
256+
metrics[metric.name] = value
257+
if metric.accumulate:
258+
self.metrics[metric] = 0
259+
260+
return metrics
261+
222262
def render(self):
263+
metrics = self.discharge()
223264
self.logger.info(
224-
"Metrics Report", extra={k.name: v for k, v in self.metrics.items()}
265+
"Metrics Report",
266+
extra=metrics,
225267
)
226268

227269
def stop(self):

nodestream/pipeline/progress_reporter.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import os
2+
import time
23
from dataclasses import dataclass, field
34
from logging import Logger, getLogger
4-
from typing import Any, Callable
5+
from typing import Any, Callable, Optional
56

67
from psutil import Process
78

@@ -32,13 +33,15 @@ class PipelineProgressReporter:
3233
"""A `PipelineProgressReporter` is a utility that can be used to report on the progress of a pipeline."""
3334

3435
reporting_frequency: int = 10000
36+
metrics_interval_in_seconds: Optional[float] = None
3537
logger: Logger = field(default_factory=getLogger)
3638
callback: Callable[[int, Metrics], None] = field(default=no_op)
3739
on_start_callback: Callable[[], None] = field(default=no_op)
3840
on_finish_callback: Callable[[Metrics], None] = field(default=no_op)
3941
on_fatal_error_callback: Callable[[Exception], None] = field(default=no_op)
4042
encountered_fatal_error: bool = field(default=False)
4143
observability_callback: Callable[[Any], None] = field(default=no_op)
44+
last_report_time: float = field(default=0)
4245

4346
def on_fatal_error(self, exception: Exception):
4447
self.encountered_fatal_error = True
@@ -68,7 +71,12 @@ def for_testing(cls, results_list: list) -> "PipelineProgressReporter":
6871
)
6972

7073
def report(self, index, metrics: Metrics):
71-
if index % self.reporting_frequency == 0:
74+
if self.metrics_interval_in_seconds is not None:
75+
current_time = time.time()
76+
if current_time - self.last_report_time >= self.metrics_interval_in_seconds:
77+
self.callback(index, metrics)
78+
self.last_report_time = current_time
79+
elif index % self.reporting_frequency == 0:
7280
self.callback(index, metrics)
7381

7482
def observe(self, record: Any):

tests/unit/cli/operations/test_run_pipeline.py

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,18 @@ def test_make_run_request(run_pipeline_operation, mocker):
3232
targets = ["t1", "t2"]
3333
pipeline_name = "my_pipeline"
3434
command = mocker.Mock()
35-
command.option.side_effect = ["my-storage", annotations, "10001", targets, "10000"]
35+
# Resolve each option by name; names absent from the dict (e.g. "metrics-interval-in-seconds") resolve to None via .get()
36+
option_responses = {
37+
"storage-backend": "my-storage",
38+
"annotations": annotations,
39+
"step-outbox-size": "10001",
40+
"target": targets,
41+
"time-interval-seconds": None, # No time interval provided
42+
"reporting-frequency": "10000",
43+
}
44+
command.option.side_effect = lambda opt: option_responses.get(opt)
45+
command.has_json_logging_set = False # Required for create_progress_reporter
46+
command.is_very_verbose = False # Required for make_run_request
3647
command.argument.return_value = [pipeline_name]
3748
pipeline = mocker.patch("nodestream.project.PipelineDefinition")
3849
pipeline.name = pipeline_name
@@ -43,6 +54,129 @@ def test_make_run_request(run_pipeline_operation, mocker):
4354
assert_that(result.progress_reporter.reporting_frequency, equal_to(10000))
4455

4556

57+
def test_create_progress_reporter_with_time_interval_seconds(
58+
run_pipeline_operation, mocker
59+
):
60+
"""Test that time_interval_seconds gets properly converted to float and passed to PipelineProgressReporter"""
61+
command = mocker.Mock()
62+
command.option.side_effect = lambda opt: {
63+
"metrics-interval-in-seconds": "30.5",
64+
"reporting-frequency": "1000",
65+
}.get(opt)
66+
command.has_json_logging_set = False
67+
68+
# Mock PipelineProgressReporter to capture arguments
69+
mock_progress_reporter = mocker.patch(
70+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
71+
)
72+
73+
_ = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
74+
75+
# Verify PipelineProgressReporter was called with the correct metrics_interval_in_seconds
76+
mock_progress_reporter.assert_called_once()
77+
call_args = mock_progress_reporter.call_args
78+
assert_that(call_args.kwargs["metrics_interval_in_seconds"], equal_to(30.5))
79+
assert_that(call_args.kwargs["reporting_frequency"], equal_to(1000))
80+
81+
82+
def test_create_progress_reporter_without_time_interval_seconds(
83+
run_pipeline_operation, mocker
84+
):
85+
"""Test that time_interval_seconds is None when not provided"""
86+
command = mocker.Mock()
87+
command.option.side_effect = lambda opt: {
88+
"metrics-interval-in-seconds": None,
89+
"reporting-frequency": "2000",
90+
}.get(opt)
91+
command.has_json_logging_set = False
92+
93+
# Mock PipelineProgressReporter to capture arguments
94+
mock_progress_reporter = mocker.patch(
95+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
96+
)
97+
98+
_ = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
99+
100+
# Verify PipelineProgressReporter was called with None for metrics_interval_in_seconds
101+
mock_progress_reporter.assert_called_once()
102+
call_args = mock_progress_reporter.call_args
103+
assert_that(call_args.kwargs["metrics_interval_in_seconds"], equal_to(None))
104+
assert_that(call_args.kwargs["reporting_frequency"], equal_to(2000))
105+
106+
107+
def test_create_progress_reporter_with_json_indicator(run_pipeline_operation, mocker):
108+
"""Test that create_progress_reporter works correctly with JSON progress indicator"""
109+
command = mocker.Mock()
110+
command.option.side_effect = lambda opt: {
111+
"metrics-interval-in-seconds": "15.0",
112+
"reporting-frequency": "500",
113+
}.get(opt)
114+
command.has_json_logging_set = True
115+
116+
# Mock PipelineProgressReporter to capture arguments
117+
mock_progress_reporter = mocker.patch(
118+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
119+
)
120+
121+
_ = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
122+
123+
# Verify PipelineProgressReporter was called with correct arguments
124+
mock_progress_reporter.assert_called_once()
125+
call_args = mock_progress_reporter.call_args
126+
assert_that(call_args.kwargs["metrics_interval_in_seconds"], equal_to(15.0))
127+
assert_that(call_args.kwargs["reporting_frequency"], equal_to(500))
128+
129+
130+
def test_make_run_request_with_time_interval_seconds_integration(
131+
run_pipeline_operation, mocker
132+
):
133+
"""Integration test to ensure make_run_request properly handles time_interval_seconds through create_progress_reporter"""
134+
annotations = ["annotation1"]
135+
targets = ["t1"]
136+
pipeline_name = "my_pipeline"
137+
command = mocker.Mock()
138+
139+
# Setup command.option to handle all the different option calls made by make_run_request
140+
option_responses = {
141+
"storage-backend": "my-storage",
142+
"annotations": annotations,
143+
"step-outbox-size": "10001",
144+
"target": targets,
145+
"metrics-interval-in-seconds": "45.0",
146+
"reporting-frequency": "5000",
147+
}
148+
command.option.side_effect = lambda opt: option_responses.get(opt)
149+
command.has_json_logging_set = False
150+
command.is_very_verbose = False
151+
command.argument.return_value = [pipeline_name]
152+
153+
pipeline = mocker.Mock()
154+
pipeline.name = pipeline_name
155+
pipeline.configuration = PipelineConfiguration()
156+
157+
# Mock the project's get_object_storage_by_name method
158+
run_pipeline_operation.project.get_object_storage_by_name.return_value = None
159+
run_pipeline_operation.project.get_target_by_name.return_value = None
160+
161+
# Mock PipelineProgressReporter to capture its arguments
162+
mock_progress_reporter = mocker.patch(
163+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
164+
)
165+
166+
result = run_pipeline_operation.make_run_request(command, pipeline)
167+
168+
# Verify the progress reporter was created with the correct metrics_interval_in_seconds
169+
mock_progress_reporter.assert_called_once()
170+
call_args = mock_progress_reporter.call_args
171+
assert_that(call_args.kwargs["metrics_interval_in_seconds"], equal_to(45.0))
172+
assert_that(call_args.kwargs["reporting_frequency"], equal_to(5000))
173+
174+
# Verify other parts of the request are still correct
175+
assert_that(result.pipeline_name, equal_to(pipeline_name))
176+
assert_that(result.initialization_arguments.annotations, equal_to(annotations))
177+
assert_that(result.initialization_arguments.step_outbox_size, equal_to(10001))
178+
179+
46180
def test_spinner_on_start(mocker):
47181
spinner = SpinnerProgressIndicator(mocker.Mock(), "pipeline_name")
48182
spinner.on_start()

tests/unit/pipeline/test_pipeline_progress_reporter.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
from unittest.mock import Mock
2+
13
import pytest
24
from hamcrest import assert_that, equal_to
35

6+
from nodestream.metrics import Metrics
47
from nodestream.pipeline import IterableExtractor, Pipeline, PipelineProgressReporter
58

69

@@ -17,3 +20,35 @@ async def test_pipeline_progress_reporter_for_testing(mocker):
1720
result = PipelineProgressReporter.for_testing([])
1821
assert_that(result.reporting_frequency, equal_to(1))
1922
assert_that(result.logger.name, equal_to("test"))
23+
24+
25+
def test_pipeline_progress_reporter_with_metrics_interval_in_seconds(mocker):
26+
"""Test that metrics_interval_in_seconds works correctly"""
27+
mock_callback = Mock()
28+
reporter = PipelineProgressReporter(
29+
metrics_interval_in_seconds=0.1, callback=mock_callback
30+
)
31+
32+
mock_time = mocker.patch("nodestream.pipeline.progress_reporter.time.time")
33+
mock_time.side_effect = [0.15, 0.2] # 150ms, 200ms
34+
35+
metrics = Metrics()
36+
reporter.report(1, metrics) # Should report (150ms >= 100ms from 0)
37+
reporter.report(2, metrics) # Should not report (200ms - 150ms = 50ms < 100ms)
38+
39+
assert_that(mock_callback.call_count, equal_to(1))
40+
41+
42+
def test_pipeline_progress_reporter_without_metrics_interval_in_seconds_uses_frequency():
43+
"""Test that None metrics_interval_in_seconds falls back to frequency-based reporting"""
44+
mock_callback = Mock()
45+
reporter = PipelineProgressReporter(
46+
metrics_interval_in_seconds=None, reporting_frequency=3, callback=mock_callback
47+
)
48+
49+
metrics = Metrics()
50+
for i in range(1, 7): # 1,2,3,4,5,6
51+
reporter.report(i, metrics)
52+
53+
# Should report on multiples of 3: indices 3, 6
54+
assert_that(mock_callback.call_count, equal_to(2))

0 commit comments

Comments
 (0)