Skip to content

Commit 0bdebc6

Browse files
committed
Created tests for the change.
1 parent a992f88 commit 0bdebc6

File tree

6 files changed

+171
-33
lines changed

6 files changed

+171
-33
lines changed

nodestream/metrics.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ def decrement_on(self, handler: "MetricHandler", value: Number = 1):
2323
"""Decrement this metric on the given handler."""
2424
handler.decrement(self, value)
2525

26-
def register(self):
27-
Metrics().get().increment(self, 0)
26+
def register(self, handler: "MetricHandler"):
27+
handler.increment(self, 0)
2828
return self
2929

3030

@@ -175,7 +175,7 @@ def stop(self):
175175
self.thread.join()
176176
self.logger.info("Prometheus metrics server shut down successfully")
177177

178-
def get_guage(self, metric: Metric) -> Gauge:
178+
def get_gauge(self, metric: Metric) -> Gauge:
179179
"""Get the Gauge for the given metric, creating it if it doesn't exist."""
180180
if metric not in self.instruments_by_metric:
181181
self.instruments_by_metric[metric] = Gauge(
@@ -186,10 +186,11 @@ def get_guage(self, metric: Metric) -> Gauge:
186186
return self.instruments_by_metric[metric]
187187

188188
def increment(self, metric: Metric, value: Number):
189-
self.get_guage(metric).inc(value)
189+
print(f"Incrementing {metric.name} by {value}")
190+
self.get_gauge(metric).inc(value)
190191

191192
def decrement(self, metric: Metric, value: Number):
192-
self.get_guage(metric).dec(value)
193+
self.get_gauge(metric).dec(value)
193194

194195
def tick(self):
195196
pass

nodestream/pipeline/pipeline.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ async def emit_record(self, record):
5858
async def drive_step(self):
5959
try:
6060
while (next_record := await self.input.get()) is not None:
61+
print(f"Driving step with record {next_record}")
6162
results = self.step.process_record(next_record, self.context)
6263
async for record in results:
6364
if not await self.emit_record(record):
@@ -172,6 +173,7 @@ async def run(self, reporter: PipelineProgressReporter):
172173
# the flow of records between the steps.
173174
executors: List[StepExecutor] = []
174175
current_input, current_output = channel(self.step_outbox_size)
176+
print(current_input, current_output)
175177
pipeline_output = PipelineOutput(current_input, reporter)
176178

177179
# Create the executors for the steps in the pipeline. The executors

tests/unit/cli/operations/test_run_pipeline.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
ProgressIndicator,
77
RunPipeline,
88
SpinnerProgressIndicator,
9+
JsonProgressIndicator,
910
)
1011
from nodestream.metrics import Metrics
1112
from nodestream.project import PipelineConfiguration, PipelineDefinition, Project
@@ -59,18 +60,21 @@ def test_spinner_on_finish(mocker):
5960
def test_spinner_progress_callback(mocker):
6061
spinner = SpinnerProgressIndicator(mocker.Mock(), "pipeline_name")
6162
spinner.on_start()
62-
spinner.progress_callback(1000, None)
63+
mock_metrics = mocker.Mock()
64+
spinner.progress_callback(1000, mock_metrics)
6365
spinner.progress.set_message.assert_called_once()
66+
mock_metrics.tick.assert_called_once()
6467

6568

6669
def test_spinner_error_condition(mocker):
6770
spinner = SpinnerProgressIndicator(mocker.Mock(), "pipeline_name")
6871
spinner.on_start()
6972
spinner.on_fatal_error(Exception())
7073
spinner.progress.set_message.assert_called_once()
71-
74+
mock_metrics = mocker.Mock()
7275
with pytest.raises(Exception):
73-
spinner.on_finish(Metrics())
76+
spinner.on_finish(mock_metrics)
77+
mock_metrics.tick.assert_called_once()
7478

7579

