1
1
from pathlib import Path
2
- from time import sleep
3
2
4
3
import pytest
4
+ from neo4j import Session
5
5
from nodestream .pipeline import (
6
6
PipelineInitializationArguments ,
7
7
PipelineProgressReporter ,
8
8
)
9
9
from nodestream .project import Project , RunRequest
10
+ from pandas import Timedelta , Timestamp
11
+
12
+ from nodestream_plugin_neo4j .query import Query
10
13
11
14
from .conftest import TESTED_NEO4J_VERSIONS
12
15
@@ -60,31 +63,55 @@ def validate_fifa_mo_club(session):
60
63
assert result .single ()["club" ] == "Liverpool"
61
64
62
65
63
- def validate_relationship_ttls (session ):
66
def validate_consistency_in_node_counts(session):
    """Assert that ``ObjectA`` and ``ObjectB`` each still have 30 nodes.

    One count query is issued per label through ``session.run``; the single
    returned record is read via its ``node_count`` column.

    Args:
        session: An object exposing ``run(query)`` whose result offers
            ``single()`` (a neo4j ``Session`` in the end-to-end test).

    Raises:
        AssertionError: If either label does not have exactly 30 nodes.
    """
    for label in ("ObjectA", "ObjectB"):
        result = session.run(
            f"""
            MATCH (n:{label})
            RETURN count(n) as node_count
            """
        )
        node_count = result.single()["node_count"]
        # Name the label in the failure so a broken run is diagnosable.
        assert node_count == 30, f"expected 30 {label} nodes, found {node_count}"
81
+
82
+
83
def validate_ttl_seperation_between_relationship_object_types(session):
    """Assert the per-type relationship counts expected after the TTL run.

    Counts directed relationships of each type and requires 20
    ``CONNECTED_TO`` and 10 ``ADJACENT_TO`` relationships.

    Args:
        session: An object exposing ``run(query)`` whose result offers
            ``single()`` (a neo4j ``Session`` in the end-to-end test).

    Raises:
        AssertionError: If a relationship type has an unexpected count.
    """
    expected_counts = {"CONNECTED_TO": 20, "ADJACENT_TO": 10}
    for relationship_type, expected in expected_counts.items():
        result = session.run(
            f"""
            MATCH ()-[r:{relationship_type}]->()
            RETURN count(r) as relationship_count
            """
        )
        relationship_count = result.single()["relationship_count"]
        # Name the type in the failure so a broken run is diagnosable.
        assert relationship_count == expected, (
            f"expected {expected} {relationship_type} relationships, "
            f"found {relationship_count}"
        )
78
98
79
99
80
- def validate_node_ttls (session ):
100
def validate_ttl_seperation_between_node_object_types(session):
    """Assert the per-label node counts expected after the TTL run.

    Counts nodes of each label and requires 20 ``ObjectA`` and 10
    ``ObjectB`` nodes.

    Args:
        session: An object exposing ``run(query)`` whose result offers
            ``single()`` (a neo4j ``Session`` in the end-to-end test).

    Raises:
        AssertionError: If a label has an unexpected node count.
    """
    expected_counts = {"ObjectA": 20, "ObjectB": 10}
    for label, expected in expected_counts.items():
        result = session.run(
            f"""
            MATCH (n:{label})
            RETURN count(n) as node_count
            """
        )
        node_count = result.single()["node_count"]
        # Name the label in the failure so a broken run is diagnosable.
        assert (
            node_count == expected
        ), f"expected {expected} {label} nodes, found {node_count}"
88
115
89
116
90
117
PIPELINE_TESTS = [
@@ -93,10 +120,122 @@ def validate_node_ttls(session):
93
120
]
94
121
95
122
# Validators run after the "relationship-ttls" pipeline.
_RELATIONSHIP_TTL_VALIDATORS = [
    validate_consistency_in_node_counts,
    validate_ttl_seperation_between_relationship_object_types,
]

# Validators run after the "node-ttls" pipeline.
_NODE_TTL_VALIDATORS = [validate_ttl_seperation_between_node_object_types]

# (pipeline name, validators) pairs: the TTL test runs each pipeline and then
# calls every listed validator with the open session.
TTL_TESTS = [
    ("relationship-ttls", _RELATIONSHIP_TTL_VALIDATORS),
    ("node-ttls", _NODE_TTL_VALIDATORS),
]
99
132
133
# Cypher template that creates a single node. The label is substituted with
# ``str.format``; the timestamp and identifier are passed as query parameters.
NODE_CREATION_QUERY = """
    CREATE (n:{node_label})
    SET n.last_ingested_at = $timestamp
    SET n.identifier = $node_id
"""

# Cypher template that links two existing nodes, looked up by label and
# ``identifier``. Labels and the relationship type are substituted with
# ``str.format``; identifiers and the timestamp are query parameters.
RELATIONSHIP_CREATION_QUERY = """
    MATCH (from_node:{from_node_label}) MATCH (to_node:{to_node_label})
    WHERE to_node.identifier=$to_node_identifier AND from_node.identifier=$from_node_identifier
    CREATE (from_node)-[rel:{relationship_label}]->(to_node)
    SET rel.last_ingested_at = $timestamp
"""

