Skip to content

Commit 6d17d9d

Browse files
committed
Use last_ingested_at as first_ingested_at in the ON CREATE case
1 parent 004adfa commit 6d17d9d

File tree

3 files changed

+19
-5
lines changed

3 files changed

+19
-5
lines changed

nodestream_plugin_neo4j/ingest_query_builder.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -117,7 +117,7 @@ def _make_relationship(
117117
return f"{create_rel_query} {set_properties_query}"
118118

119119
if set_first_ingested_at:
120-
merge_rel_query = f"{merge_rel_query} ON CREATE SET {RELATIONSHIP_REF_NAME}.first_ingested_at = datetime()"
120+
merge_rel_query = f"{merge_rel_query} ON CREATE SET {RELATIONSHIP_REF_NAME}.first_ingested_at = params.{generate_prefixed_param_name(PROPERTIES_PARAM_NAME, RELATIONSHIP_REF_NAME)}['last_ingested_at']"
121121

122122
return f"{merge_rel_query} {set_properties_query}"
123123

@@ -142,7 +142,7 @@ def generate_update_node_operation_query_statement(
142142
# property and we are in the merge case. Because MATCH will not
143143
# create the node.
144144
if self.set_first_ingested_at:
145-
query = f"{query} ON CREATE SET {GENERIC_NODE_REF_NAME}.first_ingested_at = datetime()"
145+
query = f"{query} ON CREATE SET {GENERIC_NODE_REF_NAME}.first_ingested_at = params.{generate_prefixed_param_name(PROPERTIES_PARAM_NAME, GENERIC_NODE_REF_NAME)}['last_ingested_at']"
146146
else:
147147
query = str(_match_node(operation))
148148

tests/e2e/test_neo4j_pipelines.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ def project():
2020

2121

2222
def validate_timestamps(session):
23+
# Check that all nodes have a first_ingested_at property
2324
nodes_with_first_ingested_at = session.run(
2425
"""
2526
MATCH (n) where n.first_ingested_at IS NOT NULL

tests/unit/test_ingest_query_builder.py

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -178,7 +178,7 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
178178
],
179179
)
180180
SIMPLE_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
181-
"MERGE (node: TestType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = datetime() SET node += params.__node_properties",
181+
"MERGE (node: TestType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = params.__node_properties['last_ingested_at'] SET node += params.__node_properties",
182182
[
183183
{
184184
"__node_id": "foo",
@@ -218,7 +218,7 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
218218
)
219219

220220
COMPLEX_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
221-
"MERGE (node: ComplexType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = datetime() WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
221+
"MERGE (node: ComplexType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = params.__node_properties['last_ingested_at'] WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
222222
[
223223
{
224224
"__node_id": "foo",
@@ -247,7 +247,7 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
247247
)
248248

249249
COMPLEX_NODE_TWO_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
250-
"MERGE (node: ComplexType {id_part1 : params.__node_id_part1, id_part2 : params.__node_id_part2}) ON CREATE SET node.first_ingested_at = datetime() WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
250+
"MERGE (node: ComplexType {id_part1 : params.__node_id_part1, id_part2 : params.__node_id_part2}) ON CREATE SET node.first_ingested_at = params.__node_properties['last_ingested_at'] WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
251251
[
252252
{
253253
"__node_id_part1": "foo",
@@ -325,6 +325,19 @@ def test_node_update_with_created_timestamp_generates_exepected_queries(
325325
],
326326
)
327327

328+
RELATIONSHIP_BETWEEN_TWO_NODES_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
329+
"""MATCH (from_node: TestType) WHERE from_node.id = params.__from_node_id MATCH (to_node: ComplexType) WHERE to_node.id = params.__to_node_id
330+
MERGE (from_node)-[rel: RELATED_TO]->(to_node) ON CREATE SET rel.first_ingested_at = params.__rel_properties['last_ingested_at'] SET rel += params.__rel_properties
331+
""",
332+
[
333+
{
334+
"__from_node_id": "foo",
335+
"__to_node_id": "foo",
336+
"__rel_properties": RELATIONSHIP_BETWEEN_TWO_NODES.relationship.properties,
337+
}
338+
],
339+
)
340+
328341
RELATIONSHIP_BETWEEN_TWO_NODES_WITH_MULTI_KEY = RelationshipWithNodes(
329342
from_node=SIMPLE_NODE,
330343
to_node=COMPLEX_NODE_TWO,

0 commit comments

Comments
 (0)