Skip to content

Commit b3bbffb

Browse files
Merge pull request #430 from nodestream-proj/SnapshotFix
Updating channel metric accumulation logic, fixed snapshots.
2 parents 5c229d7 + d965656 commit b3bbffb

File tree

5 files changed

+45
-4
lines changed

5 files changed

+45
-4
lines changed

nodestream/metrics.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ def increment(self, metric: Metric, value: Number):
214214
self.metrics[metric] = self.metrics.get(metric, 0) + value
215215

216216
def decrement(self, metric: Metric, value: Number):
217-
self.metrics[metric] = self.metrics.get(metric, 0) - value
217+
if not metric.accumulate:
218+
self.metrics[metric] = self.metrics.get(metric, 0) - value
218219

219220
def discharge(self) -> dict[Metric, Number]:
220221
metrics = {}
@@ -248,7 +249,8 @@ def increment(self, metric: Metric, value: Number):
248249
self.metrics[metric] = self.metrics.get(metric, 0) + value
249250

250251
def decrement(self, metric: Metric, value: Number):
251-
self.metrics[metric] = self.metrics.get(metric, 0) - value
252+
if not metric.accumulate:
253+
self.metrics[metric] = self.metrics.get(metric, 0) - value
252254

253255
def discharge(self) -> dict[Metric, Number]:
254256
metrics = {}

nodestream/pipeline/channel.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def create_with_naming(
5151
metric = Metric(
5252
f"buffered_{input_name}_to_{output_name}",
5353
f"Records buffered: {input_name}{output_name}",
54+
accumulate=True,
5455
)
5556
return cls(size, metric)
5657

nodestream/pipeline/progress_reporter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,11 @@ def for_testing(cls, results_list: list) -> "PipelineProgressReporter":
6464
return cls(
6565
reporting_frequency=1,
6666
logger=getLogger("test"),
67-
callback=lambda _, record: results_list.append(record),
67+
callback=no_op,
6868
on_start_callback=no_op,
6969
on_finish_callback=no_op,
7070
on_fatal_error_callback=raise_exception,
71+
observability_callback=lambda record: results_list.append(record),
7172
)
7273

7374
def report(self, index, metrics: Metrics):

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nodestream"
3-
version = "0.14.12"
3+
version = "0.14.13"
44
description = "A Fast, Declarative ETL for Graph Databases."
55
license = "GPL-3.0-only"
66
authors = [

tests/unit/test_metrics.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,40 @@ def test_json_log_metric_handler_discharge_with_accumulate(mocker):
202202
# Accumulating metric should be reset to 0, non-accumulating should remain
203203
assert handler.metrics[accumulating_metric] == 0
204204
assert handler.metrics[non_accumulating_metric] == 7
205+
206+
207+
def test_json_log_metric_handler_discharge_with_accumulate_and_decrement(mocker):
208+
"""Test that JsonLogMetricHandler does not decrement accumulating metrics"""
209+
handler = JsonLogMetricHandler()
210+
211+
accumulating_metric = Metric("test_accumulate", accumulate=True)
212+
non_accumulating_metric = Metric("test_no_accumulate", accumulate=False)
213+
214+
handler.increment(accumulating_metric, 10)
215+
handler.decrement(accumulating_metric, 7)
216+
217+
handler.increment(non_accumulating_metric, 10)
218+
handler.decrement(non_accumulating_metric, 7)
219+
220+
# Accumulating metric should be reset to 0, non-accumulating should remain
221+
assert handler.metrics[accumulating_metric] == 10
222+
assert handler.metrics[non_accumulating_metric] == 3
223+
224+
225+
def test_console_metric_handler_discharge_with_accumulate_and_decrement(mocker):
226+
"""Test that ConsoleMetricHandler does not decrement accumulating metrics"""
227+
mock_command = mocker.Mock()
228+
handler = ConsoleMetricHandler(mock_command)
229+
230+
accumulating_metric = Metric("test_accumulate", accumulate=True)
231+
non_accumulating_metric = Metric("test_no_accumulate", accumulate=False)
232+
233+
handler.increment(accumulating_metric, 10)
234+
handler.decrement(accumulating_metric, 7)
235+
236+
handler.increment(non_accumulating_metric, 10)
237+
handler.decrement(non_accumulating_metric, 7)
238+
239+
# Accumulating metric should be reset to 0, non-accumulating should remain
240+
assert handler.metrics[accumulating_metric] == 10
241+
assert handler.metrics[non_accumulating_metric] == 3

0 commit comments

Comments
 (0)