Skip to content

Commit a4e9111

Browse files
committed
Formatting and linting
1 parent 692dcfb commit a4e9111

File tree

5 files changed

+89
-68
lines changed

5 files changed

+89
-68
lines changed

nodestream/metrics.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@ def decrement_on(self, handler: "MetricHandler", value: Number = 1):
2727
def register(self, handler: "MetricHandler"):
2828
handler.increment(self, 0)
2929
return self
30-
30+
3131
def __str__(self):
3232
return f"{self.name}: {self.value}"
33-
33+
3434
def __hash__(self):
3535
return hash(self.name)
36-
36+
3737
def __eq__(self, other):
3838
return self.name == other.name
39-
39+
4040
def __ne__(self, other):
4141
return self.name != other.name
4242

@@ -88,17 +88,27 @@ def tick(self):
8888

8989
# Core metrics
9090
RECORDS = Metric("records", "Number of records processed", accumulate=True)
91-
NON_FATAL_ERRORS = Metric("non_fatal_errors", "Number of non-fatal errors", accumulate=True)
91+
NON_FATAL_ERRORS = Metric(
92+
"non_fatal_errors", "Number of non-fatal errors", accumulate=True
93+
)
9294
FATAL_ERRORS = Metric("fatal_errors", "Number of fatal errors", accumulate=True)
93-
NODES_UPSERTED = Metric("nodes_upserted", "Number of nodes upserted to the graph", accumulate=True)
95+
NODES_UPSERTED = Metric(
96+
"nodes_upserted", "Number of nodes upserted to the graph", accumulate=True
97+
)
9498
RELATIONSHIPS_UPSERTED = Metric(
95-
"relationships_upserted", "Number of relationships upserted to the graph", accumulate=True
99+
"relationships_upserted",
100+
"Number of relationships upserted to the graph",
101+
accumulate=True,
96102
)
97103
TIME_TO_LIVE_OPERATIONS = Metric(
98-
"time_to_live_operations", "Number of time-to-live operations executed", accumulate=True
104+
"time_to_live_operations",
105+
"Number of time-to-live operations executed",
106+
accumulate=True,
99107
)
100108
INGEST_HOOKS_EXECUTED = Metric(
101-
"ingest_hooks_executed", "Number of ingest hooks executed to the graph", accumulate=True
109+
"ingest_hooks_executed",
110+
"Number of ingest hooks executed to the graph",
111+
accumulate=True,
102112
)
103113
STEPS_RUNNING = Metric(
104114
"steps_running", "Number of steps currently running in the pipeline"
@@ -204,7 +214,7 @@ def increment(self, metric: Metric, value: Number):
204214
self.metrics[metric] = self.metrics.get(metric, 0) + value
205215

206216
def decrement(self, metric: Metric, value: Number):
207-
self.metrics[metric] = self.metrics.get(metric, 0) - value
217+
self.metrics[metric] = self.metrics.get(metric, 0) - value
208218

209219
def discharge(self) -> dict[Metric, Number]:
210220
metrics = {}

nodestream/pipeline/progress_reporter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import os
2+
import time
23
from dataclasses import dataclass, field
34
from logging import Logger, getLogger
45
from typing import Any, Callable, Optional
56

67
from psutil import Process
78

89
from ..metrics import Metrics
9-
import time
1010

1111

1212
def no_op(*_, **__):

tests/unit/cli/operations/test_run_pipeline.py

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import pytest
22
from hamcrest import assert_that, equal_to
3-
from unittest.mock import Mock
43

54
from nodestream.cli.operations.run_pipeline import (
65
WARNING_NO_TARGETS_PROVIDED,
@@ -40,7 +39,7 @@ def test_make_run_request(run_pipeline_operation, mocker):
4039
"step-outbox-size": "10001",
4140
"target": targets,
4241
"time-interval-seconds": None, # No time interval provided
43-
"reporting-frequency": "10000"
42+
"reporting-frequency": "10000",
4443
}
4544
command.option.side_effect = lambda opt: option_responses.get(opt)
4645
command.has_json_logging_set = False # Required for create_progress_reporter
@@ -55,41 +54,49 @@ def test_make_run_request(run_pipeline_operation, mocker):
5554
assert_that(result.progress_reporter.reporting_frequency, equal_to(10000))
5655

5756

58-
def test_create_progress_reporter_with_time_interval_seconds(run_pipeline_operation, mocker):
57+
def test_create_progress_reporter_with_time_interval_seconds(
58+
run_pipeline_operation, mocker
59+
):
5960
"""Test that time_interval_seconds gets properly converted to float and passed to PipelineProgressReporter"""
6061
command = mocker.Mock()
6162
command.option.side_effect = lambda opt: {
6263
"time-interval-seconds": "30.5",
63-
"reporting-frequency": "1000"
64+
"reporting-frequency": "1000",
6465
}.get(opt)
6566
command.has_json_logging_set = False
66-
67+
6768
# Mock PipelineProgressReporter to capture arguments
68-
mock_progress_reporter = mocker.patch("nodestream.cli.operations.run_pipeline.PipelineProgressReporter")
69-
70-
result = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
71-
69+
mock_progress_reporter = mocker.patch(
70+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
71+
)
72+
73+
_ = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
74+
7275
# Verify PipelineProgressReporter was called with correct time_interval_seconds
7376
mock_progress_reporter.assert_called_once()
7477
call_args = mock_progress_reporter.call_args
7578
assert_that(call_args.kwargs["time_interval_seconds"], equal_to(30.5))
7679
assert_that(call_args.kwargs["reporting_frequency"], equal_to(1000))
7780

7881

79-
def test_create_progress_reporter_without_time_interval_seconds(run_pipeline_operation, mocker):
82+
def test_create_progress_reporter_without_time_interval_seconds(
83+
run_pipeline_operation, mocker
84+
):
8085
"""Test that time_interval_seconds is None when not provided"""
8186
command = mocker.Mock()
8287
command.option.side_effect = lambda opt: {
8388
"time-interval-seconds": None,
84-
"reporting-frequency": "2000"
89+
"reporting-frequency": "2000",
8590
}.get(opt)
8691
command.has_json_logging_set = False
87-
92+
8893
# Mock PipelineProgressReporter to capture arguments
89-
mock_progress_reporter = mocker.patch("nodestream.cli.operations.run_pipeline.PipelineProgressReporter")
90-
91-
result = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
92-
94+
mock_progress_reporter = mocker.patch(
95+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
96+
)
97+
98+
_ = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
99+
93100
# Verify PipelineProgressReporter was called with None for time_interval_seconds
94101
mock_progress_reporter.assert_called_once()
95102
call_args = mock_progress_reporter.call_args
@@ -102,62 +109,68 @@ def test_create_progress_reporter_with_json_indicator(run_pipeline_operation, mo
102109
command = mocker.Mock()
103110
command.option.side_effect = lambda opt: {
104111
"time-interval-seconds": "15.0",
105-
"reporting-frequency": "500"
112+
"reporting-frequency": "500",
106113
}.get(opt)
107114
command.has_json_logging_set = True
108-
115+
109116
# Mock PipelineProgressReporter to capture arguments
110-
mock_progress_reporter = mocker.patch("nodestream.cli.operations.run_pipeline.PipelineProgressReporter")
111-
112-
result = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
113-
117+
mock_progress_reporter = mocker.patch(
118+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
119+
)
120+
121+
_ = run_pipeline_operation.create_progress_reporter(command, "test_pipeline")
122+
114123
# Verify PipelineProgressReporter was called with correct arguments
115124
mock_progress_reporter.assert_called_once()
116125
call_args = mock_progress_reporter.call_args
117126
assert_that(call_args.kwargs["time_interval_seconds"], equal_to(15.0))
118127
assert_that(call_args.kwargs["reporting_frequency"], equal_to(500))
119128

120129

121-
def test_make_run_request_with_time_interval_seconds_integration(run_pipeline_operation, mocker):
130+
def test_make_run_request_with_time_interval_seconds_integration(
131+
run_pipeline_operation, mocker
132+
):
122133
"""Integration test to ensure make_run_request properly handles time_interval_seconds through create_progress_reporter"""
123134
annotations = ["annotation1"]
124135
targets = ["t1"]
125136
pipeline_name = "my_pipeline"
126137
command = mocker.Mock()
127-
138+
128139
# Setup command.option to handle all the different option calls made by make_run_request
129140
option_responses = {
130141
"storage-backend": "my-storage",
131142
"annotations": annotations,
132143
"step-outbox-size": "10001",
133144
"target": targets,
134145
"time-interval-seconds": "45.0",
135-
"reporting-frequency": "5000"
146+
"reporting-frequency": "5000",
136147
}
137148
command.option.side_effect = lambda opt: option_responses.get(opt)
138149
command.has_json_logging_set = False
139150
command.is_very_verbose = False
140151
command.argument.return_value = [pipeline_name]
141-
152+
142153
pipeline = mocker.Mock()
143154
pipeline.name = pipeline_name
144155
pipeline.configuration = PipelineConfiguration()
145-
156+
146157
# Mock the project's get_object_storage_by_name method
147158
run_pipeline_operation.project.get_object_storage_by_name.return_value = None
148159
run_pipeline_operation.project.get_target_by_name.return_value = None
149-
160+
150161
# Mock PipelineProgressReporter to capture its arguments
151-
mock_progress_reporter = mocker.patch("nodestream.cli.operations.run_pipeline.PipelineProgressReporter")
152-
162+
mock_progress_reporter = mocker.patch(
163+
"nodestream.cli.operations.run_pipeline.PipelineProgressReporter"
164+
)
165+
153166
result = run_pipeline_operation.make_run_request(command, pipeline)
154-
167+
155168
# Verify the progress reporter was created with correct time_interval_seconds
156169
mock_progress_reporter.assert_called_once()
157170
call_args = mock_progress_reporter.call_args
158171
assert_that(call_args.kwargs["time_interval_seconds"], equal_to(45.0))
159172
assert_that(call_args.kwargs["reporting_frequency"], equal_to(5000))
160-
173+
161174
# Verify other parts of the request are still correct
162175
assert_that(result.pipeline_name, equal_to(pipeline_name))
163176
assert_that(result.initialization_arguments.annotations, equal_to(annotations))

tests/unit/pipeline/test_pipeline_progress_reporter.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
from unittest.mock import Mock
2+
13
import pytest
24
from hamcrest import assert_that, equal_to
3-
from unittest.mock import Mock
45

5-
from nodestream.pipeline import IterableExtractor, Pipeline, PipelineProgressReporter
66
from nodestream.metrics import Metrics
7+
from nodestream.pipeline import IterableExtractor, Pipeline, PipelineProgressReporter
78

89

910
@pytest.mark.asyncio
@@ -25,32 +26,29 @@ def test_pipeline_progress_reporter_with_time_interval_seconds(mocker):
2526
"""Test that time_interval_seconds works correctly"""
2627
mock_callback = Mock()
2728
reporter = PipelineProgressReporter(
28-
time_interval_seconds=0.1,
29-
callback=mock_callback
29+
time_interval_seconds=0.1, callback=mock_callback
3030
)
31-
32-
mock_time = mocker.patch('nodestream.pipeline.progress_reporter.time.time')
31+
32+
mock_time = mocker.patch("nodestream.pipeline.progress_reporter.time.time")
3333
mock_time.side_effect = [0.15, 0.2] # 150ms, 200ms
34-
34+
3535
metrics = Metrics()
3636
reporter.report(1, metrics) # Should report (150ms >= 100ms from 0)
3737
reporter.report(2, metrics) # Should not report (200ms - 150ms = 50ms < 100ms)
38-
38+
3939
assert_that(mock_callback.call_count, equal_to(1))
4040

4141

4242
def test_pipeline_progress_reporter_without_time_interval_uses_frequency():
4343
"""Test that None time_interval_seconds falls back to frequency-based reporting"""
4444
mock_callback = Mock()
4545
reporter = PipelineProgressReporter(
46-
time_interval_seconds=None,
47-
reporting_frequency=3,
48-
callback=mock_callback
46+
time_interval_seconds=None, reporting_frequency=3, callback=mock_callback
4947
)
50-
48+
5149
metrics = Metrics()
5250
for i in range(1, 7): # 1,2,3,4,5,6
5351
reporter.report(i, metrics)
54-
52+
5553
# Should report on multiples of 3: indices 3, 6
5654
assert_that(mock_callback.call_count, equal_to(2))

tests/unit/test_metrics.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,11 @@ def test_metric_equality_and_hash():
147147
metric1 = Metric("test_metric", "Test description")
148148
metric2 = Metric("test_metric", "Different description")
149149
metric3 = Metric("different_metric", "Test description")
150-
150+
151151
# Test __eq__ and __ne__
152152
assert metric1 == metric2 # Same name
153153
assert metric1 != metric3 # Different name
154-
154+
155155
# Test __hash__
156156
assert hash(metric1) == hash(metric2) # Same name should have same hash
157157

@@ -160,19 +160,19 @@ def test_console_metric_handler_discharge_with_accumulate(mocker):
160160
"""Test that ConsoleMetricHandler discharge resets accumulating metrics"""
161161
mock_command = mocker.Mock()
162162
handler = ConsoleMetricHandler(mock_command)
163-
163+
164164
accumulating_metric = Metric("test_accumulate", accumulate=True)
165165
non_accumulating_metric = Metric("test_no_accumulate", accumulate=False)
166-
166+
167167
handler.increment(accumulating_metric, 5)
168168
handler.increment(non_accumulating_metric, 3)
169-
169+
170170
result = handler.discharge()
171-
171+
172172
# Should return metric names as keys
173173
assert result["test_accumulate"] == 5
174174
assert result["test_no_accumulate"] == 3
175-
175+
176176
# Accumulating metric should be reset to 0, non-accumulating should remain
177177
assert handler.metrics[accumulating_metric] == 0
178178
assert handler.metrics[non_accumulating_metric] == 3
@@ -181,19 +181,19 @@ def test_console_metric_handler_discharge_with_accumulate(mocker):
181181
def test_json_log_metric_handler_discharge_with_accumulate(mocker):
182182
"""Test that JsonLogMetricHandler discharge resets accumulating metrics"""
183183
handler = JsonLogMetricHandler()
184-
184+
185185
accumulating_metric = Metric("test_accumulate", accumulate=True)
186186
non_accumulating_metric = Metric("test_no_accumulate", accumulate=False)
187-
187+
188188
handler.increment(accumulating_metric, 10)
189189
handler.increment(non_accumulating_metric, 7)
190-
190+
191191
result = handler.discharge()
192-
192+
193193
# Should return metric names as keys
194194
assert result["test_accumulate"] == 10
195195
assert result["test_no_accumulate"] == 7
196-
196+
197197
# Accumulating metric should be reset to 0, non-accumulating should remain
198198
assert handler.metrics[accumulating_metric] == 0
199199
assert handler.metrics[non_accumulating_metric] == 7

0 commit comments

Comments
 (0)