Skip to content

Commit 8ee377b

Browse files
committed
Updating channel metric accumulation logic, fixed snapshots.
1 parent 5c229d7 commit 8ee377b

File tree

4 files changed

+10
-4
lines changed

4 files changed

+10
-4
lines changed

nodestream/metrics.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,9 @@ def increment(self, metric: Metric, value: Number):
214214
self.metrics[metric] = self.metrics.get(metric, 0) + value
215215

216216
def decrement(self, metric: Metric, value: Number):
217-
self.metrics[metric] = self.metrics.get(metric, 0) - value
217+
# Decrementing metrics that are accumulated doesn't help.
218+
if not metric.accumulate:
219+
self.metrics[metric] = self.metrics.get(metric, 0) - value
218220

219221
def discharge(self) -> dict[Metric, Number]:
220222
metrics = {}
@@ -248,7 +250,9 @@ def increment(self, metric: Metric, value: Number):
248250
self.metrics[metric] = self.metrics.get(metric, 0) + value
249251

250252
def decrement(self, metric: Metric, value: Number):
251-
self.metrics[metric] = self.metrics.get(metric, 0) - value
253+
# Decrementing metrics that are accumulated doesn't help.
254+
if not metric.accumulate:
255+
self.metrics[metric] = self.metrics.get(metric, 0) - value
252256

253257
def discharge(self) -> dict[Metric, Number]:
254258
metrics = {}

nodestream/pipeline/channel.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def create_with_naming(
5151
metric = Metric(
5252
f"buffered_{input_name}_to_{output_name}",
5353
f"Records buffered: {input_name}{output_name}",
54+
accumulate=True,
5455
)
5556
return cls(size, metric)
5657

nodestream/pipeline/progress_reporter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,11 @@ def for_testing(cls, results_list: list) -> "PipelineProgressReporter":
6464
return cls(
6565
reporting_frequency=1,
6666
logger=getLogger("test"),
67-
callback=lambda _, record: results_list.append(record),
67+
callback=no_op,
6868
on_start_callback=no_op,
6969
on_finish_callback=no_op,
7070
on_fatal_error_callback=raise_exception,
71+
observability_callback=lambda _: lambda record: results_list.append(record),
7172
)
7273

7374
def report(self, index, metrics: Metrics):

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nodestream"
3-
version = "0.14.12"
3+
version = "0.14.13"
44
description = "A Fast, Declarative ETL for Graph Databases."
55
license = "GPL-3.0-only"
66
authors = [

0 commit comments

Comments
 (0)