Skip to content

Commit 1994658

Browse files
Merge pull request #12 from nodestream-proj/ttl-fix
Batch TTL fix + TTL integration tests
2 parents 5d1be06 + 8e5b6f8 commit 1994658

File tree

9 files changed

+333
-10
lines changed

9 files changed

+333
-10
lines changed

nodestream_plugin_neo4j/ingest_query_builder.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,19 @@ def generate_ttl_match_query(self, config: TimeToLiveConfiguration) -> Query:
254254
return Query(str(query_builder), params)
255255

256256
def generate_ttl_query_from_configuration(
257-
self, config: TimeToLiveConfiguration
257+
self,
258+
config: TimeToLiveConfiguration,
259+
retries_per_chunk,
258260
) -> Query:
259261
ttl_match_query = self.generate_ttl_match_query(config)
262+
execute_chunks_in_parallel = (
263+
config.graph_object_type == GraphObjectType.RELATIONSHIP
264+
)
260265
operation = (
261266
DELETE_NODE_QUERY
262267
if config.graph_object_type == GraphObjectType.NODE
263268
else DELETE_REL_QUERY
264269
)
265-
return ttl_match_query.feed_batched_query(operation)
270+
return ttl_match_query.feed_batched_query(
271+
operation, config.batch_size, execute_chunks_in_parallel, retries_per_chunk
272+
)

nodestream_plugin_neo4j/query.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,23 @@ class Query:
2929
def from_statement(cls, query_statement: str, **parameters: Any) -> "Query":
3030
return cls(query_statement, parameters)
3131

32-
def feed_batched_query(self, batched_query: str) -> "Query":
32+
def feed_batched_query(
33+
self,
34+
batched_query: str,
35+
chunk_size: int = 1000,
36+
execute_chunks_in_parallel: bool = True,
37+
retries_per_chunk: int = 3,
38+
) -> "Query":
3339
"""Feed the results of the the query into another query that will be executed in batches."""
3440
return Query(
3541
COMMIT_QUERY,
3642
{
3743
"iterate_params": self.parameters,
3844
"batched_query": batched_query,
3945
"iterable_query": self.query_statement,
46+
"execute_chunks_in_parallel": execute_chunks_in_parallel,
47+
"chunk_size": chunk_size,
48+
"retries_per_chunk": retries_per_chunk,
4049
},
4150
)
4251

nodestream_plugin_neo4j/query_executor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ async def upsert_relationships_in_bulk_of_same_operation(
7171
)
7272

7373
async def perform_ttl_op(self, config: TimeToLiveConfiguration):
74-
query = self.ingest_query_builder.generate_ttl_query_from_configuration(config)
74+
query = self.ingest_query_builder.generate_ttl_query_from_configuration(
75+
config,
76+
retries_per_chunk=self.retries_per_chunk,
77+
)
7578
await self.database_connection.execute(query)
7679

7780
async def execute_hook(self, hook: IngestionHook):

tests/e2e/project/node-ttls.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
- implementation: nodestream.pipeline.extractors:TimeToLiveConfigurationExtractor
2+
arguments:
3+
graph_object_type: NODE
4+
configurations:
5+
- object_type: ObjectA
6+
expiry_in_hours: 48
7+
- object_type: ObjectB
8+
expiry_in_hours: 24

tests/e2e/project/nodestream.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ scopes:
55
name: airports
66
- path: tests/e2e/project/fifa_2021_player_data.yaml
77
name: fifa
8+
- path: tests/e2e/project/node-ttls.yaml
9+
name: node-ttls
10+
- path: tests/e2e/project/relationship-ttls.yaml
11+
name: relationship-ttls
812

913
targets:
1014
my-neo4j-db:
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
- implementation: nodestream.pipeline.extractors:TimeToLiveConfigurationExtractor
2+
arguments:
3+
graph_object_type: RELATIONSHIP
4+
configurations:
5+
- object_type: CONNECTED_TO
6+
expiry_in_hours: 48
7+
- object_type: ADJACENT_TO
8+
expiry_in_hours: 24

tests/e2e/test_neo4j_pipelines.py

Lines changed: 226 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
from pathlib import Path
22

33
import pytest
4+
from neo4j import Session
45
from nodestream.pipeline import (
56
PipelineInitializationArguments,
67
PipelineProgressReporter,
78
)
89
from nodestream.project import Project, RunRequest
10+
from pandas import Timedelta, Timestamp
11+
12+
from nodestream_plugin_neo4j.query import Query
913

1014
from .conftest import TESTED_NEO4J_VERSIONS
1115

