Skip to content

Commit 032c469

Browse files
committed
First iteration of TTL fix
1 parent 5d1be06 commit 032c469

File tree

5 files changed

+52
-6
lines changed

5 files changed

+52
-6
lines changed

nodestream_plugin_neo4j/ingest_query_builder.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,18 @@ def generate_ttl_match_query(self, config: TimeToLiveConfiguration) -> Query:
254254
return Query(str(query_builder), params)
255255

256256
def generate_ttl_query_from_configuration(
257-
self, config: TimeToLiveConfiguration
257+
self,
258+
config: TimeToLiveConfiguration,
259+
chunk_size,
260+
execute_chunks_in_parallel,
261+
retries_per_chunk,
258262
) -> Query:
259263
ttl_match_query = self.generate_ttl_match_query(config)
260264
operation = (
261265
DELETE_NODE_QUERY
262266
if config.graph_object_type == GraphObjectType.NODE
263267
else DELETE_REL_QUERY
264268
)
265-
return ttl_match_query.feed_batched_query(operation)
269+
return ttl_match_query.feed_batched_query(
270+
operation, chunk_size, execute_chunks_in_parallel, retries_per_chunk
271+
)

nodestream_plugin_neo4j/query.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,23 @@ class Query:
2929
def from_statement(cls, query_statement: str, **parameters: Any) -> "Query":
3030
return cls(query_statement, parameters)
3131

32-
def feed_batched_query(self, batched_query: str) -> "Query":
32+
def feed_batched_query(
33+
self,
34+
batched_query: str,
35+
chunk_size: int = 1000,
36+
execute_chunks_in_parallel: bool = True,
37+
retries_per_chunk: int = 3,
38+
) -> "Query":
3339
"""Feed the results of the the query into another query that will be executed in batches."""
3440
return Query(
3541
COMMIT_QUERY,
3642
{
3743
"iterate_params": self.parameters,
3844
"batched_query": batched_query,
3945
"iterable_query": self.query_statement,
46+
"execute_chunks_in_parallel": execute_chunks_in_parallel,
47+
"chunk_size": chunk_size,
48+
"retries_per_chunk": retries_per_chunk,
4049
},
4150
)
4251

nodestream_plugin_neo4j/query_executor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,12 @@ async def upsert_relationships_in_bulk_of_same_operation(
7171
)
7272

7373
async def perform_ttl_op(self, config: TimeToLiveConfiguration):
74-
query = self.ingest_query_builder.generate_ttl_query_from_configuration(config)
74+
query = self.ingest_query_builder.generate_ttl_query_from_configuration(
75+
config,
76+
chunk_size=self.chunk_size,
77+
execute_chunks_in_parallel=self.execute_chunks_in_parallel,
78+
retries_per_chunk=self.retries_per_chunk,
79+
)
7580
await self.database_connection.execute(query)
7681

7782
async def execute_hook(self, hook: IngestionHook):

tests/unit/test_ingest_query_builder.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ def query_builder():
3131

3232

3333
GREATEST_DAY = Timestamp(1998, 3, 25, 2, 0, 1)
34+
DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL = 1000
35+
DEFAULT_CHUNK_SIZE = True
36+
DEFAULT_RETRIES_PER_CHUNK = 3
3437

3538
BASIC_NODE_TTL = TimeToLiveConfiguration(
3639
graph_object_type=GraphObjectType.NODE,
@@ -43,6 +46,9 @@ def query_builder():
4346
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
4447
"batched_query": DELETE_NODE_QUERY,
4548
"iterable_query": "MATCH (x: TestNodeType) WHERE x.last_ingested_at <= $earliest_allowed_time RETURN id(x) as id",
49+
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
50+
"chunk_size": DEFAULT_CHUNK_SIZE,
51+
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
4652
},
4753
)
4854

@@ -58,6 +64,9 @@ def query_builder():
5864
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
5965
"batched_query": DELETE_NODE_QUERY,
6066
"iterable_query": NODE_TTL_WITH_CUSTOM_QUERY.custom_query,
67+
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
68+
"chunk_size": DEFAULT_CHUNK_SIZE,
69+
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
6170
},
6271
)
6372

@@ -72,6 +81,9 @@ def query_builder():
7281
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
7382
"iterable_query": "MATCH ()-[x: IS_RELATED_TO]->() WHERE x.last_ingested_at <= $earliest_allowed_time RETURN id(x) as id",
7483
"batched_query": DELETE_REL_QUERY,
84+
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
85+
"chunk_size": DEFAULT_CHUNK_SIZE,
86+
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
7587
},
7688
)
7789

@@ -87,6 +99,9 @@ def query_builder():
8799
"iterate_params": {"earliest_allowed_time": GREATEST_DAY},
88100
"iterable_query": REL_TTL_WITH_CUSTOM_QUERY.custom_query,
89101
"batched_query": DELETE_REL_QUERY,
102+
"execute_chunks_in_parallel": DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
103+
"chunk_size": DEFAULT_CHUNK_SIZE,
104+
"retries_per_chunk": DEFAULT_RETRIES_PER_CHUNK,
90105
},
91106
)
92107

@@ -103,7 +118,12 @@ def query_builder():
103118
)
104119
def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_query):
105120
mocked_utcnow.return_value = Timestamp(1998, 3, 25, 12, 0, 1)
106-
resultant_query = query_builder.generate_ttl_query_from_configuration(ttl)
121+
resultant_query = query_builder.generate_ttl_query_from_configuration(
122+
ttl,
123+
DEFAULT_CHUNK_SIZE,
124+
DEFAULT_EXECUTE_CHUNKS_IN_PARALLEL,
125+
DEFAULT_RETRIES_PER_CHUNK,
126+
)
107127
assert_that(resultant_query, equal_to(expected_query))
108128

109129

tests/unit/test_query_executor.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99

1010
from .matchers import ran_query
1111

12+
TEST_PARAMS = {
13+
"execute_chunks_in_parallel": True,
14+
"chunk_size": 1000,
15+
"retries_per_chunk": 3,
16+
}
17+
1218

1319
@pytest.fixture
1420
def some_query():
@@ -84,7 +90,7 @@ async def test_perform_ttl_op(query_executor, some_query):
8490
)
8591
query_generator.return_value = some_query
8692
await query_executor.perform_ttl_op(ttl_config)
87-
query_generator.assert_called_once_with(ttl_config)
93+
query_generator.assert_called_once_with(ttl_config, **TEST_PARAMS)
8894
assert_that(query_executor, ran_query(some_query))
8995

9096

0 commit comments

Comments
 (0)