
Commit fca2a02

Merge branch 'main' into splunk_extractor
2 parents 3a6db6d + d32b3cd commit fca2a02

26 files changed: +2163 -1471 lines

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -160,4 +160,5 @@ cython_debug/
 # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-.idea/
+.idea/
+pytest.xml

Makefile

Lines changed: 2 additions & 0 deletions
@@ -29,11 +29,13 @@ venv: poetry.lock
 format: venv
 	poetry run black nodestream tests
 	poetry run isort nodestream tests
+	poetry run ruff check nodestream tests --fix
 
 .PHONY: lint
 lint: venv
 	poetry run black nodestream tests --check
 	poetry run ruff check nodestream tests
+	poetry run isort nodestream tests --check-only
 
 .PHONY: test-unit
 test-unit: venv

nodestream/cli/operations/run_copy.py

Lines changed: 4 additions & 2 deletions
@@ -3,6 +3,7 @@
 from ...databases import Copier, GraphDatabaseWriter
 from ...pipeline import Pipeline
 from ...pipeline.object_storage import ObjectStore
+from ...pipeline.progress_reporter import PipelineProgressReporter
 from ...project import Project, Target
 from ..commands.nodestream_command import NodestreamCommand
 from .operation import Operation
@@ -20,12 +21,13 @@ def __init__(
         self.from_target = from_target
         self.to_target = to_target
         self.project = project
+        self.schema = self.project.get_schema()
         self.node_types = node_types
         self.relationship_types = relationship_types
 
     async def perform(self, command: NodestreamCommand):
         pipeline = self.build_pipeline()
-        await pipeline.run()
+        await pipeline.run(reporter=PipelineProgressReporter())
 
     def build_pipeline(self) -> Pipeline:
         copier = self.build_copier()
@@ -37,7 +39,7 @@ def build_pipeline(self) -> Pipeline:
     def build_copier(self) -> Copier:
         return Copier(
             self.from_target.make_type_retriever(),
-            self.project,
+            self.schema,
             self.node_types,
             self.relationship_types,
         )
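
Note that perform() now passes an explicit PipelineProgressReporter instead of relying on run()'s default. A minimal sketch of that injection pattern, using toy stand-ins rather than nodestream's real Pipeline and PipelineProgressReporter APIs:

import asyncio


# Toy stand-ins for illustration only; nodestream's actual classes
# expose richer interfaces (callbacks, metrics, etc.).
class ToyReporter:
    def on_start(self):
        print("pipeline started")

    def on_finish(self):
        print("pipeline finished")


class ToyPipeline:
    async def run(self, reporter=None):
        reporter = reporter or ToyReporter()  # fall back to a default
        reporter.on_start()
        # ... process records here ...
        reporter.on_finish()


asyncio.run(ToyPipeline().run(reporter=ToyReporter()))

Passing the reporter explicitly makes the copy operation observable in the same way as a normal pipeline run.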

nodestream/cli/operations/run_pipeline.py

Lines changed: 29 additions & 3 deletions
@@ -1,3 +1,4 @@
+from logging import getLogger
 from typing import Iterable, Optional
 
 from yaml import safe_dump
@@ -113,7 +114,7 @@ def get_progress_indicator(
         self, command: NodestreamCommand, pipeline_name: str
     ) -> "ProgressIndicator":
         if command.has_json_logging_set:
-            return ProgressIndicator(command, pipeline_name)
+            return JsonProgressIndicator(command, pipeline_name)
 
         return SpinnerProgressIndicator(command, pipeline_name)
 
@@ -160,14 +161,15 @@ def on_start(self):
         self.progress = self.command.progress_indicator()
         self.progress.start(f"Running pipeline: '{self.pipeline_name}'")
 
-    def progress_callback(self, index, _):
+    def progress_callback(self, index, metrics: Metrics):
         self.progress.set_message(
             f"Currently processing record at index: <info>{index}</info>"
         )
+        metrics.tick()
 
     def on_finish(self, metrics: Metrics):
         self.progress.finish(f"Finished running pipeline: '{self.pipeline_name}'")
-
+        metrics.tick()
         if self.exception:
             raise self.exception
 
@@ -176,3 +178,27 @@ def on_fatal_error(self, exception: Exception):
             "<error>Encountered a fatal error while running pipeline</error>"
         )
         self.exception = exception
+
+
+class JsonProgressIndicator(ProgressIndicator):
+    def __init__(self, command: NodestreamCommand, pipeline_name: str) -> None:
+        super().__init__(command, pipeline_name)
+        self.logger = getLogger()
+        self.exception = None
+
+    def on_start(self):
+        self.logger.info("Starting Pipeline")
+
+    def progress_callback(self, index, metrics: Metrics):
+        self.logger.info("Processing Record", extra={"index": index})
+        metrics.tick()
+
+    def on_finish(self, metrics: Metrics):
+        self.logger.info("Pipeline Completed")
+        metrics.tick()
+        if self.exception:
+            raise self.exception
+
+    def on_fatal_error(self, exception: Exception):
+        self.logger.error("Pipeline Failed", exc_info=exception)
+        self.exception = exception
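
The new JsonProgressIndicator leans on the stdlib logging convention of attaching structured fields through the extra= argument; whether the index field actually appears in the output depends on the formatter the CLI configures for JSON logging. A self-contained sketch of that mechanism, with a hypothetical formatter that is not nodestream's actual setup:

import json
import logging


class JsonFormatter(logging.Formatter):
    # Render each record as one JSON object, including any custom
    # attribute (such as "index") attached via the extra= argument.
    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        if hasattr(record, "index"):
            payload["index"] = record.index
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Processing Record", extra={"index": 42})
# -> {"level": "INFO", "message": "Processing Record", "index": 42}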

nodestream/databases/query_executor_with_statistics.py

Lines changed: 61 additions & 5 deletions
@@ -1,6 +1,13 @@
 from typing import Iterable
 
-from ..metrics import Metric, Metrics
+from ..metrics import (
+    INGEST_HOOKS_EXECUTED,
+    NODES_UPSERTED,
+    RELATIONSHIPS_UPSERTED,
+    TIME_TO_LIVE_OPERATIONS,
+    Metric,
+    Metrics,
+)
 from ..model import IngestionHook, Node, RelationshipWithNodes, TimeToLiveConfiguration
 from .query_executor import (
     OperationOnNodeIdentity,
@@ -14,12 +21,45 @@ class QueryExecutorWithStatistics(QueryExecutor):
 
     def __init__(self, inner: QueryExecutor) -> None:
         self.inner = inner
+        self.node_metric_by_type: dict[str, Metric] = {}
+        self.relationship_metric_by_relationship_type: dict[str, Metric] = {}
+
+    def _get_or_create_node_metric(self, node_type: str) -> Metric:
+        """Get or create a metric for a node type."""
+        if node_type not in self.node_metric_by_type:
+            self.node_metric_by_type[node_type] = Metric(
+                f"nodes_upserted_by_type_{node_type}",
+                f"Number of nodes upserted by type {node_type}",
+            )
+        return self.node_metric_by_type[node_type]
+
+    def _get_or_create_relationship_metric(self, relationship_type: str) -> Metric:
+        """Get or create a metric for a relationship type."""
+        if relationship_type not in self.relationship_metric_by_relationship_type:
+            self.relationship_metric_by_relationship_type[relationship_type] = Metric(
+                f"relationships_upserted_by_relationship_type_{relationship_type}",
+                f"Number of relationships upserted by relationship type {relationship_type}",
+            )
+        return self.relationship_metric_by_relationship_type[relationship_type]
 
     async def upsert_nodes_in_bulk_with_same_operation(
         self, operation: OperationOnNodeIdentity, nodes: Iterable[Node]
     ):
         await self.inner.upsert_nodes_in_bulk_with_same_operation(operation, nodes)
-        Metrics.get().increment(Metric.NODES_UPSERTED, len(nodes))
+
+        # Tally node types
+        node_type_counts: dict[str, int] = {}
+        total_nodes = 0
+        for node in nodes:
+            node_type_counts[node.type] = node_type_counts.get(node.type, 0) + 1
+            total_nodes += 1
+
+        # Increment metrics in bulk
+        metrics = Metrics.get()
+        metrics.increment(NODES_UPSERTED, total_nodes)
+        for node_type, count in node_type_counts.items():
+            metric = self._get_or_create_node_metric(node_type)
+            metrics.increment(metric, count)
 
     async def upsert_relationships_in_bulk_of_same_operation(
         self,
@@ -29,15 +69,31 @@ async def upsert_relationships_in_bulk_of_same_operation(
         await self.inner.upsert_relationships_in_bulk_of_same_operation(
             shape, relationships
         )
-        Metrics.get().increment(Metric.RELATIONSHIPS_UPSERTED, len(relationships))
+
+        # Tally relationship types
+        relationship_type_counts: dict[str, int] = {}
+        total_relationships = 0
+        for relationship in relationships:
+            rel_type = relationship.relationship.type
+            relationship_type_counts[rel_type] = (
+                relationship_type_counts.get(rel_type, 0) + 1
+            )
+            total_relationships += 1
+
+        # Increment metrics in bulk
+        metrics = Metrics.get()
+        metrics.increment(RELATIONSHIPS_UPSERTED, total_relationships)
+        for rel_type, count in relationship_type_counts.items():
+            metric = self._get_or_create_relationship_metric(rel_type)
+            metrics.increment(metric, count)
 
     async def perform_ttl_op(self, config: TimeToLiveConfiguration):
         await self.inner.perform_ttl_op(config)
-        Metrics.get().increment(Metric.TIME_TO_LIVE_OPERATIONS)
+        Metrics.get().increment(TIME_TO_LIVE_OPERATIONS)
 
     async def execute_hook(self, hook: IngestionHook):
         await self.inner.execute_hook(hook)
-        Metrics.get().increment(Metric.INGEST_HOOKS_EXECUTED)
+        Metrics.get().increment(INGEST_HOOKS_EXECUTED)
 
     async def finish(self):
         await self.inner.finish()
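
Two details in this rewrite are worth noting: the old code called len(nodes) even though nodes is typed as Iterable (which need not support len()), and the new single-pass tally fixes that while also bucketing counts per type. The same bookkeeping can be expressed with collections.Counter; a small sketch, where FakeNode is a hypothetical stand-in for nodestream's Node with its .type attribute:

from collections import Counter
from dataclasses import dataclass


@dataclass
class FakeNode:
    type: str  # mirrors the node.type read in the diff above


nodes = (FakeNode("Person"), FakeNode("Person"), FakeNode("Company"))

# One pass yields both per-type buckets and the grand total,
# and works even if `nodes` were a generator with no len().
node_type_counts = Counter(node.type for node in nodes)
total_nodes = sum(node_type_counts.values())

assert node_type_counts == Counter({"Person": 2, "Company": 1})
assert total_nodes == 3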

nodestream/databases/writer.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ def from_file_data(
         ),
         collect_stats: bool = True,
         batch_size: int = 1000,
-        **database_args
+        **database_args,
     ):
         connector = DatabaseConnector.from_database_args(
             database=database, **database_args
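
The only change in this file is a trailing comma after **database_args, most likely black's magic trailing comma: Python 3.6 and later accept it in function definitions, and it keeps a future parameter addition to a one-line diff. A tiny illustration with hypothetical parameter names:

# Valid on Python 3.6+: a trailing comma is allowed even after **kwargs
# in a function definition, so adding a parameter later touches one line.
def connect(
    host: str = "localhost",
    port: int = 7687,
    **driver_args,
):
    return host, port, driver_args


print(connect(user="neo4j"))
# -> ('localhost', 7687, {'user': 'neo4j'})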
