Skip to content

Commit 7524486

Browse files
authored
Merge pull request #26 from nodestream-proj/beta/first-created-at-timestamp
Support Experimental `first_ingested_at` timestamp
2 parents 36de463 + 063bd16 commit 7524486

File tree

5 files changed

+231
-10
lines changed

5 files changed

+231
-10
lines changed

nodestream_plugin_neo4j/database_connector.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def from_file_data(
2525
chunk_size: int = 1000,
2626
execute_chunks_in_parallel: bool = True,
2727
retries_per_chunk: int = 3,
28+
_experimental_set_first_ingested_at: bool = False,
2829
**connection_args
2930
):
3031
database_connection = Neo4jDatabaseConnection.from_configuration(
@@ -37,6 +38,7 @@ def from_file_data(
3738
chunk_size=chunk_size,
3839
execute_chunks_in_parallel=execute_chunks_in_parallel,
3940
retries_per_chunk=retries_per_chunk,
41+
set_first_ingested_at=_experimental_set_first_ingested_at,
4042
)
4143

4244
def __init__(
@@ -47,16 +49,20 @@ def __init__(
4749
chunk_size: int = 1000,
4850
execute_chunks_in_parallel: bool = True,
4951
retries_per_chunk: int = 3,
52+
set_first_ingested_at: bool = False,
5053
) -> None:
5154
self.use_enterprise_features = use_enterprise_features
5255
self.use_apoc = use_apoc
5356
self.database_connection = database_connection
5457
self.chunk_size = chunk_size
5558
self.execute_chunks_in_parallel = execute_chunks_in_parallel
5659
self.retries_per_chunk = retries_per_chunk
60+
self.set_first_ingested_at = set_first_ingested_at
5761

5862
def make_query_executor(self) -> QueryExecutor:
59-
query_builder = Neo4jIngestQueryBuilder(self.use_apoc)
63+
query_builder = Neo4jIngestQueryBuilder(
64+
self.use_apoc, self.set_first_ingested_at
65+
)
6066
return Neo4jQueryExecutor(
6167
self.database_connection,
6268
query_builder,

nodestream_plugin_neo4j/ingest_query_builder.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ def _merge_node(
9494

9595
@cache
9696
def _make_relationship(
97-
rel_identity: RelationshipIdentityShape, creation_rule: RelationshipCreationRule
97+
rel_identity: RelationshipIdentityShape,
98+
creation_rule: RelationshipCreationRule,
99+
set_first_ingested_at: bool,
98100
):
99101
keys = generate_properties_set_with_prefix(rel_identity.keys, RELATIONSHIP_REF_NAME)
100102
merge_rel_query = (
@@ -112,14 +114,20 @@ def _make_relationship(
112114
set_properties_query = f"SET {RELATIONSHIP_REF_NAME} += params.{generate_prefixed_param_name(PROPERTIES_PARAM_NAME, RELATIONSHIP_REF_NAME)}"
113115
if creation_rule == RelationshipCreationRule.CREATE:
114116
create_rel_query = str(merge_rel_query).replace("MERGE", "CREATE")
117+
if set_first_ingested_at:
118+
set_properties_query = f"{set_properties_query} SET {RELATIONSHIP_REF_NAME}.first_ingested_at = params.{generate_prefixed_param_name(PROPERTIES_PARAM_NAME, RELATIONSHIP_REF_NAME)}['last_ingested_at']"
115119
return f"{create_rel_query} {set_properties_query}"
116120

121+
if set_first_ingested_at:
122+
merge_rel_query = f"{merge_rel_query} ON CREATE SET {RELATIONSHIP_REF_NAME}.first_ingested_at = params.{generate_prefixed_param_name(PROPERTIES_PARAM_NAME, RELATIONSHIP_REF_NAME)}['last_ingested_at']"
123+
117124
return f"{merge_rel_query} {set_properties_query}"
118125

119126

120127
class Neo4jIngestQueryBuilder:
121-
def __init__(self, apoc_iterate: bool):
128+
def __init__(self, apoc_iterate: bool, set_first_ingested_at: bool):
122129
self.apoc_iterate = apoc_iterate
130+
self.set_first_ingested_at = set_first_ingested_at
123131

124132
@cache
125133
@correct_parameters
@@ -131,6 +139,12 @@ def generate_update_node_operation_query_statement(
131139

132140
if operation.node_creation_rule == NodeCreationRule.EAGER:
133141
query = str(_merge_node(operation))
142+
143+
# Only add the ON CREATE clause when the user wants a first_ingested_at
144+
# property and we are in the MERGE case, because MATCH never creates
145+
# the node.
146+
if self.set_first_ingested_at:
147+
query = f"{query} ON CREATE SET {GENERIC_NODE_REF_NAME}.first_ingested_at = params.{generate_prefixed_param_name(PROPERTIES_PARAM_NAME, GENERIC_NODE_REF_NAME)}['last_ingested_at']"
134148
else:
135149
query = str(_match_node(operation))
136150

@@ -173,7 +187,9 @@ def generate_update_relationship_operation_query_statement(
173187
match_from_node_segment = _match_node(operation.from_node, FROM_NODE_REF_NAME)
174188
match_to_node_segment = _match_node(operation.to_node, TO_NODE_REF_NAME)
175189
merge_rel_segment = _make_relationship(
176-
operation.relationship_identity, operation.relationship_creation_rule
190+
operation.relationship_identity,
191+
operation.relationship_creation_rule,
192+
self.set_first_ingested_at,
177193
)
178194
return f"{match_from_node_segment} {match_to_node_segment} {merge_rel_segment}"
179195

tests/e2e/project/nodestream.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,9 @@ targets:
1818
uri: !env NEO4J_CONNECT_URI
1919
username: neo4j
2020
password: password
21+
my-creation-ts-db:
22+
database: neo4j
23+
uri: !env NEO4J_CONNECT_URI
24+
username: neo4j
25+
password: password
26+
_experimental_set_first_ingested_at: true

tests/e2e/test_neo4j_pipelines.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,47 @@ def project():
1919
return Project.read_from_file(Path("tests/e2e/project/nodestream.yaml"))
2020

2121

22+
def validate_timestamps(session):
    """Assert that every node and relationship carries ingestion timestamps.

    Used against the target configured with
    ``_experimental_set_first_ingested_at: true``. Nodes must all have
    ``first_ingested_at``; relationships must all have both
    ``last_ingested_at`` and ``first_ingested_at``.

    Args:
        session: An open database session. Only ``session.run(query)`` and
            ``.single()["count"]`` on the returned result are used.

    Raises:
        AssertionError: If any node or relationship is missing one of the
            expected timestamp properties.
    """

    def count(query):
        # Read each result straight away so no unread result is left
        # pending on the session while the next query runs.
        return session.run(query).single()["count"]

    # Check that all nodes have a first_ingested_at property
    nodes_with_first_ingested_at = count(
        """
        MATCH (n) where n.first_ingested_at IS NOT NULL
        RETURN count(n) AS count
        """
    )
    total_nodes = count(
        """
        MATCH (n)
        RETURN count(n) AS count
        """
    )
    assert nodes_with_first_ingested_at == total_nodes

    total_relationships = count(
        """
        MATCH ()-[r]->()
        RETURN count(r) AS count
        """
    )

    # Check that all relationships have a last_ingested_at property
    relationships_with_last_ingested_at = count(
        """
        MATCH ()-[r]->() where r.last_ingested_at IS NOT NULL
        RETURN count(r) AS count
        """
    )
    assert relationships_with_last_ingested_at == total_relationships

    # Check that all relationships have a first_ingested_at property.
    # last_ingested_at alone says nothing about the experimental flag, so
    # without this check the relationship side of the feature is untested.
    relationships_with_first_ingested_at = count(
        """
        MATCH ()-[r]->() where r.first_ingested_at IS NOT NULL
        RETURN count(r) AS count
        """
    )
    assert relationships_with_first_ingested_at == total_relationships
61+
62+
2263
def validate_airports(session):
2364
result = session.run(
2465
"""
@@ -125,8 +166,13 @@ def validate_ttl_seperation_between_node_object_types(session):
125166

126167

127168
PIPELINE_TESTS = [
128-
("airports", [validate_airports, valiudate_airport_country]),
129-
("fifa", [validate_fifa_player_count, validate_fifa_mo_club]),
169+
("airports", [validate_airports, valiudate_airport_country], "my-neo4j-db"),
170+
("fifa", [validate_fifa_player_count, validate_fifa_mo_club], "my-neo4j-db"),
171+
(
172+
"airports",
173+
[validate_airports, valiudate_airport_country, validate_timestamps],
174+
"my-creation-ts-db",
175+
),
130176
]
131177

132178
TTL_TESTS = [
@@ -280,16 +326,16 @@ def create_relationship(
280326
@pytest.mark.e2e
281327
@pytest.mark.parametrize("neo4j_version", TESTED_NEO4J_VERSIONS)
282328
@pytest.mark.parametrize(
283-
"pipeline_name,validations",
329+
"pipeline_name,validations,target_name",
284330
PIPELINE_TESTS,
285331
)
286332
async def test_neo4j_pipeline(
287-
project, neo4j_container, pipeline_name, validations, neo4j_version
333+
project, neo4j_container, pipeline_name, validations, neo4j_version, target_name
288334
):
289335
with neo4j_container(
290336
neo4j_version
291337
) as neo4j_container, neo4j_container.get_driver() as driver, driver.session() as session:
292-
target = project.get_target_by_name("my-neo4j-db")
338+
target = project.get_target_by_name(target_name)
293339

294340
await project.run(
295341
RunRequest(

tests/unit/test_ingest_query_builder.py

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
@pytest.fixture
2929
def query_builder():
30-
return Neo4jIngestQueryBuilder(True)
30+
return Neo4jIngestQueryBuilder(True, False)
3131

3232

3333
GREATEST_DAY = Timestamp(1998, 3, 25, 2, 0, 1)
@@ -177,6 +177,16 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
177177
}
178178
],
179179
)
180+
SIMPLE_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
181+
"MERGE (node: TestType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = params.__node_properties['last_ingested_at'] SET node += params.__node_properties",
182+
[
183+
{
184+
"__node_id": "foo",
185+
"__node_properties": SIMPLE_NODE.properties,
186+
"__node_additional_labels": (),
187+
}
188+
],
189+
)
180190

181191
SIMPLE_NODE_EXPECTED_QUERY_ON_MATCH = QueryBatch(
182192
"MATCH (node: TestType) WHERE node.id = params.__node_id SET node += params.__node_properties",
@@ -207,6 +217,17 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
207217
],
208218
)
209219

220+
COMPLEX_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
221+
"MERGE (node: ComplexType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = params.__node_properties['last_ingested_at'] WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
222+
[
223+
{
224+
"__node_id": "foo",
225+
"__node_properties": COMPLEX_NODE.properties,
226+
"__node_additional_labels": ("ExtraTypeOne", "ExtraTypeTwo"),
227+
}
228+
],
229+
)
230+
210231
COMPLEX_NODE_TWO = Node(
211232
"ComplexType",
212233
{"id_part1": "foo", "id_part2": "bar"},
@@ -225,6 +246,18 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
225246
],
226247
)
227248

249+
COMPLEX_NODE_TWO_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
250+
"MERGE (node: ComplexType {id_part1 : params.__node_id_part1, id_part2 : params.__node_id_part2}) ON CREATE SET node.first_ingested_at = params.__node_properties['last_ingested_at'] WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
251+
[
252+
{
253+
"__node_id_part1": "foo",
254+
"__node_id_part2": "bar",
255+
"__node_properties": COMPLEX_NODE_TWO.properties,
256+
"__node_additional_labels": ("ExtraTypeOne", "ExtraTypeTwo"),
257+
}
258+
],
259+
)
260+
228261

229262
@pytest.mark.parametrize(
230263
"node,expected_query,node_creation_rule",
@@ -243,6 +276,36 @@ def test_node_update_generates_expected_queries(
243276
assert_that(query, equal_to(expected_query))
244277

245278

279+
@pytest.mark.parametrize(
    "node,expected_query,node_creation_rule",
    [
        (
            SIMPLE_NODE,
            SIMPLE_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP,
            NodeCreationRule.EAGER,
        ),
        (
            COMPLEX_NODE,
            COMPLEX_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP,
            NodeCreationRule.EAGER,
        ),
        (
            COMPLEX_NODE_TWO,
            COMPLEX_NODE_TWO_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP,
            NodeCreationRule.EAGER,
        ),
        # MATCH_ONLY is expected to produce the plain match query even with
        # the flag switched on.
        (SIMPLE_NODE, SIMPLE_NODE_EXPECTED_QUERY_ON_MATCH, NodeCreationRule.MATCH_ONLY),
    ],
)
def test_node_update_with_created_timestamp_generates_exepected_queries(
    query_builder, node, expected_query, node_creation_rule
):
    """Node batches built with ``set_first_ingested_at`` on match expectations."""
    # The shared fixture builds the query builder with the flag off, so
    # switch it on for this test only.
    query_builder.set_first_ingested_at = True
    actual_batch = query_builder.generate_batch_update_node_operation_batch(
        OperationOnNodeIdentity(node.identity_shape, node_creation_rule),
        [node],
    )
    assert_that(actual_batch, equal_to(expected_query))
307+
308+
246309
RELATIONSHIP_BETWEEN_TWO_NODES = RelationshipWithNodes(
247310
from_node=SIMPLE_NODE,
248311
to_node=COMPLEX_NODE,
@@ -262,6 +325,19 @@ def test_node_update_generates_expected_queries(
262325
],
263326
)
264327

328+
RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
329+
"""MATCH (from_node: TestType) WHERE from_node.id = params.__from_node_id MATCH (to_node: ComplexType) WHERE to_node.id = params.__to_node_id
330+
MERGE (from_node)-[rel: RELATED_TO]->(to_node) ON CREATE SET rel.first_ingested_at = params.__rel_properties['last_ingested_at'] SET rel += params.__rel_properties
331+
""",
332+
[
333+
{
334+
"__from_node_id": "foo",
335+
"__to_node_id": "foo",
336+
"__rel_properties": RELATIONSHIP_BETWEEN_TWO_NODES.relationship.properties,
337+
}
338+
],
339+
)
340+
265341
RELATIONSHIP_BETWEEN_TWO_NODES_WITH_MULTI_KEY = RelationshipWithNodes(
266342
from_node=SIMPLE_NODE,
267343
to_node=COMPLEX_NODE_TWO,
@@ -281,6 +357,21 @@ def test_node_update_generates_expected_queries(
281357
],
282358
)
283359

360+
361+
RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_MULTI_KEY_AND_TIMESTAMP = QueryBatch(
362+
"""MATCH (from_node: TestType) WHERE from_node.id = params.__from_node_id MATCH (to_node: ComplexType) WHERE to_node.id_part1 = params.__to_node_id_part1 AND to_node.id_part2 = params.__to_node_id_part2
363+
MERGE (from_node)-[rel: RELATED_TO]->(to_node) ON CREATE SET rel.first_ingested_at = params.__rel_properties['last_ingested_at'] SET rel += params.__rel_properties""",
364+
[
365+
{
366+
"__from_node_id": "foo",
367+
"__to_node_id_part1": "foo",
368+
"__to_node_id_part2": "bar",
369+
"__rel_properties": RELATIONSHIP_BETWEEN_TWO_NODES_WITH_MULTI_KEY.relationship.properties,
370+
}
371+
],
372+
)
373+
374+
284375
RELATIONSHIP_BETWEEN_TWO_NODES_WITH_MULTI_KEY_AND_CREATE = RelationshipWithNodes(
285376
from_node=SIMPLE_NODE,
286377
to_node=COMPLEX_NODE_TWO,
@@ -301,6 +392,19 @@ def test_node_update_generates_expected_queries(
301392
],
302393
)
303394

395+
RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_MULTI_KEY_AND_CREATE_AND_TIMESTAMP = QueryBatch(
396+
"""MATCH (from_node: TestType) WHERE from_node.id = params.__from_node_id MATCH (to_node: ComplexType) WHERE to_node.id_part1 = params.__to_node_id_part1 AND to_node.id_part2 = params.__to_node_id_part2
397+
CREATE (from_node)-[rel: RELATED_TO]->(to_node) SET rel += params.__rel_properties SET rel.first_ingested_at = params.__rel_properties['last_ingested_at']""",
398+
[
399+
{
400+
"__from_node_id": "foo",
401+
"__to_node_id_part1": "foo",
402+
"__to_node_id_part2": "bar",
403+
"__rel_properties": RELATIONSHIP_BETWEEN_TWO_NODES_WITH_MULTI_KEY_AND_CREATE.relationship.properties,
404+
}
405+
],
406+
)
407+
304408

305409
@pytest.mark.parametrize(
306410
"rel,expected_query",
@@ -339,3 +443,46 @@ def test_relationship_update_generates_expected_queries(
339443
assert_that(
340444
query.batched_parameter_sets, equal_to(expected_query.batched_parameter_sets)
341445
)
446+
447+
448+
@pytest.mark.parametrize(
    "rel,expected_query",
    [
        (
            RELATIONSHIP_BETWEEN_TWO_NODES,
            RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP,
        ),
        (
            RELATIONSHIP_BETWEEN_TWO_NODES_WITH_MULTI_KEY,
            RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_MULTI_KEY_AND_TIMESTAMP,
        ),
        (
            RELATIONSHIP_BETWEEN_TWO_NODES_WITH_MULTI_KEY_AND_CREATE,
            RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_MULTI_KEY_AND_CREATE_AND_TIMESTAMP,
        ),
    ],
)
def test_relationship_update_with_created_timestamp_generates_expected_queries(
    query_builder, rel, expected_query
):
    """Relationship batches built with ``set_first_ingested_at`` on match expectations."""
    # The shared fixture builds the query builder with the flag off, so
    # switch it on for this test only.
    query_builder.set_first_ingested_at = True

    source_operation = OperationOnNodeIdentity(
        rel.from_node.identity_shape, NodeCreationRule.MATCH_ONLY
    )
    target_operation = OperationOnNodeIdentity(
        rel.to_node.identity_shape, NodeCreationRule.EAGER
    )
    relationship_operation = OperationOnRelationshipIdentity(
        source_operation,
        target_operation,
        rel.relationship.identity_shape,
        relationship_creation_rule=rel.relationship_creation_rule,
    )

    actual_batch = query_builder.generate_batch_update_relationship_query_batch(
        relationship_operation, [rel]
    )

    # The statement is compared ignoring whitespace; the parameter sets
    # must match exactly.
    assert_that(
        actual_batch.query_statement,
        equal_to_ignoring_whitespace(expected_query.query_statement),
    )
    assert_that(
        actual_batch.batched_parameter_sets,
        equal_to(expected_query.batched_parameter_sets),
    )

0 commit comments

Comments
 (0)