Skip to content

Commit 002c8e8

Browse files
Merge pull request #34 from nodestream-proj/RecordMetrics
Recording Database Runtime Metrics
2 parents 5e7a59b + b3158b6 commit 002c8e8

File tree

11 files changed

+1112
-136
lines changed

11 files changed

+1112
-136
lines changed

Makefile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ venv: poetry.lock
2727

2828
.PHONY: format
2929
format: venv
30-
poetry run black nodestream_plugin_neo4j tests
30+
poetry run ruff format nodestream_plugin_neo4j tests
3131
poetry run isort nodestream_plugin_neo4j tests
32+
poetry run black nodestream_plugin_neo4j tests
3233

3334
.PHONY: lint
34-
lint: venv
35+
lint: venv
36+
poetry run ruff check nodestream_plugin_neo4j tests
37+
poetry run isort nodestream_plugin_neo4j tests --check
3538
poetry run black nodestream_plugin_neo4j tests --check
36-
poetry run ruff nodestream_plugin_neo4j tests
3739

3840
.PHONY: test-unit
3941
test-unit: venv

nodestream_plugin_neo4j/database_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def from_file_data(
2626
execute_chunks_in_parallel: bool = True,
2727
retries_per_chunk: int = 3,
2828
_experimental_set_first_ingested_at: bool = False,
29-
**connection_args
29+
**connection_args,
3030
):
3131
database_connection = Neo4jDatabaseConnection.from_configuration(
3232
**connection_args

nodestream_plugin_neo4j/extractor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def from_file_data(
1515
query: str,
1616
parameters: Optional[Dict[str, Any]] = None,
1717
limit: int = 100,
18-
**connection_args
18+
**connection_args,
1919
):
2020
connector = Neo4jDatabaseConnection.from_configuration(**connection_args)
2121
return cls(query, connector, parameters, limit)

nodestream_plugin_neo4j/neo4j_database.py

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@
22
from logging import getLogger
33
from typing import Awaitable, Iterable, Tuple, Union
44

5-
from neo4j import AsyncDriver, AsyncGraphDatabase, AsyncSession, Record, RoutingControl
5+
from neo4j import (
6+
AsyncDriver,
7+
AsyncGraphDatabase,
8+
AsyncSession,
9+
EagerResult,
10+
Record,
11+
RoutingControl,
12+
)
613
from neo4j.auth_management import AsyncAuthManagers
714
from neo4j.exceptions import (
815
AuthError,
@@ -13,6 +20,7 @@
1320
from nodestream.file_io import LazyLoadedArgument
1421

1522
from .query import Query
23+
from .result import Neo4jQueryStatistics, Neo4jResult
1624

1725
RETRYABLE_EXCEPTIONS = (TransientError, ServiceUnavailable, SessionExpired, AuthError)
1826

@@ -74,6 +82,7 @@ def __init__(
7482
self.max_retry_attempts = max_retry_attempts
7583
self.retry_factor = retry_factor
7684
self._driver = None
85+
self.query_set: set[Query] = set()
7786

7887
def acquire_driver(self) -> AsyncDriver:
7988
self._driver = self.driver_factory()
@@ -85,38 +94,39 @@ def driver(self):
8594
return self._driver
8695

8796
def log_query_start(self, query: Query):
88-
self.logger.info(
89-
"Executing Cypher Query to Neo4j",
90-
extra={
91-
"query": query.query_statement,
92-
"uri": self.driver._pool.address.host,
93-
},
94-
)
95-
96-
def log_record(self, record: Record):
97-
self.logger.debug(
98-
"Gathered Query Results",
99-
extra=dict(**record, uri=self.driver._pool.address.host),
100-
)
97+
if query.query_statement not in self.query_set:
98+
self.logger.info(
99+
"Executing Cypher Query to Neo4j.",
100+
extra={
101+
"query": query.query_statement,
102+
"uri": self.driver._pool.address.host,
103+
},
104+
)
105+
self.query_set.add(query.query_statement)
101106

102107
async def _execute_query(
103108
self,
104109
query: Query,
105110
log_result: bool = False,
106111
routing_=RoutingControl.WRITE,
107-
) -> Record:
108-
result = await self.driver.execute_query(
112+
) -> Iterable[Record]:
113+
result: EagerResult = await self.driver.execute_query(
109114
query.query_statement,
110115
query.parameters,
111116
database_=self.database_name,
112117
routing_=routing_,
113118
)
114-
records = result.records
119+
neo4j_result = Neo4jResult(query, result)
115120
if log_result:
116-
for record in records:
117-
self.log_record(record)
121+
statistics = neo4j_result.obtain_query_statistics()
122+
self.log_error_messages_from_statistics(statistics)
123+
statistics.update_metrics_from_summary()
124+
125+
return neo4j_result.records
118126

119-
return records
127+
def log_error_messages_from_statistics(self, statistics: Neo4jQueryStatistics):
128+
for error in statistics.error_messages:
129+
self.logger.error("Query Error Occurred.", extra={"error": error})
120130

121131
async def execute(
122132
self,
@@ -132,7 +142,8 @@ async def execute(
132142
return await self._execute_query(query, log_result, routing_)
133143
except RETRYABLE_EXCEPTIONS as e:
134144
self.logger.warning(
135-
f"Error executing query, retrying. Attempt {attempts + 1}",
145+
"Error executing query, retrying. Attempt %s",
146+
attempts,
136147
exc_info=e,
137148
)
138149
await asyncio.sleep(self.retry_factor * attempts)

nodestream_plugin_neo4j/query.py

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,58 @@
1-
from dataclasses import dataclass
2-
from typing import Any, Dict, List
1+
from dataclasses import dataclass, field
2+
from typing import Any
3+
4+
5+
@dataclass
6+
class ApocUpdateStatistics:
7+
"""Type definition for APOC update statistics."""
8+
9+
relationshipsDeleted: int = 0 # Number of relationships deleted
10+
relationshipsCreated: int = 0 # Number of relationships created
11+
nodesDeleted: int = 0 # Number of nodes deleted
12+
nodesCreated: int = 0 # Number of nodes created
13+
labelsRemoved: int = 0 # Number of labels removed
14+
labelsAdded: int = 0 # Number of labels added
15+
propertiesSet: int = 0 # Number of properties set
16+
17+
18+
@dataclass
19+
class ApocBatchResponse:
20+
"""Type definition for APOC periodic.iterate response fields."""
21+
22+
batches: int = 0 # Number of batches processed
23+
total: int = 0 # Total number of operations
24+
timeTaken: int = 0 # Time taken in milliseconds
25+
committedOperations: int = 0 # Number of operations committed
26+
failedOperations: int = 0 # Number of operations that failed
27+
failedBatches: int = 0 # Number of batches that failed
28+
retries: int = 0 # Number of retries performed
29+
errorMessages: dict[str, Any] = field(default_factory=dict) # Map of error messages
30+
wasTerminated: bool = False # Whether the operation was terminated
31+
updateStatistics: ApocUpdateStatistics = field(default_factory=ApocUpdateStatistics)
32+
33+
34+
# Get fields from ApocBatchResponse dataclass
35+
APOC_BATCH_QUERY_RESPONSE_FIELDS = [
36+
field.name for field in ApocBatchResponse.__dataclass_fields__.values()
37+
]
338

439
UNWIND_QUERY = "UNWIND $batched_parameter_sets as params RETURN params"
5-
COMMIT_QUERY = """
40+
41+
# Build the YIELD and RETURN clauses using the standardized fields
42+
YIELD_CLAUSE = f"YIELD {', '.join(APOC_BATCH_QUERY_RESPONSE_FIELDS)}"
43+
RETURN_CLAUSE = f"RETURN {', '.join(APOC_BATCH_QUERY_RESPONSE_FIELDS)}"
44+
45+
COMMIT_QUERY = f"""
646
CALL apoc.periodic.iterate(
747
$iterable_query,
848
$batched_query,
9-
{batchSize: $chunk_size, parallel: $execute_chunks_in_parallel, retries: $retries_per_chunk, params: $iterate_params}
49+
{{batchSize: $chunk_size, parallel: $execute_chunks_in_parallel, retries: $retries_per_chunk, params: $iterate_params}}
1050
)
11-
YIELD batches, committedOperations, failedOperations, errorMessages
12-
RETURN batches, committedOperations, failedOperations, errorMessages
51+
{YIELD_CLAUSE}
52+
{RETURN_CLAUSE}
1353
"""
1454

15-
NON_APOCH_COMMIT_QUERY = """
55+
NON_APOC_COMMIT_QUERY = """
1656
UNWIND $iterate_params.batched_parameter_sets AS param
1757
CALL apoc.cypher.doIt($batched_query, {params: param})
1858
YIELD value
@@ -23,7 +63,8 @@
2363
@dataclass(slots=True, frozen=True)
2464
class Query:
2565
query_statement: str
26-
parameters: Dict[str, Any]
66+
parameters: dict[str, Any]
67+
is_apoc: bool = False # Indicates if this is an APOC query
2768

2869
@classmethod
2970
def from_statement(cls, query_statement: str, **parameters: Any) -> "Query":
@@ -47,13 +88,15 @@ def feed_batched_query(
4788
"chunk_size": chunk_size,
4889
"retries_per_chunk": retries_per_chunk,
4990
},
91+
is_apoc=True, # This is an APOC query
5092
)
5193

5294

5395
@dataclass(slots=True, frozen=True)
5496
class QueryBatch:
5597
query_statement: str
56-
batched_parameter_sets: List[Dict[str, Any]]
98+
batched_parameter_sets: list[dict[str, Any]]
99+
is_apoc: bool = False # Indicates if this is an APOC query
57100

58101
def as_query(
59102
self,
@@ -63,7 +106,7 @@ def as_query(
63106
retries_per_chunk: int = 3,
64107
) -> Query:
65108
return Query(
66-
{True: COMMIT_QUERY, False: NON_APOCH_COMMIT_QUERY}[apoc_iterate],
109+
{True: COMMIT_QUERY, False: NON_APOC_COMMIT_QUERY}[apoc_iterate],
67110
{
68111
"iterate_params": {
69112
"batched_parameter_sets": self.batched_parameter_sets
@@ -74,4 +117,5 @@ def as_query(
74117
"chunk_size": chunk_size,
75118
"retries_per_chunk": retries_per_chunk,
76119
},
120+
is_apoc=apoc_iterate, # Set is_apoc based on apoc_iterate parameter
77121
)

0 commit comments

Comments
 (0)