7680
@pytest.mark.parametrize(
@@ -126,3 +130,14 @@ def test_progress_indicator_error(mocker):
126130
indicator = ProgressIndicator(mocker.Mock(), "pipeline_name")
127131
indicator.on_fatal_error(Exception("Boom"))
128132
indicator.command.line.assert_called_with("<error>Boom</error>")
133+
134+
135+
def test_json_progress_indicator(mocker):
136+
indicator = JsonProgressIndicator(mocker.Mock(), "pipeline_name")
137+
indicator.logger.info = mocker.Mock()
138+
indicator.on_start()
139+
indicator.on_finish(Metrics())
140+
assert indicator.logger.info.call_args_list == [
141+
mocker.call("Starting Pipeline"),
142+
mocker.call("Pipeline Completed"),
143+
]

tests/unit/databases/test_query_executor_with_statistics.py

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import pytest
2+
from unittest.mock import call
23

34
from nodestream.databases.query_executor_with_statistics import (
45
QueryExecutorWithStatistics,
56
)
6-
from nodestream.metrics import Metric, Metrics
7+
from nodestream.metrics import Metrics, NodestreamMetricRegistry
8+
from nodestream.model import Node, RelationshipWithNodes, Relationship
79

810

911
@pytest.fixture
@@ -18,13 +20,23 @@ async def test_upsert_nodes_in_bulk_with_same_operation_increments_counter_by_si
1820
with Metrics.capture() as metrics:
1921
metrics.increment = mocker.Mock()
2022
await query_executor_with_statistics.upsert_nodes_in_bulk_with_same_operation(
21-
"operation", ["node1", "node2"]
23+
"operation",
24+
[Node("node_type", "node1", "id1"), Node("node_type", "node2", "id2")],
2225
)
2326
query_executor_with_statistics.inner.upsert_nodes_in_bulk_with_same_operation.assert_awaited_once_with(
2427
"operation",
25-
["node1", "node2"],
28+
[Node("node_type", "node1", "id1"), Node("node_type", "node2", "id2")],
29+
)
30+
31+
assert "node_type" in query_executor_with_statistics.node_metric_by_type
32+
assert (
33+
call(query_executor_with_statistics.node_metric_by_type["node_type"], 2)
34+
in metrics.increment.call_args_list
35+
)
36+
assert (
37+
call(NodestreamMetricRegistry.NODES_UPSERTED, 2)
38+
in metrics.increment.call_args_list
2639
)
27-
metrics.increment.assert_called_once_with(Metric.NODES_UPSERTED, 2)
2840

2941

3042
@pytest.mark.asyncio
@@ -34,13 +46,44 @@ async def test_upsert_relationships_in_bulk_of_same_operation_increments_counter
3446
with Metrics.capture() as metrics:
3547
metrics.increment = mocker.Mock()
3648
await query_executor_with_statistics.upsert_relationships_in_bulk_of_same_operation(
37-
"operation", ["relationship1", "relationship2"]
49+
"operation",
50+
[
51+
RelationshipWithNodes(
52+
"node1", "node2", Relationship("relationship_type")
53+
),
54+
RelationshipWithNodes(
55+
"node3", "node4", Relationship("relationship_type")
56+
),
57+
],
3858
)
3959
query_executor_with_statistics.inner.upsert_relationships_in_bulk_of_same_operation.assert_awaited_once_with(
4060
"operation",
41-
["relationship1", "relationship2"],
61+
[
62+
RelationshipWithNodes(
63+
"node1", "node2", Relationship("relationship_type")
64+
),
65+
RelationshipWithNodes(
66+
"node3", "node4", Relationship("relationship_type")
67+
),
68+
],
69+
)
70+
assert (
71+
"relationship_type"
72+
in query_executor_with_statistics.relationship_metric_by_relationship_type
73+
)
74+
assert (
75+
call(
76+
query_executor_with_statistics.relationship_metric_by_relationship_type[
77+
"relationship_type"
78+
],
79+
2,
80+
)
81+
in metrics.increment.call_args_list
82+
)
83+
assert (
84+
call(NodestreamMetricRegistry.RELATIONSHIPS_UPSERTED, 2)
85+
in metrics.increment.call_args_list
4286
)
43-
metrics.increment.assert_called_once_with(Metric.RELATIONSHIPS_UPSERTED, 2)
4487

4588

4689
@pytest.mark.asyncio
@@ -53,7 +96,9 @@ async def test_perform_ttl_op_increments_counter_by_one(
5396
query_executor_with_statistics.inner.perform_ttl_op.assert_awaited_once_with(
5497
"config"
5598
)
56-
metrics.increment.assert_called_once_with(Metric.TIME_TO_LIVE_OPERATIONS)
99+
metrics.increment.assert_called_once_with(
100+
NodestreamMetricRegistry.TIME_TO_LIVE_OPERATIONS
101+
)
57102

58103

59104
@pytest.mark.asyncio
@@ -66,7 +111,9 @@ async def test_execute_hook_increments_counter_by_one(
66111
query_executor_with_statistics.inner.execute_hook.assert_awaited_once_with(
67112
"hook"
68113
)
69-
metrics.increment.assert_called_once_with(Metric.INGEST_HOOKS_EXECUTED)
114+
metrics.increment.assert_called_once_with(
115+
NodestreamMetricRegistry.INGEST_HOOKS_EXECUTED
116+
)
70117

71118

72119
@pytest.mark.asyncio

tests/unit/pipeline/test_pipeline.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import pytest
22

3+
from nodestream.pipeline import Extractor
4+
from nodestream.pipeline.object_storage import NullObjectStore
35
from nodestream.pipeline.pipeline import (
6+
Pipeline,
47
PipelineOutput,
58
PipelineProgressReporter,
69
Step,
@@ -9,6 +12,7 @@
912
StepInput,
1013
StepOutput,
1114
)
15+
from nodestream.pipeline.channel import Channel
1216

1317

1418
@pytest.fixture
@@ -136,3 +140,43 @@ def on_start_callback():
136140

137141
output.call_handling_errors(output.reporter.on_start_callback)
138142
output.reporter.logger.exception.assert_called_once()
143+
144+
145+
class TestExtractor(Extractor):
146+
async def extract_records(self):
147+
return None
148+
149+
150+
class TestStep2(Step):
151+
async def process_record(self, record, context: StepContext):
152+
yield record
153+
154+
155+
@pytest.mark.asyncio
156+
async def test_pipeline_channels_obtain_input_and_output_names_from_steps(mocker):
157+
channel = mocker.Mock(Channel)
158+
mock_input = mocker.Mock(StepInput, channel=channel)
159+
mock_output = mocker.Mock(StepOutput, channel=channel)
160+
mock_input.get.return_value = None
161+
mock_output.put.return_value = True
162+
with mocker.patch(
163+
"nodestream.pipeline.pipeline.channel", return_value=(mock_input, mock_output)
164+
):
165+
step1 = TestExtractor()
166+
step2 = TestStep2()
167+
pipeline = Pipeline(
168+
steps=[step1, step2], step_outbox_size=1, object_store=NullObjectStore()
169+
)
170+
await pipeline.run(
171+
reporter=PipelineProgressReporter(
172+
on_start_callback=lambda: None, logger=mocker.Mock()
173+
)
174+
)
175+
assert mock_input.register.call_args_list == [
176+
mocker.call("TestStep2"),
177+
mocker.call("TestExtractor"),
178+
]
179+
assert mock_output.register.call_args_list == [
180+
mocker.call("TestStep2"),
181+
mocker.call("TestExtractor"),
182+
]

tests/unit/test_metrics.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,43 @@
77
ConsoleMetricHandler,
88
JsonLogMetricHandler,
99
Metric,
10+
NodestreamMetricRegistry,
1011
Metrics,
1112
NullMetricHandler,
1213
PrometheusMetricHandler,
1314
)
1415

1516

17+
def test_metric_registry_contains_subclasses_in_all_metrics():
18+
for metric in NodestreamMetricRegistry.get_all_metrics().values():
19+
assert metric in NodestreamMetricRegistry._subclasses
20+
21+
22+
def test_metric_increment_on_handler_increments_metric_on_handler(mocker):
23+
handler = mocker.Mock()
24+
metric = NodestreamMetricRegistry.RECORDS
25+
metric.increment_on(handler, 1)
26+
handler.increment.assert_called_once_with(metric, 1)
27+
28+
29+
def test_metric_decrement_on_handler_decrements_metric_on_handler(mocker):
30+
handler = mocker.Mock()
31+
metric = NodestreamMetricRegistry.RECORDS
32+
metric.decrement_on(handler, 1)
33+
handler.decrement.assert_called_once_with(metric, 1)
34+
35+
36+
def test_prometheus_metric_handler_registers_new_metrics(mocker):
37+
handler = PrometheusMetricHandler()
38+
metric = Metric("new_metric", "New metric")
39+
metric.register(handler)
40+
assert metric in handler.instruments_by_metric.keys()
41+
42+
1643
def test_null_metric_handler():
1744
handler = NullMetricHandler()
18-
handler.increment(Metric.RECORDS, 1)
19-
handler.decrement(Metric.RECORDS, 1)
45+
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
46+
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
2047
# No assertions needed as NullMetricHandler does nothing
2148

2249

@@ -26,25 +53,27 @@ def test_prometheus_metric_handler(mocker):
2653
handler = PrometheusMetricHandler()
2754
handler.start()
2855
mock_start_http_server.assert_called_once()
29-
handler.increment(Metric.RECORDS, 1)
30-
handler.decrement(Metric.RECORDS, 1)
56+
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
57+
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
3158
handler.stop()
3259

3360

3461
def test_console_metric_handler(mocker):
3562
mock_command = mocker.Mock()
3663
handler = ConsoleMetricHandler(mock_command)
37-
handler.increment(Metric.RECORDS, 1)
38-
handler.decrement(Metric.RECORDS, 1)
64+
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
65+
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
66+
handler.tick()
3967
handler.stop()
4068
mock_command.table.assert_called_once()
4169

4270

4371
def test_json_log_metric_handler(mocker):
4472
mock_logger = mocker.patch("nodestream.metrics.getLogger").return_value
4573
handler = JsonLogMetricHandler()
46-
handler.increment(Metric.RECORDS, 1)
47-
handler.decrement(Metric.RECORDS, 1)
74+
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
75+
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
76+
handler.tick()
4877
handler.stop()
4978
mock_logger.info.assert_called_once()
5079

@@ -56,12 +85,12 @@ def test_aggregate_handler(mocker):
5685
handler.start()
5786
mock_handler1.start.assert_called_once()
5887
mock_handler2.start.assert_called_once()
59-
handler.increment(Metric.RECORDS, 1)
60-
mock_handler1.increment.assert_called_once_with(Metric.RECORDS, 1)
61-
mock_handler2.increment.assert_called_once_with(Metric.RECORDS, 1)
62-
handler.decrement(Metric.RECORDS, 1)
63-
mock_handler1.decrement.assert_called_once_with(Metric.RECORDS, 1)
64-
mock_handler2.decrement.assert_called_once_with(Metric.RECORDS, 1)
88+
handler.increment(NodestreamMetricRegistry.RECORDS, 1)
89+
mock_handler1.increment.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
90+
mock_handler2.increment.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
91+
handler.decrement(NodestreamMetricRegistry.RECORDS, 1)
92+
mock_handler1.decrement.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
93+
mock_handler2.decrement.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
6594
handler.stop()
6695
mock_handler1.stop.assert_called_once()
6796
mock_handler2.stop.assert_called_once()
@@ -70,8 +99,8 @@ def test_aggregate_handler(mocker):
7099
def test_metrics_context(mocker):
71100
handler = mocker.Mock()
72101
with Metrics.capture(handler) as metrics:
73-
metrics.increment(Metric.RECORDS, 1)
74-
handler.increment.assert_called_once_with(Metric.RECORDS, 1)
102+
metrics.increment(NodestreamMetricRegistry.RECORDS, 1)
103+
handler.increment.assert_called_once_with(NodestreamMetricRegistry.RECORDS, 1)
75104
handler.stop.assert_called_once()
76105

77106

0 commit comments

Comments
 (0)