Skip to content

Commit b9ec625

Browse files
committed
Add feature to support first created at TS
1 parent 36de463 commit b9ec625

File tree

5 files changed

+121
-10
lines changed

5 files changed

+121
-10
lines changed

nodestream_plugin_neo4j/database_connector.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def from_file_data(
2525
chunk_size: int = 1000,
2626
execute_chunks_in_parallel: bool = True,
2727
retries_per_chunk: int = 3,
28+
_experimental_set_first_ingested_at: bool = False,
2829
**connection_args
2930
):
3031
database_connection = Neo4jDatabaseConnection.from_configuration(
@@ -37,6 +38,7 @@ def from_file_data(
3738
chunk_size=chunk_size,
3839
execute_chunks_in_parallel=execute_chunks_in_parallel,
3940
retries_per_chunk=retries_per_chunk,
41+
set_first_ingested_at=_experimental_set_first_ingested_at,
4042
)
4143

4244
def __init__(
@@ -47,16 +49,18 @@ def __init__(
4749
chunk_size: int = 1000,
4850
execute_chunks_in_parallel: bool = True,
4951
retries_per_chunk: int = 3,
52+
set_first_ingested_at: bool = False,
5053
) -> None:
5154
self.use_enterprise_features = use_enterprise_features
5255
self.use_apoc = use_apoc
5356
self.database_connection = database_connection
5457
self.chunk_size = chunk_size
5558
self.execute_chunks_in_parallel = execute_chunks_in_parallel
5659
self.retries_per_chunk = retries_per_chunk
60+
self.set_first_ingested_at = set_first_ingested_at
5761

5862
def make_query_executor(self) -> QueryExecutor:
59-
query_builder = Neo4jIngestQueryBuilder(self.use_apoc)
63+
query_builder = Neo4jIngestQueryBuilder(self.use_apoc, self.set_first_ingested_at)
6064
return Neo4jQueryExecutor(
6165
self.database_connection,
6266
query_builder,

nodestream_plugin_neo4j/ingest_query_builder.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ def _merge_node(
9494

9595
@cache
9696
def _make_relationship(
97-
rel_identity: RelationshipIdentityShape, creation_rule: RelationshipCreationRule
97+
rel_identity: RelationshipIdentityShape,
98+
creation_rule: RelationshipCreationRule,
99+
set_first_ingested_at: bool,
98100
):
99101
keys = generate_properties_set_with_prefix(rel_identity.keys, RELATIONSHIP_REF_NAME)
100102
merge_rel_query = (
@@ -114,12 +116,16 @@ def _make_relationship(
114116
create_rel_query = str(merge_rel_query).replace("MERGE", "CREATE")
115117
return f"{create_rel_query} {set_properties_query}"
116118

119+
if set_first_ingested_at:
120+
merge_rel_query = f"{merge_rel_query} ON CREATE SET {RELATIONSHIP_REF_NAME}.first_ingested_at = datetime()"
121+
117122
return f"{merge_rel_query} {set_properties_query}"
118123

119124

120125
class Neo4jIngestQueryBuilder:
121-
def __init__(self, apoc_iterate: bool):
126+
def __init__(self, apoc_iterate: bool, set_first_ingested_at: bool = False):
    """Store the options that shape the generated ingest queries.

    Args:
        apoc_iterate: Stored as ``self.apoc_iterate``.
            NOTE(review): presumably toggles APOC-based batch execution;
            confirm against the query-generation methods.
        set_first_ingested_at: When true, the MERGE statements generated
            for nodes and relationships also get an
            ``ON CREATE SET <ref>.first_ingested_at = datetime()`` clause.
            Defaults to ``False`` so callers that only pass
            ``apoc_iterate`` keep working, matching the default used by
            the database connector for the same flag.
    """
    self.apoc_iterate = apoc_iterate
    self.set_first_ingested_at = set_first_ingested_at
123129

124130
@cache
125131
@correct_parameters
@@ -131,6 +137,12 @@ def generate_update_node_operation_query_statement(
131137

132138
if operation.node_creation_rule == NodeCreationRule.EAGER:
133139
query = str(_merge_node(operation))
140+
141+
# We only need to add this if the user wants a first_ingested_at
142+
# property and we are in the merge case. Because MATCH will not
143+
# create the node.
144+
if self.set_first_ingested_at:
145+
query = f"{query} ON CREATE SET {GENERIC_NODE_REF_NAME}.first_ingested_at = datetime()"
134146
else:
135147
query = str(_match_node(operation))
136148

@@ -173,7 +185,9 @@ def generate_update_relationship_operation_query_statement(
173185
match_from_node_segment = _match_node(operation.from_node, FROM_NODE_REF_NAME)
174186
match_to_node_segment = _match_node(operation.to_node, TO_NODE_REF_NAME)
175187
merge_rel_segment = _make_relationship(
176-
operation.relationship_identity, operation.relationship_creation_rule
188+
operation.relationship_identity,
189+
operation.relationship_creation_rule,
190+
self.set_first_ingested_at,
177191
)
178192
return f"{match_from_node_segment} {match_to_node_segment} {merge_rel_segment}"
179193

tests/e2e/project/nodestream.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,9 @@ targets:
1818
uri: !env NEO4J_CONNECT_URI
1919
username: neo4j
2020
password: password
21+
my-creation-ts-db:
22+
database: neo4j
23+
uri: !env NEO4J_CONNECT_URI
24+
username: neo4j
25+
password: password
26+
_experimental_set_first_ingested_at: true

tests/e2e/test_neo4j_pipelines.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,41 @@ def project():
1919
return Project.read_from_file(Path("tests/e2e/project/nodestream.yaml"))
2020

2121

22+
def validate_timestamps(session):
    """Assert that every node and relationship carries its ingestion timestamps.

    Every node must have ``first_ingested_at``. Every relationship must have
    ``last_ingested_at`` and also ``first_ingested_at``: the latter is the
    property the first-ingested-at option adds on relationship creation, so
    leaving it unchecked would let a regression on the relationship side
    pass unnoticed.

    Args:
        session: An open database session. Only
            ``session.run(query).single()["count"]`` is used.

    Raises:
        AssertionError: If any node or relationship lacks an expected
            timestamp property.
    """

    def count(query):
        # Read the single row straight away so no result is left pending
        # when the next query is issued on the same session.
        return session.run(query).single()["count"]

    total_nodes = count("MATCH (n) RETURN count(n) AS count")
    stamped_nodes = count(
        "MATCH (n) WHERE n.first_ingested_at IS NOT NULL RETURN count(n) AS count"
    )
    assert stamped_nodes == total_nodes, (
        f"{total_nodes - stamped_nodes} node(s) are missing first_ingested_at"
    )

    total_relationships = count("MATCH ()-[r]->() RETURN count(r) AS count")
    for property_name in ("last_ingested_at", "first_ingested_at"):
        stamped_relationships = count(
            f"MATCH ()-[r]->() WHERE r.{property_name} IS NOT NULL "
            "RETURN count(r) AS count"
        )
        assert stamped_relationships == total_relationships, (
            f"{total_relationships - stamped_relationships} relationship(s) "
            f"are missing {property_name}"
        )
55+
56+
2257
def validate_airports(session):
2358
result = session.run(
2459
"""
@@ -125,8 +160,9 @@ def validate_ttl_seperation_between_node_object_types(session):
125160

126161

127162
# Each entry is (pipeline name, validation callables, target name), matching
# the "pipeline_name,validations,target_name" parametrization that consumes it.
PIPELINE_TESTS = [
    ("airports", [validate_airports, valiudate_airport_country], "my-neo4j-db"),
    ("fifa", [validate_fifa_player_count, validate_fifa_mo_club], "my-neo4j-db"),
    # Same airports pipeline, run against the target that enables
    # first_ingested_at stamping. The name must be "airports" for the
    # airports validators to have data to check.
    (
        "airports",
        [validate_airports, valiudate_airport_country, validate_timestamps],
        "my-creation-ts-db",
    ),
]
131167

132168
TTL_TESTS = [
@@ -280,16 +316,16 @@ def create_relationship(
280316
@pytest.mark.e2e
281317
@pytest.mark.parametrize("neo4j_version", TESTED_NEO4J_VERSIONS)
282318
@pytest.mark.parametrize(
283-
"pipeline_name,validations",
319+
"pipeline_name,validations,target_name",
284320
PIPELINE_TESTS,
285321
)
286322
async def test_neo4j_pipeline(
287-
project, neo4j_container, pipeline_name, validations, neo4j_version
323+
project, neo4j_container, pipeline_name, validations, neo4j_version, target_name
288324
):
289325
with neo4j_container(
290326
neo4j_version
291327
) as neo4j_container, neo4j_container.get_driver() as driver, driver.session() as session:
292-
target = project.get_target_by_name("my-neo4j-db")
328+
target = project.get_target_by_name(target_name)
293329

294330
await project.run(
295331
RunRequest(

tests/unit/test_ingest_query_builder.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
@pytest.fixture
def query_builder():
    """Provide a query builder with ``apoc_iterate`` on and first-ingested stamping off."""
    apoc_iterate, set_first_ingested_at = True, False
    return Neo4jIngestQueryBuilder(apoc_iterate, set_first_ingested_at)
3131

3232

3333
GREATEST_DAY = Timestamp(1998, 3, 25, 2, 0, 1)
@@ -177,6 +177,16 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
177177
}
178178
],
179179
)
180+
SIMPLE_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
181+
"MERGE (node: TestType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = datetime() SET node += params.__node_properties",
182+
[
183+
{
184+
"__node_id": "foo",
185+
"__node_properties": SIMPLE_NODE.properties,
186+
"__node_additional_labels": (),
187+
}
188+
],
189+
)
180190

181191
SIMPLE_NODE_EXPECTED_QUERY_ON_MATCH = QueryBatch(
182192
"MATCH (node: TestType) WHERE node.id = params.__node_id SET node += params.__node_properties",
@@ -207,6 +217,17 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
207217
],
208218
)
209219

220+
COMPLEX_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
221+
"MERGE (node: ComplexType {id : params.__node_id}) ON CREATE SET node.first_ingested_at = datetime() WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
222+
[
223+
{
224+
"__node_id": "foo",
225+
"__node_properties": COMPLEX_NODE.properties,
226+
"__node_additional_labels": ("ExtraTypeOne", "ExtraTypeTwo"),
227+
}
228+
],
229+
)
230+
210231
COMPLEX_NODE_TWO = Node(
211232
"ComplexType",
212233
{"id_part1": "foo", "id_part2": "bar"},
@@ -225,6 +246,18 @@ def test_generates_expected_queries(mocked_utcnow, query_builder, ttl, expected_
225246
],
226247
)
227248

249+
COMPLEX_NODE_TWO_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP = QueryBatch(
250+
"MERGE (node: ComplexType {id_part1 : params.__node_id_part1, id_part2 : params.__node_id_part2}) ON CREATE SET node.first_ingested_at = datetime() WITH node, params CALL apoc.create.addLabels(node, params.__node_additional_labels) yield node as _ SET node += params.__node_properties",
251+
[
252+
{
253+
"__node_id_part1": "foo",
254+
"__node_id_part2": "bar",
255+
"__node_properties": COMPLEX_NODE_TWO.properties,
256+
"__node_additional_labels": ("ExtraTypeOne", "ExtraTypeTwo"),
257+
}
258+
],
259+
)
260+
228261

229262
@pytest.mark.parametrize(
230263
"node,expected_query,node_creation_rule",
@@ -243,6 +276,24 @@ def test_node_update_generates_expected_queries(
243276
assert_that(query, equal_to(expected_query))
244277

245278

279+
@pytest.mark.parametrize(
    "node,expected_query,node_creation_rule",
    [
        (
            SIMPLE_NODE,
            SIMPLE_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP,
            NodeCreationRule.EAGER,
        ),
        (
            COMPLEX_NODE,
            COMPLEX_NODE_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP,
            NodeCreationRule.EAGER,
        ),
        (
            COMPLEX_NODE_TWO,
            COMPLEX_NODE_TWO_EXPECTED_QUERY_WITH_CREATED_TIMESTAMP,
            NodeCreationRule.EAGER,
        ),
        # MATCH_ONLY never creates a node, so the expected query is the
        # plain match query with no first_ingested_at clause.
        (
            SIMPLE_NODE,
            SIMPLE_NODE_EXPECTED_QUERY_ON_MATCH,
            NodeCreationRule.MATCH_ONLY,
        ),
    ],
)
def test_node_update_with_created_timestamp_generates_exepected_queries(
    query_builder, node, expected_query, node_creation_rule
):
    """Node update batches include the first_ingested_at clause only for EAGER nodes."""
    # The shared fixture builds the query builder with the flag off; turn it
    # on for this test.
    query_builder.set_first_ingested_at = True
    identity_operation = OperationOnNodeIdentity(
        node.identity_shape, node_creation_rule
    )
    actual_batch = query_builder.generate_batch_update_node_operation_batch(
        identity_operation, [node]
    )
    assert_that(actual_batch, equal_to(expected_query))
295+
296+
246297
RELATIONSHIP_BETWEEN_TWO_NODES = RelationshipWithNodes(
247298
from_node=SIMPLE_NODE,
248299
to_node=COMPLEX_NODE,

0 commit comments

Comments
 (0)