Skip to content

Commit 8e5b6f8

Browse files
committed
Added in the tests
1 parent 7d0d9d8 commit 8e5b6f8

File tree

5 files changed

+76
-20
lines changed

5 files changed

+76
-20
lines changed

nodestream_plugin_neo4j/ingest_query_builder.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,16 +256,17 @@ def generate_ttl_match_query(self, config: TimeToLiveConfiguration) -> Query:
256256
def generate_ttl_query_from_configuration(
257257
self,
258258
config: TimeToLiveConfiguration,
259-
chunk_size,
260-
execute_chunks_in_parallel,
261259
retries_per_chunk,
262260
) -> Query:
263261
ttl_match_query = self.generate_ttl_match_query(config)
262+
execute_chunks_in_parallel = (
263+
config.graph_object_type == GraphObjectType.RELATIONSHIP
264+
)
264265
operation = (
265266
DELETE_NODE_QUERY
266267
if config.graph_object_type == GraphObjectType.NODE
267268
else DELETE_REL_QUERY
268269
)
269270
return ttl_match_query.feed_batched_query(
270-
operation, chunk_size, execute_chunks_in_parallel, retries_per_chunk
271+
operation, config.batch_size, execute_chunks_in_parallel, retries_per_chunk
271272
)

nodestream_plugin_neo4j/query_executor.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ async def upsert_relationships_in_bulk_of_same_operation(
7373
async def perform_ttl_op(self, config: TimeToLiveConfiguration):
7474
query = self.ingest_query_builder.generate_ttl_query_from_configuration(
7575
config,
76-
chunk_size=self.chunk_size,
77-
execute_chunks_in_parallel=self.execute_chunks_in_parallel,
7876
retries_per_chunk=self.retries_per_chunk,
7977
)
8078
await self.database_connection.execute(query)

tests/e2e/test_neo4j_pipelines.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,32 @@ def create_relationship_query_from_params(
177177
)
178178

179179

180+
"""
181+
The test works like this:
182+
We create 60 nodes in total:
183+
10 ObjectA 60hrs old
184+
10 ObjectB 60hrs old
185+
10 ObjectA 36hrs old
186+
10 ObjectB 36hrs old
187+
10 ObjectA 12hrs old
188+
10 ObjectB 12hrs old
189+
190+
We create 60 pairwise relationships as follows:
191+
ObjectA -ADJACENT_TO {60hrs old} > ObjectB
192+
ObjectA -CONNECTED_TO {60hrs old} > ObjectB
193+
....
194+
195+
Following the same pattern.
196+
197+
With our TTL configuration, we're able to first get rid of relationships.
198+
CONNECTED_TO has 48hr expiry leaving 20 relationships of this type post deletion.
199+
ADJACENT_TO has 24hr expiry leaving 10 relationships of this type post deletion.
200+
201+
We assert that the nodes were untouched in the validate_consistency_in_node_counts function located above.
202+
We then do the exact same thing with the nodes.
203+
"""
204+
205+
180206
def create_test_objects(session: Session):
181207
def create_node(node_label, node_id, timestamp):
182208
query = create_node_query_from_params(node_label, node_id, timestamp)

tests/unit/test_ingest_query_builder.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ def query_builder():
3131

3232

3333
GREATEST_DAY = Timestamp(1998, 3, 25, 2, 0, 1)
34-
DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL = True
35-
DEFAULT_CHUNK_SIZE = 1000
34+
DEFAULT_CHUNK_SIZE = 100
3635
DEFAULT_RETRIES_PER_CHUNK = 3
3736

3837
BASIC_NODE_TTL = TimeToLiveConfiguration(
@@ -46,7 +45,7 @@ def query_builder():
4645
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
4746
"batched_query": DELETE_NODE_QUERY,
4847
"iterable_query": "MATCH (x: TestNodeType) WHERE x.last_ingested_at <= $earliest_allowed_time RETURN id(x) as id",
49-
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
48+
"execute_chunks_in_parallel": False,
5049
"chunk_size": DEFAULT_CHUNK_SIZE,
5150
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
5251
},
@@ -64,7 +63,7 @@ def query_builder():
6463
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
6564
"batched_query": DELETE_NODE_QUERY,
6665
"iterable_query": NODE_TTL_WITH_CUSTOM_QUERY.custom_query,
67-
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
66+
"execute_chunks_in_parallel": False,
6867
"chunk_size": DEFAULT_CHUNK_SIZE,
6968
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
7069
},
@@ -81,7 +80,7 @@ def query_builder():
8180
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
8281
"iterable_query": "MATCH ()-[x: IS_RELATED_TO]->() WHERE x.last_ingested_at <= $earliest_allowed_time RETURN id(x) as id",
8382
"batched_query": DELETE_REL_QUERY,
84-
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
83+
"execute_chunks_in_parallel": True,
8584
"chunk_size": DEFAULT_CHUNK_SIZE,
8685
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
8786
},
@@ -99,12 +98,50 @@ def query_builder():
9998
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
10099
"iterable_query": REL_TTL_WITH_CUSTOM_QUERY.custom_query,
101100
"batched_query": DELETE_REL_QUERY,
102-
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
101+
"execute_chunks_in_parallel": True,
103102
"chunk_size": DEFAULT_CHUNK_SIZE,
104103
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
105104
},
106105
)
107106