@@ -59,15 +63,212 @@ def validate_fifa_mo_club(session):
5963
assert result.single()["club"] == "Liverpool"
6064

6165

66+
def validate_consistency_in_node_counts(session):
67+
result = session.run(
68+
"""
69+
MATCH (n:ObjectA)
70+
RETURN count(n) as node_count
71+
"""
72+
)
73+
assert result.single()["node_count"] == 30
74+
result = session.run(
75+
"""
76+
MATCH (n:ObjectB)
77+
RETURN count(n) as node_count
78+
"""
79+
)
80+
assert result.single()["node_count"] == 30
81+
82+
83+
def validate_ttl_seperation_between_relationship_object_types(session):
84+
result = session.run(
85+
"""
86+
MATCH ()-[r:CONNECTED_TO]->()
87+
RETURN count(r) as relationship_count
88+
"""
89+
)
90+
assert result.single()["relationship_count"] == 20
91+
result = session.run(
92+
"""
93+
MATCH ()-[r:ADJACENT_TO]->()
94+
RETURN count(r) as relationship_count
95+
"""
96+
)
97+
assert result.single()["relationship_count"] == 10
98+
99+
100+
def validate_ttl_seperation_between_node_object_types(session):
101+
result = session.run(
102+
"""
103+
MATCH (n:ObjectA)
104+
RETURN count(n) as node_count
105+
"""
106+
)
107+
assert result.single()["node_count"] == 20
108+
result = session.run(
109+
"""
110+
MATCH (n:ObjectB)
111+
RETURN count(n) as node_count
112+
"""
113+
)
114+
assert result.single()["node_count"] == 10
115+
116+
117+
PIPELINE_TESTS = [
118+
("airports", [validate_airports, valiudate_airport_country]),
119+
("fifa", [validate_fifa_player_count, validate_fifa_mo_club]),
120+
]
121+
122+
TTL_TESTS = [
123+
(
124+
"relationship-ttls",
125+
[
126+
validate_consistency_in_node_counts,
127+
validate_ttl_seperation_between_relationship_object_types,
128+
],
129+
),
130+
("node-ttls", [validate_ttl_seperation_between_node_object_types]),
131+
]
132+
133+
NODE_CREATION_QUERY = """
134+
CREATE (n:{node_label})
135+
SET n.last_ingested_at = $timestamp
136+
SET n.identifier = $node_id
137+
"""
138+
139+
RELATIONSHIP_CREATION_QUERY = """
140+
MATCH (from_node:{from_node_label}) MATCH (to_node:{to_node_label})
141+
WHERE to_node.identifier=$to_node_identifier AND from_node.identifier=$from_node_identifier
142+
CREATE (from_node)-[rel:{relationship_label}]->(to_node)
143+
SET rel.last_ingested_at = $timestamp
144+
"""
145+
146+
REALLY_OLD_TIMESTAMP = Timestamp.utcnow() - Timedelta(hours=60)
147+
OLD_TIMESTAMP = Timestamp.utcnow() - Timedelta(hours=36)
148+
NEW_TIMESTAMP = Timestamp.utcnow() - Timedelta(hours=12)
149+
150+
151+
def create_node_query_from_params(node_label, node_id, timestamp):
152+
return Query(
153+
NODE_CREATION_QUERY.format(node_label=node_label),
154+
{"timestamp": timestamp, "node_id": node_id},
155+
)
156+
157+
158+
def create_relationship_query_from_params(
159+
from_node_label,
160+
from_node_identifier,
161+
to_node_label,
162+
to_node_identifier,
163+
relationship_label,
164+
timestamp,
165+
):
166+
return Query(
167+
RELATIONSHIP_CREATION_QUERY.format(
168+
from_node_label=from_node_label,
169+
to_node_label=to_node_label,
170+
relationship_label=relationship_label,
171+
),
172+
{
173+
"timestamp": timestamp,
174+
"from_node_identifier": from_node_identifier,
175+
"to_node_identifier": to_node_identifier,
176+
},
177+
)
178+
179+
180+
"""
181+
The test works like this:
182+
We create 60 nodes in total:
183+
10 ObjectA 60hrs old
184+
10 ObjectB 60hrs old
185+
10 ObjectA 36hrs old
186+
10 ObjectB 36hrs old
187+
10 ObjectA 12hrs old
188+
10 ObjectB 12hrs old
189+
190+
We create 60 pairwise relationships as follows:
191+
ObjectA -ADJACENT_TO {60hrs old} > Object B
192+
ObjectA -CONNECTED_TO {60hrs old} > Object B
193+
....
194+
195+
Following the same pattern.
196+
197+
With our TTL configuration, were able to first get rid of relationships.
198+
CONNECTED_TO has 48hr expiry leaving 20 relationships of this type post deletion.
199+
ADJACENT_TO has 24hr expiry leaving 10 relationships of this type post deletion
200+
201+
We assert that the nodes were untouched in the validate_consistency_in_node_counts function located above.
202+
We then do the exact same thing with the nodes.
203+
"""
204+
205+
206+
def create_test_objects(session: Session):
207+
def create_node(node_label, node_id, timestamp):
208+
query = create_node_query_from_params(node_label, node_id, timestamp)
209+
session.run(query.query_statement, query.parameters)
210+
211+
def create_relationship(
212+
from_node_label,
213+
from_node_identifier,
214+
to_node_label,
215+
to_node_identifier,
216+
relationship_label,
217+
timestamp,
218+
):
219+
query = create_relationship_query_from_params(
220+
from_node_label,
221+
from_node_identifier,
222+
to_node_label,
223+
to_node_identifier,
224+
relationship_label,
225+
timestamp,
226+
)
227+
session.run(query.query_statement, query.parameters)
228+
229+
for i in range(0, 10):
230+
create_node("ObjectA", str(i), REALLY_OLD_TIMESTAMP)
231+
create_node("ObjectB", str(i), REALLY_OLD_TIMESTAMP)
232+
233+
for i in range(10, 20):
234+
create_node("ObjectA", str(i), OLD_TIMESTAMP)
235+
create_node("ObjectB", str(i), OLD_TIMESTAMP)
236+
237+
for i in range(20, 30):
238+
create_node("ObjectA", str(i), NEW_TIMESTAMP)
239+
create_node("ObjectB", str(i), NEW_TIMESTAMP)
240+
241+
for i in range(0, 10):
242+
create_relationship(
243+
"ObjectA", str(i), "ObjectB", str(i), "CONNECTED_TO", NEW_TIMESTAMP
244+
)
245+
create_relationship(
246+
"ObjectA", str(i), "ObjectB", str(i), "ADJACENT_TO", NEW_TIMESTAMP
247+
)
248+
249+
for i in range(10, 20):
250+
create_relationship(
251+
"ObjectA", str(i), "ObjectB", str(i), "CONNECTED_TO", OLD_TIMESTAMP
252+
)
253+
create_relationship(
254+
"ObjectA", str(i), "ObjectB", str(i), "ADJACENT_TO", OLD_TIMESTAMP
255+
)
256+
257+
for i in range(20, 30):
258+
create_relationship(
259+
"ObjectA", str(i), "ObjectB", str(i), "CONNECTED_TO", REALLY_OLD_TIMESTAMP
260+
)
261+
create_relationship(
262+
"ObjectA", str(i), "ObjectB", str(i), "ADJACENT_TO", REALLY_OLD_TIMESTAMP
263+
)
264+
265+
62266
@pytest.mark.asyncio
63267
@pytest.mark.e2e
64268
@pytest.mark.parametrize("neo4j_version", TESTED_NEO4J_VERSIONS)
65269
@pytest.mark.parametrize(
66270
"pipeline_name,validations",
67-
[
68-
("airports", [validate_airports, valiudate_airport_country]),
69-
("fifa", [validate_fifa_player_count, validate_fifa_mo_club]),
70-
],
271+
PIPELINE_TESTS,
71272
)
72273
async def test_neo4j_pipeline(
73274
project, neo4j_container, pipeline_name, validations, neo4j_version
@@ -87,3 +288,24 @@ async def test_neo4j_pipeline(
87288

88289
for validator in validations:
89290
validator(session)
291+
292+
293+
@pytest.mark.asyncio
294+
@pytest.mark.e2e
295+
@pytest.mark.parametrize("neo4j_version", TESTED_NEO4J_VERSIONS)
296+
async def test_neo4j_ttls(project, neo4j_container, neo4j_version):
297+
with neo4j_container(
298+
neo4j_version
299+
) as neo4j_container, neo4j_container.get_driver() as driver, driver.session() as session:
300+
create_test_objects(session)
301+
target = project.get_target_by_name("my-neo4j-db")
302+
for pipeline_name, validations in TTL_TESTS:
303+
await project.run(
304+
RunRequest(
305+
pipeline_name,
306+
PipelineInitializationArguments(extra_steps=[target.make_writer()]),
307+
PipelineProgressReporter(),
308+
)
309+
)
310+
for validator in validations:
311+
validator(session)

0 commit comments

Comments
 (0)