Skip to content

Commit 225ef60

Browse files
authored
Merge pull request #4 from nodestream-proj/feature/merge-rels
Improve Neo4j Query Performance By Using MERGE
2 parents e9247e9 + c18e7df commit 225ef60

File tree

6 files changed

+69
-30
lines changed

6 files changed

+69
-30
lines changed

nodestream_plugin_neo4j/database_connector.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ def from_file_data(
2222
cls,
2323
use_enterprise_features: bool = False,
2424
use_apoc: bool = True,
25+
chunk_size: int = 1000,
26+
execute_chunks_in_parallel: bool = True,
27+
retries_per_chunk: int = 3,
2528
**connection_args
2629
):
2730
database_connection = Neo4jDatabaseConnection.from_configuration(
@@ -31,21 +34,36 @@ def from_file_data(
3134
database_connection=database_connection,
3235
use_enterprise_features=use_enterprise_features,
3336
use_apoc=use_apoc,
37+
chunk_size=chunk_size,
38+
execute_chunks_in_parallel=execute_chunks_in_parallel,
39+
retries_per_chunk=retries_per_chunk,
3440
)
3541

3642
def __init__(
3743
self,
3844
database_connection: Neo4jDatabaseConnection,
3945
use_apoc: bool,
4046
use_enterprise_features: bool,
47+
chunk_size: int = 1000,
48+
execute_chunks_in_parallel: bool = True,
49+
retries_per_chunk: int = 3,
4150
) -> None:
4251
self.use_enterprise_features = use_enterprise_features
4352
self.use_apoc = use_apoc
4453
self.database_connection = database_connection
54+
self.chunk_size = chunk_size
55+
self.execute_chunks_in_parallel = execute_chunks_in_parallel
56+
self.retries_per_chunk = retries_per_chunk
4557

4658
def make_query_executor(self) -> QueryExecutor:
4759
query_builder = Neo4jIngestQueryBuilder(self.use_apoc)
48-
return Neo4jQueryExecutor(self.database_connection, query_builder)
60+
return Neo4jQueryExecutor(
61+
self.database_connection,
62+
query_builder,
63+
chunk_size=self.chunk_size,
64+
execute_chunks_in_parallel=self.execute_chunks_in_parallel,
65+
retries_per_chunk=self.retries_per_chunk,
66+
)
4967

5068
def make_type_retriever(self) -> TypeRetriever:
5169
return Neo4jTypeRetriever(self.database_connection)

nodestream_plugin_neo4j/ingest_query_builder.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ def _make_relationship(
9797
rel_identity: RelationshipIdentityShape, creation_rule: RelationshipCreationRule
9898
):
9999
keys = generate_properties_set_with_prefix(rel_identity.keys, RELATIONSHIP_REF_NAME)
100-
match_rel_query = (
100+
merge_rel_query = (
101101
QueryBuilder()
102-
.match_optional()
102+
.merge()
103103
.node(ref_name=FROM_NODE_REF_NAME)
104104
.related_to(
105105
ref_name=RELATIONSHIP_REF_NAME,
@@ -109,18 +109,12 @@ def _make_relationship(
109109
.node(ref_name=TO_NODE_REF_NAME)
110110
)
111111

112-
create_rel_query = str(match_rel_query).replace("OPTIONAL MATCH ", "CREATE")
113112
set_properties_query = f"SET {RELATIONSHIP_REF_NAME} += params.{generate_prefixed_param_name(PROPERTIES_PARAM_NAME, RELATIONSHIP_REF_NAME)}"
114113
if creation_rule == RelationshipCreationRule.CREATE:
114+
create_rel_query = str(merge_rel_query).replace("MERGE", "CREATE")
115115
return f"{create_rel_query} {set_properties_query}"
116116

117-
return f"""
118-
{match_rel_query}
119-
FOREACH (x IN CASE WHEN {RELATIONSHIP_REF_NAME} IS NULL THEN [1] ELSE [] END |
120-
{create_rel_query} {set_properties_query})
121-
FOREACH (i in CASE WHEN {RELATIONSHIP_REF_NAME} IS NOT NULL THEN [1] ELSE [] END |
122-
{set_properties_query})
123-
"""
117+
return f"{merge_rel_query} {set_properties_query}"
124118

125119

126120
class Neo4jIngestQueryBuilder:

nodestream_plugin_neo4j/query.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
CALL apoc.periodic.iterate(
77
$iterable_query,
88
$batched_query,
9-
{batchSize: 1000, parallel: true, retries: 3, params: $iterate_params}
9+
{batchSize: $chunk_size, parallel: $execute_chunks_in_parallel, retries: $retries_per_chunk, params: $iterate_params}
1010
)
1111
YIELD batches, committedOperations, failedOperations, errorMessages
1212
RETURN batches, committedOperations, failedOperations, errorMessages
1313
"""
1414

1515
NON_APOCH_COMMIT_QUERY = """
16-
UNWIND $iterate_params.param_sets AS param
16+
UNWIND $iterate_params.batched_parameter_sets AS param
1717
CALL apoc.cypher.doIt($batched_query, {params: param})
1818
YIELD value
1919
RETURN value
@@ -46,7 +46,13 @@ class QueryBatch:
4646
query_statement: str
4747
batched_parameter_sets: List[Dict[str, Any]]
4848

49-
def as_query(self, apoc_iterate: bool) -> Query:
49+
def as_query(
50+
self,
51+
apoc_iterate: bool,
52+
chunk_size: int = 1000,
53+
execute_chunks_in_parallel: bool = True,
54+
retries_per_chunk: int = 3,
55+
) -> Query:
5056
return Query(
5157
{True: COMMIT_QUERY, False: NON_APOCH_COMMIT_QUERY}[apoc_iterate],
5258
{
@@ -55,5 +61,8 @@ def as_query(self, apoc_iterate: bool) -> Query:
5561
},
5662
"batched_query": self.query_statement,
5763
"iterable_query": UNWIND_QUERY,
64+
"execute_chunks_in_parallel": execute_chunks_in_parallel,
65+
"chunk_size": chunk_size,
66+
"retries_per_chunk": retries_per_chunk,
5867
},
5968
)

nodestream_plugin_neo4j/query_executor.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,35 @@
1515

1616
from .ingest_query_builder import Neo4jIngestQueryBuilder
1717
from .neo4j_database import Neo4jDatabaseConnection
18-
from .query import Query
18+
from .query import Query, QueryBatch
1919

2020

2121
class Neo4jQueryExecutor(QueryExecutor):
2222
def __init__(
2323
self,
2424
database_connection: Neo4jDatabaseConnection,
2525
ingest_query_builder: Neo4jIngestQueryBuilder,
26+
chunk_size: int = 1000,
27+
execute_chunks_in_parallel: bool = True,
28+
retries_per_chunk: int = 3,
2629
) -> None:
2730
self.database_connection = database_connection
2831
self.ingest_query_builder = ingest_query_builder
2932
self.logger = getLogger(self.__class__.__name__)
33+
self.chunk_size = chunk_size
34+
self.execute_chunks_in_parallel = execute_chunks_in_parallel
35+
self.retries_per_chunk = retries_per_chunk
36+
37+
async def execute_query_batch(self, batch: QueryBatch):
38+
await self.database_connection.execute(
39+
batch.as_query(
40+
self.ingest_query_builder.apoc_iterate,
41+
chunk_size=self.chunk_size,
42+
execute_chunks_in_parallel=self.execute_chunks_in_parallel,
43+
retries_per_chunk=self.retries_per_chunk,
44+
),
45+
log_result=True,
46+
)
3047

3148
async def upsert_nodes_in_bulk_with_same_operation(
3249
self, operation: OperationOnNodeIdentity, nodes: Iterable[Node]
@@ -36,10 +53,7 @@ async def upsert_nodes_in_bulk_with_same_operation(
3653
operation, nodes
3754
)
3855
)
39-
await self.database_connection.execute(
40-
batched_query.as_query(self.ingest_query_builder.apoc_iterate),
41-
log_result=True,
42-
)
56+
await self.execute_query_batch(batched_query)
4357

4458
async def upsert_relationships_in_bulk_of_same_operation(
4559
self,

tests/unit/test_ingest_query_builder.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,11 +194,7 @@ def test_node_update_generates_expected_queries(
194194

195195
RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY = QueryBatch(
196196
"""MATCH (from_node: TestType) WHERE from_node.id = params.__from_node_id MATCH (to_node: ComplexType) WHERE to_node.id = params.__to_node_id
197-
OPTIONAL MATCH (from_node)-[rel: RELATED_TO]->(to_node)
198-
FOREACH (x IN CASE WHEN rel IS NULL THEN [1] ELSE [] END |
199-
CREATE (from_node)-[rel: RELATED_TO]->(to_node) SET rel += params.__rel_properties)
200-
FOREACH (i in CASE WHEN rel IS NOT NULL THEN [1] ELSE [] END |
201-
SET rel += params.__rel_properties)
197+
MERGE (from_node)-[rel: RELATED_TO]->(to_node) SET rel += params.__rel_properties
202198
""",
203199
[
204200
{
@@ -217,12 +213,7 @@ def test_node_update_generates_expected_queries(
217213

218214
RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_MULTI_KEY = QueryBatch(
219215
"""MATCH (from_node: TestType) WHERE from_node.id = params.__from_node_id MATCH (to_node: ComplexType) WHERE to_node.id_part1 = params.__to_node_id_part1 AND to_node.id_part2 = params.__to_node_id_part2
220-
OPTIONAL MATCH (from_node)-[rel: RELATED_TO]->(to_node)
221-
FOREACH (x IN CASE WHEN rel IS NULL THEN [1] ELSE [] END |
222-
CREATE (from_node)-[rel: RELATED_TO]->(to_node) SET rel += params.__rel_properties)
223-
FOREACH (i in CASE WHEN rel IS NOT NULL THEN [1] ELSE [] END |
224-
SET rel += params.__rel_properties)
225-
""",
216+
MERGE (from_node)-[rel: RELATED_TO]->(to_node) SET rel += params.__rel_properties""",
226217
[
227218
{
228219
"__from_node_id": "foo",

tests/unit/test_query_executor.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,19 @@ def query_executor(mocker):
2828
return Neo4jQueryExecutor(database_connection, ingest_query_builder_mock)
2929

3030

31+
@pytest.mark.asyncio
32+
async def test_execute_query_batch(query_executor, some_query_batch, mocker):
33+
expected_query = some_query_batch.as_query(
34+
query_executor.ingest_query_builder.apoc_iterate,
35+
chunk_size=query_executor.chunk_size,
36+
execute_chunks_in_parallel=query_executor.execute_chunks_in_parallel,
37+
retries_per_chunk=query_executor.retries_per_chunk,
38+
)
39+
40+
await query_executor.execute_query_batch(some_query_batch)
41+
assert_that(query_executor, ran_query(expected_query))
42+
43+
3144
@pytest.mark.asyncio
3245
async def test_upsert_nodes_in_bulk_of_same_operation(query_executor, some_query_batch):
3346
query_executor.ingest_query_builder.generate_batch_update_node_operation_batch.return_value = (

0 commit comments

Comments
 (0)