Skip to content

Commit 8cf43f4

Browse files
authored
Merge branch 'main' into pipeline-specific-schemas
2 parents 6b54d66 + b3bbffb commit 8cf43f4

File tree

4 files changed

+44
-3
lines changed

4 files changed

+44
-3
lines changed

nodestream/metrics.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ def increment(self, metric: Metric, value: Number):
214214
self.metrics[metric] = self.metrics.get(metric, 0) + value
215215

216216
def decrement(self, metric: Metric, value: Number):
217-
self.metrics[metric] = self.metrics.get(metric, 0) - value
217+
if not metric.accumulate:
218+
self.metrics[metric] = self.metrics.get(metric, 0) - value
218219

219220
def discharge(self) -> dict[Metric, Number]:
220221
metrics = {}
@@ -248,7 +249,8 @@ def increment(self, metric: Metric, value: Number):
248249
self.metrics[metric] = self.metrics.get(metric, 0) + value
249250

250251
def decrement(self, metric: Metric, value: Number):
251-
self.metrics[metric] = self.metrics.get(metric, 0) - value
252+
if not metric.accumulate:
253+
self.metrics[metric] = self.metrics.get(metric, 0) - value
252254

253255
def discharge(self) -> dict[Metric, Number]:
254256
metrics = {}

nodestream/pipeline/channel.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def create_with_naming(
5151
metric = Metric(
5252
f"buffered_{input_name}_to_{output_name}",
5353
f"Records buffered: {input_name}{output_name}",
54+
accumulate=True,
5455
)
5556
return cls(size, metric)
5657

nodestream/pipeline/progress_reporter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,11 @@ def for_testing(cls, results_list: list) -> "PipelineProgressReporter":
6464
return cls(
6565
reporting_frequency=1,
6666
logger=getLogger("test"),
67-
callback=lambda _, record: results_list.append(record),
67+
callback=no_op,
6868
on_start_callback=no_op,
6969
on_finish_callback=no_op,
7070
on_fatal_error_callback=raise_exception,
71+
observability_callback=lambda record: results_list.append(record),
7172
)
7273

7374
def report(self, index, metrics: Metrics):

tests/unit/test_metrics.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,40 @@ def test_json_log_metric_handler_discharge_with_accumulate(mocker):
202202
# Accumulating metric should be reset to 0, non-accumulating should remain
203203
assert handler.metrics[accumulating_metric] == 0
204204
assert handler.metrics[non_accumulating_metric] == 7
205+
206+
207+
def test_json_log_metric_handler_discharge_with_accumulate_and_decrement(mocker):
208+
"""Test that JsonLogMetricHandler does not decrement accumulating metrics"""
209+
handler = JsonLogMetricHandler()
210+
211+
accumulating_metric = Metric("test_accumulate", accumulate=True)
212+
non_accumulating_metric = Metric("test_no_accumulate", accumulate=False)
213+
214+
handler.increment(accumulating_metric, 10)
215+
handler.decrement(accumulating_metric, 7)
216+
217+
handler.increment(non_accumulating_metric, 10)
218+
handler.decrement(non_accumulating_metric, 7)
219+
220+
# Accumulating metric should be reset to 0, non-accumulating should remain
221+
assert handler.metrics[accumulating_metric] == 10
222+
assert handler.metrics[non_accumulating_metric] == 3
223+
224+
225+
def test_console_metric_handler_discharge_with_accumulate_and_decrement(mocker):
226+
"""Test that ConsoleMetricHandler does not decrement accumulating metrics"""
227+
mock_command = mocker.Mock()
228+
handler = ConsoleMetricHandler(mock_command)
229+
230+
accumulating_metric = Metric("test_accumulate", accumulate=True)
231+
non_accumulating_metric = Metric("test_no_accumulate", accumulate=False)
232+
233+
handler.increment(accumulating_metric, 10)
234+
handler.decrement(accumulating_metric, 7)
235+
236+
handler.increment(non_accumulating_metric, 10)
237+
handler.decrement(non_accumulating_metric, 7)
238+
239+
# Accumulating metric should be reset to 0, non-accumulating should remain
240+
assert handler.metrics[accumulating_metric] == 10
241+
assert handler.metrics[non_accumulating_metric] == 3

0 commit comments

Comments
 (0)