Skip to content

Commit 10e8083

Browse files
authored
Merge pull request #14 from nodestream-proj/neo4j_reader_fix
Neo4j Reader Fix + Integration Test
2 parents f2594c6 + ffb5a7a commit 10e8083

File tree

4 files changed

+56
-2
lines changed

4 files changed

+56
-2
lines changed

nodestream_plugin_neo4j/neo4j_database.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def __init__(self, driver: AsyncDriver, database_name: str) -> None:
2727
self.logger = getLogger(self.__class__.__name__)
2828

2929
async def execute(
30-
self, query: Query, log_result: bool = False, routing=RoutingControl.WRITE
30+
self, query: Query, log_result: bool = False, routing_=RoutingControl.WRITE
3131
) -> Iterable[Record]:
3232
self.logger.info(
3333
"Executing Cypher Query to Neo4j",
@@ -41,7 +41,7 @@ async def execute(
4141
query.query_statement,
4242
query.parameters,
4343
database_=self.database_name,
44-
routing_=routing,
44+
routing_=routing_,
4545
)
4646
if log_result:
4747
for record in result.records:
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
- implementation: nodestream_plugin_neo4j.extractor:Neo4jExtractor
2+
arguments:
3+
database: neo4j
4+
uri: !env NEO4J_CONNECT_URI
5+
username: neo4j
6+
password: password
7+
query: |
8+
MATCH (a:ObjectA)
9+
RETURN a.identifier as identifier
10+
SKIP $offset LIMIT $limit
11+
12+
- implementation: nodestream.interpreting:Interpreter
13+
arguments:
14+
interpretations:
15+
- type: source_node
16+
node_type: NewObject
17+
key:
18+
identifier: !jmespath identifier

tests/e2e/project/nodestream.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ scopes:
99
name: node-ttls
1010
- path: tests/e2e/project/relationship-ttls.yaml
1111
name: relationship-ttls
12+
- path: tests/e2e/project/extractor_integration.yaml
13+
name: extractor_integration
1214

1315
targets:
1416
my-neo4j-db:

tests/e2e/test_neo4j_pipelines.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ def validate_airports(session):
3030
assert result.single()["count"] == 1000
3131

3232

33+
def validate_enrichment(session):
34+
result = session.run(
35+
"""
36+
MATCH (a:NewObject)
37+
RETURN count(a) AS count
38+
"""
39+
)
40+
assert result.single()["count"] == 30
41+
42+
3343
def valiudate_airport_country(session):
3444
result = session.run(
3545
"""
@@ -130,6 +140,9 @@ def validate_ttl_seperation_between_node_object_types(session):
130140
("node-ttls", [validate_ttl_seperation_between_node_object_types]),
131141
]
132142

143+
EXTRACTOR_TESTS = [("extractor_integration", [validate_enrichment])]
144+
145+
133146
NODE_CREATION_QUERY = """
134147
CREATE (n:{node_label})
135148
SET n.last_ingested_at = $timestamp
@@ -309,3 +322,24 @@ async def test_neo4j_ttls(project, neo4j_container, neo4j_version):
309322
)
310323
for validator in validations:
311324
validator(session)
325+
326+
327+
@pytest.mark.asyncio
328+
@pytest.mark.e2e
329+
@pytest.mark.parametrize("neo4j_version", TESTED_NEO4J_VERSIONS)
330+
async def test_neo4j_extractor(project, neo4j_container, neo4j_version):
331+
with neo4j_container(
332+
neo4j_version
333+
) as neo4j_container, neo4j_container.get_driver() as driver, driver.session() as session:
334+
create_test_objects(session)
335+
target = project.get_target_by_name("my-neo4j-db")
336+
for pipeline_name, validations in EXTRACTOR_TESTS:
337+
await project.run(
338+
RunRequest(
339+
pipeline_name,
340+
PipelineInitializationArguments(extra_steps=[target.make_writer()]),
341+
PipelineProgressReporter(),
342+
)
343+
)
344+
for validator in validations:
345+
validator(session)

0 commit comments

Comments
 (0)