# One reference instant shared by all fixtures so they are exactly 24 hours
# apart. ``Timestamp.now(tz="UTC")`` replaces the deprecated
# ``Timestamp.utcnow()`` and likewise yields a timezone-aware UTC timestamp.
_FIXTURE_NOW = Timestamp.now(tz="UTC")
REALLY_OLD_TIMESTAMP = _FIXTURE_NOW - Timedelta(hours=60)
OLD_TIMESTAMP = _FIXTURE_NOW - Timedelta(hours=36)
NEW_TIMESTAMP = _FIXTURE_NOW - Timedelta(hours=12)
149
+
150
+
151
def create_node_query_from_params(node_label, node_id, timestamp):
    """Build the ``Query`` that creates one node.

    Args:
        node_label: Label formatted into ``NODE_CREATION_QUERY``.
        node_id: Value bound to the ``$node_id`` parameter.
        timestamp: Value bound to the ``$timestamp`` parameter.

    Returns:
        A ``Query`` wrapping the formatted statement and its parameters.
    """
    statement = NODE_CREATION_QUERY.format(node_label=node_label)
    parameters = {"timestamp": timestamp, "node_id": node_id}
    return Query(statement, parameters)
156
+
157
+
158
def create_relationship_query_from_params(
    from_node_label,
    from_node_identifier,
    to_node_label,
    to_node_identifier,
    relationship_label,
    timestamp,
):
    """Build the ``Query`` that creates one relationship between two nodes.

    Args:
        from_node_label: Label of the start node, formatted into the template.
        from_node_identifier: Value bound to ``$from_node_identifier``.
        to_node_label: Label of the end node, formatted into the template.
        to_node_identifier: Value bound to ``$to_node_identifier``.
        relationship_label: Relationship type, formatted into the template.
        timestamp: Value bound to the ``$timestamp`` parameter.

    Returns:
        A ``Query`` wrapping the formatted statement and its parameters.
    """
    statement = RELATIONSHIP_CREATION_QUERY.format(
        from_node_label=from_node_label,
        to_node_label=to_node_label,
        relationship_label=relationship_label,
    )
    parameters = {
        "timestamp": timestamp,
        "from_node_identifier": from_node_identifier,
        "to_node_identifier": to_node_identifier,
    }
    return Query(statement, parameters)
178
+
179
+
180
def create_test_objects(session: Session):
    """Seed the database with the nodes and relationships used by the TTL tests.

    Creates 30 ``ObjectA`` and 30 ``ObjectB`` nodes with identifiers "0".."29",
    then one ``CONNECTED_TO`` and one ``ADJACENT_TO`` relationship from each
    ``ObjectA`` to the ``ObjectB`` with the same identifier. Every object gets a
    ``last_ingested_at`` timestamp; relationship ages run in the opposite
    direction to node ages:

    * identifiers 0-9: nodes REALLY_OLD, relationships NEW
    * identifiers 10-19: nodes OLD, relationships OLD
    * identifiers 20-29: nodes NEW, relationships REALLY_OLD

    Args:
        session: Open neo4j session used to run the creation statements.
    """

    def execute(query):
        # Consume each result so a server-side failure surfaces at the
        # statement that caused it rather than at a later call.
        session.run(query.query_statement, query.parameters).consume()

    node_batches = (
        (range(0, 10), REALLY_OLD_TIMESTAMP),
        (range(10, 20), OLD_TIMESTAMP),
        (range(20, 30), NEW_TIMESTAMP),
    )
    for identifiers, timestamp in node_batches:
        for i in identifiers:
            for node_label in ("ObjectA", "ObjectB"):
                execute(create_node_query_from_params(node_label, str(i), timestamp))

    relationship_batches = (
        (range(0, 10), NEW_TIMESTAMP),
        (range(10, 20), OLD_TIMESTAMP),
        (range(20, 30), REALLY_OLD_TIMESTAMP),
    )
    for identifiers, timestamp in relationship_batches:
        for i in identifiers:
            for relationship_label in ("CONNECTED_TO", "ADJACENT_TO"):
                execute(
                    create_relationship_query_from_params(
                        "ObjectA",
                        str(i),
                        "ObjectB",
                        str(i),
                        relationship_label,
                        timestamp,
                    )
                )
238
+
100
239
101
240
@pytest .mark .asyncio
102
241
@pytest .mark .e2e
@@ -132,22 +271,8 @@ async def test_neo4j_ttls(project, neo4j_container, neo4j_version):
132
271
with neo4j_container (
133
272
neo4j_version
134
273
) as neo4j_container , neo4j_container .get_driver () as driver , driver .session () as session :
274
+ create_test_objects (session )
135
275
target = project .get_target_by_name ("my-neo4j-db" )
136
-
137
- for pipeline_name , validations in PIPELINE_TESTS :
138
- await project .run (
139
- RunRequest (
140
- pipeline_name ,
141
- PipelineInitializationArguments (extra_steps = [target .make_writer ()]),
142
- PipelineProgressReporter (),
143
- )
144
- )
145
-
146
- for validator in validations :
147
- validator (session )
148
-
149
- sleep (30 )
150
-
151
276
for pipeline_name , validations in TTL_TESTS :
152
277
await project .run (
153
278
RunRequest (
@@ -156,6 +281,5 @@ async def test_neo4j_ttls(project, neo4j_container, neo4j_version):
156
281
PipelineProgressReporter (),
157
282
)
158
283
)
159
-
160
284
for validator in validations :
161
285
validator (session )
0 commit comments