107+
NODE_TTL_WITH_BATCH_PARAMS = TimeToLiveConfiguration(
108+
graph_object_type=GraphObjectType.NODE,
109+
object_type="TestNodeType",
110+
expiry_in_hours=10,
111+
batch_size=200,
112+
)
113+
114+
NODE_TTL_WITH_BATCH_PARAMS_EXPECTED_QUERY = Query(
115+
COMMIT_QUERY,
116+
{
117+
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
118+
"batched_query": DELETE_NODE_QUERY,
119+
"iterable_query": "MATCH (x: TestNodeType) WHERE x.last_ingested_at <= $earliest_allowed_time RETURN id(x) as id",
120+
"execute_chunks_in_parallel": False,
121+
"chunk_size": 200,
122+
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
123+
},
124+
)
125+
126+
REL_TTL_WITH_BATCH_PARAMS = TimeToLiveConfiguration(
127+
graph_object_type=GraphObjectType.RELATIONSHIP,
128+
object_type="IS_RELATED_TO",
129+
expiry_in_hours=10,
130+
batch_size=200,
131+
)
132+
133+
REL_TTL_WITH_BATCH_PARAMS_EXPECTED_QUERY = Query(
134+
COMMIT_QUERY,
135+
{
136+
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
137+
"iterable_query": "MATCH ()-[x: IS_RELATED_TO]->() WHERE x.last_ingested_at <= $earliest_allowed_time RETURN id(x) as id",
138+
"batched_query": DELETE_REL_QUERY,
139+
"execute_chunks_in_parallel": True,
140+
"chunk_size": 200,
141+
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
142+
},
143+
)
144+
108145

109146
@patch("pandas.Timestamp.utcnow")
110147
@pytest.mark.parametrize(
@@ -114,14 +151,14 @@ def query_builder():
114151
(NODE_TTL_WITH_CUSTOM_QUERY, NODE_TTL_WITH_CUSTOM_QUERY_EXPECTED_QUERY),
115152
(BASIC_REL_TTL, BASIC_REL_TTL_EXPECTED_QUERY),
116153
(REL_TTL_WITH_CUSTOM_QUERY, REL_TTL_WITH_CUSTOM_QUERY_EXPECTED_QUERY),
154+
(NODE_TTL_WITH_BATCH_PARAMS, NODE_TTL_WITH_BATCH_PARAMS_EXPECTED_QUERY),
155+
(REL_TTL_WITH_BATCH_PARAMS, REL_TTL_WITH_BATCH_PARAMS_EXPECTED_QUERY),
117156
],
118157
)
119158
def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_query):
120159
mocked_utcnow.return_value = Timestamp(1998, 3, 25, 12, 0, 1)
121160
resultant_query = query_builder.generate_ttl_query_from_configuration(
122161
ttl,
123-
DEFAULT_CHUNK_SIZE,
124-
DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
125162
DEFAULT_RETRIES_PER_CHUNK,
126163
)
127164
assert_that(resultant_query, equal_to(expected_query))

tests/unit/test_query_executor.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,9 @@
88
from nodestream_plugin_neo4j.query_executor import Neo4jQueryExecutor
99

1010
from .matchers import ran_query
11-
from .test_ingest_query_builder import (
12-
DEFAULT_CHUNK_SIZE,
13-
DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
14-
DEFAULT_RETRIES_PER_CHUNK,
15-
)
11+
from .test_ingest_query_builder import DEFAULT_RETRIES_PER_CHUNK
1612

1713
TEST_PARAMS = {
18-
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
19-
"chunk_size": DEFAULT_CHUNK_SIZE,
2014
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
2115
}
2216

0 commit comments

Comments
 (0)