Skip to content

Commit 4a181cd

Browse files
committed
Undoing Registry.
1 parent fad09d5 commit 4a181cd

File tree

6 files changed

+79
-104
lines changed

6 files changed

+79
-104
lines changed

nodestream/databases/query_executor_with_statistics.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
from ..metrics import (
44
Metric,
55
Metrics,
6-
NodestreamMetricRegistry,
6+
NODES_UPSERTED,
7+
RELATIONSHIPS_UPSERTED,
8+
TIME_TO_LIVE_OPERATIONS,
9+
INGEST_HOOKS_EXECUTED,
710
)
811
from ..model import IngestionHook, Node, RelationshipWithNodes, TimeToLiveConfiguration
912
from .query_executor import (
@@ -53,7 +56,7 @@ async def upsert_nodes_in_bulk_with_same_operation(
5356

5457
# Increment metrics in bulk
5558
metrics = Metrics.get()
56-
metrics.increment(NodestreamMetricRegistry.NODES_UPSERTED, total_nodes)
59+
metrics.increment(NODES_UPSERTED, total_nodes)
5760
for node_type, count in node_type_counts.items():
5861
metric = self._get_or_create_node_metric(node_type)
5962
metrics.increment(metric, count)
@@ -80,19 +83,19 @@ async def upsert_relationships_in_bulk_of_same_operation(
8083
# Increment metrics in bulk
8184
metrics = Metrics.get()
8285
metrics.increment(
83-
NodestreamMetricRegistry.RELATIONSHIPS_UPSERTED, total_relationships
86+
RELATIONSHIPS_UPSERTED, total_relationships
8487
)
8588
for rel_type, count in relationship_type_counts.items():
8689
metric = self._get_or_create_relationship_metric(rel_type)
8790
metrics.increment(metric, count)
8891

8992
async def perform_ttl_op(self, config: TimeToLiveConfiguration):
9093
await self.inner.perform_ttl_op(config)
91-
Metrics.get().increment(NodestreamMetricRegistry.TIME_TO_LIVE_OPERATIONS)
94+
Metrics.get().increment(TIME_TO_LIVE_OPERATIONS)
9295

9396
async def execute_hook(self, hook: IngestionHook):
9497
await self.inner.execute_hook(hook)
95-
Metrics.get().increment(NodestreamMetricRegistry.INGEST_HOOKS_EXECUTED)
98+
Metrics.get().increment(INGEST_HOOKS_EXECUTED)
9699

97100
async def finish(self):
98101
await self.inner.finish()

nodestream/metrics.py

Lines changed: 22 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -73,63 +73,26 @@ def tick(self):
7373
pass
7474

7575

76-
class MetricRegistry:
77-
"""A registry of metrics that collects metrics from class variables in subclasses.
78-
79-
This class provides a way to collect metrics defined as class attributes in subclasses.
80-
Metrics can be declared as class attributes in subclasses. For example:
81-
82-
class MyMetricRegistry(MetricRegistry):
83-
MY_METRIC = Metric("my_metric", "Description of my metric")
84-
ANOTHER_METRIC = Metric("another_metric", "Description of another metric")
85-
"""
86-
87-
_subclasses: list[type["MetricRegistry"]] = []
88-
89-
def __init_subclass__(cls, **kwargs):
90-
"""Register subclasses for metric collection."""
91-
super().__init_subclass__(**kwargs)
92-
MetricRegistry._subclasses.append(cls)
93-
94-
@classmethod
95-
def get_all_metrics(cls) -> dict[str, Metric]:
96-
"""Get all metrics from all subclasses.
97-
98-
Returns a dictionary mapping metric names to their Metric instances,
99-
collected from all MetricRegistry subclasses.
100-
"""
101-
all_metrics = {}
102-
for subclass in cls._subclasses:
103-
# Collect metrics from class variables
104-
for attr_name, attr_value in subclass.__dict__.items():
105-
if isinstance(attr_value, Metric):
106-
all_metrics[attr_name] = attr_value
107-
return all_metrics
108-
109-
110-
class NodestreamMetricRegistry(MetricRegistry):
111-
"""A registry of metrics for the Nodestream project."""
112-
113-
# Core metrics
114-
RECORDS = Metric("records", "Number of records processed")
115-
NON_FATAL_ERRORS = Metric("non_fatal_errors", "Number of non-fatal errors")
116-
FATAL_ERRORS = Metric("fatal_errors", "Number of fatal errors")
117-
NODES_UPSERTED = Metric("nodes_upserted", "Number of nodes upserted to the graph")
118-
RELATIONSHIPS_UPSERTED = Metric(
119-
"relationships_upserted", "Number of relationships upserted to the graph"
120-
)
121-
TIME_TO_LIVE_OPERATIONS = Metric(
122-
"time_to_live_operations", "Number of time-to-live operations executed"
123-
)
124-
INGEST_HOOKS_EXECUTED = Metric(
125-
"ingest_hooks_executed", "Number of ingest hooks executed to the graph"
126-
)
127-
BUFFERED_RECORDS = Metric(
128-
"buffered_records", "Number of records buffered between steps"
129-
)
130-
STEPS_RUNNING = Metric(
131-
"steps_running", "Number of steps currently running in the pipeline"
132-
)
76+
# Core metrics
77+
RECORDS = Metric("records", "Number of records processed")
78+
NON_FATAL_ERRORS = Metric("non_fatal_errors", "Number of non-fatal errors")
79+
FATAL_ERRORS = Metric("fatal_errors", "Number of fatal errors")
80+
NODES_UPSERTED = Metric("nodes_upserted", "Number of nodes upserted to the graph")
81+
RELATIONSHIPS_UPSERTED = Metric(
82+
"relationships_upserted", "Number of relationships upserted to the graph"
83+
)
84+
TIME_TO_LIVE_OPERATIONS = Metric(
85+
"time_to_live_operations", "Number of time-to-live operations executed"
86+
)
87+
INGEST_HOOKS_EXECUTED = Metric(
88+
"ingest_hooks_executed", "Number of ingest hooks executed to the graph"
89+
)
90+
BUFFERED_RECORDS = Metric(
91+
"buffered_records", "Number of records buffered between steps"
92+
)
93+
STEPS_RUNNING = Metric(
94+
"steps_running", "Number of steps currently running in the pipeline"
95+
)
13396

13497

13598
try:
@@ -223,9 +186,7 @@ class ConsoleMetricHandler(MetricHandler):
223186
"""A metric handler that prints metrics to the console."""
224187

225188
def __init__(self, command: Command):
226-
self.metrics: dict[Metric, Number] = {
227-
metric: 0 for metric in MetricRegistry.get_all_metrics().values()
228-
}
189+
self.metrics: dict[Metric, Number] = {}
229190
self.command = command
230191

231192
def increment(self, metric: Metric, value: Number):
@@ -250,9 +211,7 @@ class JsonLogMetricHandler(MetricHandler):
250211
"""A metric handler that logs metrics in JSON format."""
251212

252213
def __init__(self):
253-
self.metrics: dict[Metric, Number] = {
254-
metric: 0 for metric in MetricRegistry.get_all_metrics().values()
255-
}
214+
self.metrics: dict[Metric, Number] = {}
256215
self.logger = getLogger(__name__)
257216

258217
def increment(self, metric: Metric, value: Number):

nodestream/pipeline/pipeline.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
from logging import getLogger
33
from typing import Iterable, List, Tuple
44

5-
from ..metrics import Metrics, NodestreamMetricRegistry
5+
from ..metrics import (
6+
Metrics,
7+
RECORDS,
8+
STEPS_RUNNING,
9+
)
610
from ..schema import ExpandsSchema, ExpandsSchemaFromChildren
711
from .channel import StepInput, StepOutput, channel
812
from .object_storage import ObjectStore
@@ -34,14 +38,14 @@ def __init__(
3438

3539
async def start_step(self):
3640
try:
37-
Metrics.get().increment(NodestreamMetricRegistry.STEPS_RUNNING)
41+
Metrics.get().increment(STEPS_RUNNING)
3842
await self.step.start(self.context)
3943
except Exception as e:
4044
self.context.report_error("Error starting step", e)
4145

4246
async def stop_step(self):
4347
try:
44-
Metrics.get().decrement(NodestreamMetricRegistry.STEPS_RUNNING)
48+
Metrics.get().decrement(STEPS_RUNNING)
4549
await self.step.finish(self.context)
4650
except Exception as e:
4751
self.context.report_error("Error stopping step", e)
@@ -116,7 +120,7 @@ async def run(self):
116120

117121
index = 0
118122
while (record := await self.input.get()) is not None:
119-
metrics.increment(NodestreamMetricRegistry.RECORDS)
123+
metrics.increment(RECORDS)
120124
self.call_handling_errors(self.reporter.report, index, metrics)
121125
if self.observe_results:
122126
self.call_handling_errors(self.reporter.observe, record)

nodestream/pipeline/step.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from typing import AsyncGenerator, Optional
22

3-
from ..metrics import Metrics, NodestreamMetricRegistry
3+
from ..metrics import (
4+
Metrics,
5+
FATAL_ERRORS,
6+
NON_FATAL_ERRORS,
7+
)
48
from .object_storage import ObjectStore
59
from .progress_reporter import PipelineProgressReporter
610

@@ -57,9 +61,9 @@ def report_error(
5761
)
5862
if fatal:
5963
self.reporter.on_fatal_error(exception)
60-
Metrics.get().increment(NodestreamMetricRegistry.FATAL_ERRORS)
64+
Metrics.get().increment(FATAL_ERRORS)
6165
else:
62-
Metrics.get().increment(NodestreamMetricRegistry.NON_FATAL_ERRORS)
66+
Metrics.get().increment(NON_FATAL_ERRORS)
6367

6468
def debug(self, message: str, **extras):
6569
"""Log a debug message.

tests/unit/databases/test_query_executor_with_statistics.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@
55
from nodestream.databases.query_executor_with_statistics import (
66
QueryExecutorWithStatistics,
77
)
8-
from nodestream.metrics import Metrics, NodestreamMetricRegistry
8+
from nodestream.metrics import (
9+
Metrics,
10+
NODES_UPSERTED,
11+
RELATIONSHIPS_UPSERTED,
12+
TIME_TO_LIVE_OPERATIONS,
13+
INGEST_HOOKS_EXECUTED,
14+
)
915
from nodestream.model import Node, Relationship, RelationshipWithNodes
1016

1117

@@ -35,7 +41,7 @@ async def test_upsert_nodes_in_bulk_with_same_operation_increments_counter_by_si
3541
in metrics.increment.call_args_list
3642
)
3743
assert (
38-
call(NodestreamMetricRegistry.NODES_UPSERTED, 2)
44+
call(NODES_UPSERTED, 2)
3945
in metrics.increment.call_args_list
4046
)
4147

@@ -82,7 +88,7 @@ async def test_upsert_relationships_in_bulk_of_same_operation_increments_counter
8288
in metrics.increment.call_args_list
8389
)
8490
assert (
85-
call(NodestreamMetricRegistry.RELATIONSHIPS_UPSERTED, 2)
91+
call(RELATIONSHIPS_UPSERTED, 2)
8692
in metrics.increment.call_args_list
8793
)
8894

@@ -98,7 +104,7 @@ async def test_perform_ttl_op_increments_counter_by_one(
98104
"config"
99105
)
100106
metrics.increment.assert_called_once_with(
101-
NodestreamMetricRegistry.TIME_TO_LIVE_OPERATIONS
107+
TIME_TO_LIVE_OPERATIONS
102108
)
103109

104110

@@ -113,7 +119,7 @@ async def test_execute_hook_increments_counter_by_one(
113119
"hook"
114120
)
115121
metrics.increment.assert_called_once_with(
116-
NodestreamMetricRegistry.INGEST_HOOKS_EXECUTED
122+
INGEST_HOOKS_EXECUTED
117123
)
118124

119125

tests/unit/test_metrics.py

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,26 @@
88
JsonLogMetricHandler,
99
Metric,
1010
Metrics,
11-
NodestreamMetricRegistry,
11+
RECORDS,
12+
NODES_UPSERTED,
13+
RELATIONSHIPS_UPSERTED,
14+
TIME_TO_LIVE_OPERATIONS,
15+
INGEST_HOOKS_EXECUTED,
1216
NullMetricHandler,
1317
PrometheusMetricHandler,
1418
)
1519

1620

17-
def test_metric_registry_contains_subclasses_in_all_metrics():
18-
for metric in NodestreamMetricRegistry.get_all_metrics().values():
19-
assert metric in NodestreamMetricRegistry.get_all_metrics().values()
20-
21-
2221
def test_metric_increment_on_handler_increments_metric_on_handler(mocker):
2322
handler = mocker.Mock()
24-
metric = NodestreamMetricRegistry.RECORDS
23+
metric = RECORDS
2524
metric.increment_on(handler, 1)
2625
handler.increment.assert_called_once_with(metric, 1)
2726

2827

2928
def test_metric_decrement_on_handler_decrements_metric_on_handler(mocker):
3029
handler = mocker.Mock()
31-
metric = NodestreamMetricRegistry.RECORDS
30+
metric = RECORDS
3231
metric.decrement_on(handler, 1)
3332
handler.decrement.assert_called_once_with(metric, 1)
3433

@@ -42,8 +41,8 @@ def test_prometheus_metric_handler_registers_new_metrics(mocker):
4241

4342
def test_null_metric_handler():
4443
handler = NullMetricHandler()
45-
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
46-
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
44+
handler.increment(RECORDS, 1)
45+
handler.decrement(RECORDS, 1)
4746
# No assertions needed as NullMetricHandler does nothing
4847

4948

@@ -53,16 +52,16 @@ def test_prometheus_metric_handler(mocker):
5352
handler = PrometheusMetricHandler()
5453
handler.start()
5554
mock_start_http_server.assert_called_once()
56-
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
57-
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
55+
handler.increment(RECORDS, 1)
56+
handler.decrement(RECORDS, 1)
5857
handler.stop()
5958

6059

6160
def test_console_metric_handler(mocker):
6261
mock_command = mocker.Mock()
6362
handler = ConsoleMetricHandler(mock_command)
64-
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
65-
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
63+
handler.increment(RECORDS, 1)
64+
handler.decrement(RECORDS, 1)
6665
handler.tick()
6766
handler.stop()
6867
mock_command.table.assert_called_once()
@@ -71,8 +70,8 @@ def test_console_metric_handler(mocker):
7170
def test_json_log_metric_handler(mocker):
7271
mock_logger = mocker.patch("nodestream.metrics.getLogger").return_value
7372
handler = JsonLogMetricHandler()
74-
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
75-
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
73+
handler.increment(RECORDS, 1)
74+
handler.decrement(RECORDS, 1)
7675
handler.tick()
7776
handler.stop()
7877
mock_logger.info.assert_called_once()
@@ -85,12 +84,12 @@ def test_aggregate_handler(mocker):
8584
handler.start()
8685
mock_handler1.start.assert_called_once()
8786
mock_handler2.start.assert_called_once()
88-
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
89-
mock_handler1.increment.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
90-
mock_handler2.increment.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
91-
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
92-
mock_handler1.decrement.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
93-
mock_handler2.decrement.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
87+
handler.increment(RECORDS, 1)
88+
mock_handler1.increment.assert_called_once_with(RECORDS, 1)
89+
mock_handler2.increment.assert_called_once_with(RECORDS, 1)
90+
handler.decrement(RECORDS, 1)
91+
mock_handler1.decrement.assert_called_once_with(RECORDS, 1)
92+
mock_handler2.decrement.assert_called_once_with(RECORDS, 1)
9493
handler.stop()
9594
mock_handler1.stop.assert_called_once()
9695
mock_handler2.stop.assert_called_once()
@@ -99,8 +98,8 @@ def test_aggregate_handler(mocker):
9998
def test_metrics_context(mocker):
10099
handler = mocker.Mock()
101100
with Metrics.capture(handler) as metrics:
102-
metrics.increment(NodestreamMetricRegistry.RECORDS, 1)
103-
handler.increment.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
101+
metrics.increment(RECORDS, 1)
102+
handler.increment.assert_called_once_with(RECORDS, 1)
104103
handler.stop.assert_called_once()
105104

106105

0 commit comments

Comments
 (0)