Skip to content

Commit 1f57dc0

Browse files
committed
Saving
1 parent a9e165c commit 1f57dc0

File tree

3 files changed

+32
-42
lines changed

3 files changed

+32
-42
lines changed

nodestream_plugin_neo4j/neo4j_database.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ async def _execute_query(
106106
query: Query,
107107
log_result: bool = False,
108108
routing_=RoutingControl.WRITE,
109-
) -> Record:
109+
) -> Iterable[Record]:
110110
result: EagerResult = await self.driver.execute_query(
111111
query.query_statement,
112112
query.parameters,
@@ -119,11 +119,12 @@ async def _execute_query(
119119
self.log_error_messages_from_statistics(statistics)
120120
statistics.update_metrics_from_summary()
121121

122-
return neo4j_result.records
122+
for record in neo4j_result.records:
123+
yield record
123124

124125
def log_error_messages_from_statistics(self, statistics: Neo4jQueryStatistics):
125126
for error in statistics.error_messages:
126-
self.logger.error(f"Query Errors Occured: {error}")
127+
self.logger.error("Query Error Occurred: %s", error)
127128

128129
async def execute(
129130
self,

nodestream_plugin_neo4j/query.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from dataclasses import dataclass
1+
from dataclasses import dataclass, field
22
from typing import Any
33

44

@@ -26,15 +26,9 @@ class ApocBatchResponse:
2626
failedOperations: int = 0 # Number of operations that failed
2727
failedBatches: int = 0 # Number of batches that failed
2828
retries: int = 0 # Number of retries performed
29-
errorMessages: dict[str, Any] = None # Map of error messages
29+
errorMessages: dict[str, Any] = field(default_factory=dict) # Map of error messages
3030
wasTerminated: bool = False # Whether the operation was terminated
31-
updateStatistics: ApocUpdateStatistics = None # Statistics about updates
32-
33-
def __post_init__(self):
34-
if self.errorMessages is None:
35-
self.errorMessages = {}
36-
if self.updateStatistics is None:
37-
self.updateStatistics = ApocUpdateStatistics()
31+
updateStatistics: ApocUpdateStatistics = field(default_factory=ApocUpdateStatistics)
3832

3933

4034
# Get fields from ApocBatchResponse dataclass
@@ -58,7 +52,7 @@ def __post_init__(self):
5852
{RETURN_CLAUSE}
5953
"""
6054

61-
NON_APOCH_COMMIT_QUERY = """
55+
NON_APOC_COMMIT_QUERY = """
6256
UNWIND $iterate_params.batched_parameter_sets AS param
6357
CALL apoc.cypher.doIt($batched_query, {params: param})
6458
YIELD value
@@ -112,7 +106,7 @@ def as_query(
112106
retries_per_chunk: int = 3,
113107
) -> Query:
114108
return Query(
115-
{True: COMMIT_QUERY, False: NON_APOCH_COMMIT_QUERY}[apoc_iterate],
109+
{True: COMMIT_QUERY, False: NON_APOC_COMMIT_QUERY}[apoc_iterate],
116110
{
117111
"iterate_params": {
118112
"batched_parameter_sets": self.batched_parameter_sets

nodestream_plugin_neo4j/result.py

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -207,34 +207,29 @@ def from_result(
207207

208208
def update_metrics_from_summary(self):
209209
metrics = Metrics.get()
210-
# Timing metrics
211-
metrics.increment(PLANNING_TIME, self.timing.planning_time_ms)
212-
metrics.increment(PROCESSING_TIME, self.timing.processing_time_ms)
213-
metrics.increment(TOTAL_TIME, self.timing.total_time_ms)
214-
metrics.increment(APOC_TIME, self.timing.apoc_time_ms)
215-
216-
# Write metrics
217-
metrics.increment(NODES_CREATED, self.write_metrics.nodes_created)
218-
metrics.increment(NODES_DELETED, self.write_metrics.nodes_deleted)
219-
metrics.increment(
220-
RELATIONSHIPS_CREATED, self.write_metrics.relationships_created
221-
)
222-
metrics.increment(
223-
RELATIONSHIPS_DELETED, self.write_metrics.relationships_deleted
224-
)
225-
metrics.increment(PROPERTIES_SET, self.write_metrics.properties_set)
226-
metrics.increment(LABELS_ADDED, self.write_metrics.labels_added)
227-
metrics.increment(LABELS_REMOVED, self.write_metrics.labels_removed)
228-
metrics.increment(CONSTRAINTS_ADDED, self.write_metrics.constraints_added)
229-
metrics.increment(CONSTRAINTS_REMOVED, self.write_metrics.constraints_removed)
230-
metrics.increment(INDEXES_ADDED, self.write_metrics.indexes_added)
231-
232-
# APOC specific metrics
233-
metrics.increment(WAS_TERMINATED, int(self.was_terminated))
234-
metrics.increment(RETRIES, self.retries)
235-
236-
# Error tracking
237-
metrics.increment(ERROR_MESSAGES, len(self.error_messages))
210+
211+
metric_updates: list[tuple[Metric, int]] = [
212+
(PLANNING_TIME, self.timing.planning_time_ms),
213+
(PROCESSING_TIME, self.timing.processing_time_ms),
214+
(TOTAL_TIME, self.timing.total_time_ms + self.timing.processing_time_ms),
215+
(APOC_TIME, self.timing.apoc_time_ms),
216+
(NODES_CREATED, self.write_metrics.nodes_created),
217+
(NODES_DELETED, self.write_metrics.nodes_deleted),
218+
(RELATIONSHIPS_CREATED, self.write_metrics.relationships_created),
219+
(RELATIONSHIPS_DELETED, self.write_metrics.relationships_deleted),
220+
(PROPERTIES_SET, self.write_metrics.properties_set),
221+
(LABELS_ADDED, self.write_metrics.labels_added),
222+
(LABELS_REMOVED, self.write_metrics.labels_removed),
223+
(CONSTRAINTS_ADDED, self.write_metrics.constraints_added),
224+
(CONSTRAINTS_REMOVED, self.write_metrics.constraints_removed),
225+
(INDEXES_ADDED, self.write_metrics.indexes_added),
226+
(INDEXES_REMOVED, self.write_metrics.indexes_removed),
227+
(WAS_TERMINATED, int(self.was_terminated)),
228+
(RETRIES, self.retries),
229+
(ERROR_MESSAGES, len(self.error_messages)),
230+
]
231+
232+
metrics.increment(*metric_updates)
238233

239234

240235
class Neo4jResult:

0 commit comments

Comments
 (0)