11from types import SimpleNamespace
22from unittest .mock import MagicMock , patch
3-
43import pytest
54
6- from rest_framework .exceptions import APIException , ValidationError
5+ import neo4j
6+ import neo4j .exceptions
7+
8+ from rest_framework .exceptions import APIException , PermissionDenied , ValidationError
79
810from api .attack_paths import database as graph_database
911from api .attack_paths import views_helpers
1012
1113
14+ def _make_neo4j_error (message , code ):
15+ """Build a Neo4jError with the given message and code."""
16+ return neo4j .exceptions .Neo4jError ._hydrate_neo4j (code = code , message = message )
17+
18+
1219def test_normalize_run_payload_extracts_attributes_section ():
1320 payload = {
1421 "data" : {
@@ -122,28 +129,25 @@ def test_execute_attack_paths_query_serializes_graph(
122129 )
123130 graph = SimpleNamespace (nodes = [node , node_2 ], relationships = [relationship ])
124131
125- run_result = MagicMock ()
126- run_result .graph .return_value = graph
127-
128- session = MagicMock ()
129- session .run .return_value = run_result
130-
131- session_ctx = MagicMock ()
132- session_ctx .__enter__ .return_value = session
133- session_ctx .__exit__ .return_value = False
132+ graph_result = MagicMock ()
133+ graph_result .nodes = graph .nodes
134+ graph_result .relationships = graph .relationships
134135
135136 database_name = "db-tenant-test-tenant-id"
136137
137138 with patch (
138- "api.attack_paths.views_helpers.graph_database.get_session " ,
139- return_value = session_ctx ,
140- ) as mock_get_session :
139+ "api.attack_paths.views_helpers.graph_database.execute_read_query " ,
140+ return_value = graph_result ,
141+ ) as mock_execute_read_query :
141142 result = views_helpers .execute_attack_paths_query (
142143 database_name , definition , parameters , provider_id = provider_id
143144 )
144145
145- mock_get_session .assert_called_once_with (database_name )
146- session .run .assert_called_once_with (definition .cypher , parameters )
146+ mock_execute_read_query .assert_called_once_with (
147+ database = database_name ,
148+ cypher = definition .cypher ,
149+ parameters = parameters ,
150+ )
147151 assert result ["nodes" ][0 ]["id" ] == "node-1"
148152 assert result ["nodes" ][0 ]["properties" ]["complex" ]["items" ][0 ] == "value"
149153 assert result ["relationships" ][0 ]["label" ] == "OWNS"
@@ -163,17 +167,10 @@ def test_execute_attack_paths_query_wraps_graph_errors(
163167 database_name = "db-tenant-test-tenant-id"
164168 parameters = {"provider_uid" : "123" }
165169
166- class ExplodingContext :
167- def __enter__ (self ):
168- raise graph_database .GraphDatabaseQueryException ("boom" )
169-
170- def __exit__ (self , exc_type , exc , tb ):
171- return False
172-
173170 with (
174171 patch (
175- "api.attack_paths.views_helpers.graph_database.get_session " ,
176- return_value = ExplodingContext ( ),
172+ "api.attack_paths.views_helpers.graph_database.execute_read_query " ,
173+ side_effect = graph_database . GraphDatabaseQueryException ( "boom" ),
177174 ),
178175 patch ("api.attack_paths.views_helpers.logger" ) as mock_logger ,
179176 ):
@@ -185,6 +182,33 @@ def __exit__(self, exc_type, exc, tb):
185182 mock_logger .error .assert_called_once ()
186183
187184
185+ def test_execute_attack_paths_query_raises_permission_denied_on_read_only (
186+ attack_paths_query_definition_factory ,
187+ ):
188+ definition = attack_paths_query_definition_factory (
189+ id = "aws-rds" ,
190+ name = "RDS" ,
191+ short_description = "Short desc" ,
192+ description = "" ,
193+ cypher = "MATCH (n) RETURN n" ,
194+ parameters = [],
195+ )
196+ database_name = "db-tenant-test-tenant-id"
197+ parameters = {"provider_uid" : "123" }
198+
199+ with patch (
200+ "api.attack_paths.views_helpers.graph_database.execute_read_query" ,
201+ side_effect = graph_database .WriteQueryNotAllowedException (
202+ message = "Read query not allowed" ,
203+ code = "Neo.ClientError.Statement.AccessMode" ,
204+ ),
205+ ):
206+ with pytest .raises (PermissionDenied ):
207+ views_helpers .execute_attack_paths_query (
208+ database_name , definition , parameters , provider_id = "test-provider-123"
209+ )
210+
211+
188212def test_serialize_graph_filters_by_provider_id (attack_paths_graph_stub_classes ):
189213 provider_id = "provider-keep"
190214
@@ -216,3 +240,105 @@ def test_serialize_graph_filters_by_provider_id(attack_paths_graph_stub_classes)
216240 assert result ["nodes" ][0 ]["id" ] == "n1"
217241 assert len (result ["relationships" ]) == 1
218242 assert result ["relationships" ][0 ]["id" ] == "r1"
243+
244+
245+ # -- execute_read_query read-only enforcement ---------------------------------
246+
247+
248+ @pytest .fixture
249+ def mock_neo4j_session ():
250+ """Mock the Neo4j driver so execute_read_query uses a fake session."""
251+ mock_session = MagicMock (spec = neo4j .Session )
252+ mock_driver = MagicMock (spec = neo4j .Driver )
253+ mock_driver .session .return_value = mock_session
254+
255+ with patch ("api.attack_paths.database.get_driver" , return_value = mock_driver ):
256+ yield mock_session
257+
258+
259+ def test_execute_read_query_succeeds_with_select (mock_neo4j_session ):
260+ mock_graph = MagicMock (spec = neo4j .graph .Graph )
261+ mock_neo4j_session .execute_read .return_value = mock_graph
262+
263+ result = graph_database .execute_read_query (
264+ database = "test-db" ,
265+ cypher = "MATCH (n:AWSAccount) RETURN n LIMIT 10" ,
266+ )
267+
268+ assert result is mock_graph
269+
270+
271+ def test_execute_read_query_rejects_create (mock_neo4j_session ):
272+ mock_neo4j_session .execute_read .side_effect = _make_neo4j_error (
273+ "Writing in read access mode not allowed" ,
274+ "Neo.ClientError.Statement.AccessMode" ,
275+ )
276+
277+ with pytest .raises (graph_database .WriteQueryNotAllowedException ):
278+ graph_database .execute_read_query (
279+ database = "test-db" ,
280+ cypher = "CREATE (n:Node {name: 'test'}) RETURN n" ,
281+ )
282+
283+
284+ def test_execute_read_query_rejects_update (mock_neo4j_session ):
285+ mock_neo4j_session .execute_read .side_effect = _make_neo4j_error (
286+ "Writing in read access mode not allowed" ,
287+ "Neo.ClientError.Statement.AccessMode" ,
288+ )
289+
290+ with pytest .raises (graph_database .WriteQueryNotAllowedException ):
291+ graph_database .execute_read_query (
292+ database = "test-db" ,
293+ cypher = "MATCH (n:Node) SET n.name = 'updated' RETURN n" ,
294+ )
295+
296+
297+ def test_execute_read_query_rejects_delete (mock_neo4j_session ):
298+ mock_neo4j_session .execute_read .side_effect = _make_neo4j_error (
299+ "Writing in read access mode not allowed" ,
300+ "Neo.ClientError.Statement.AccessMode" ,
301+ )
302+
303+ with pytest .raises (graph_database .WriteQueryNotAllowedException ):
304+ graph_database .execute_read_query (
305+ database = "test-db" ,
306+ cypher = "MATCH (n:Node) DELETE n" ,
307+ )
308+
309+
310+ @pytest .mark .parametrize (
311+ "cypher" ,
312+ [
313+ "CALL apoc.create.vNode(['Label'], {name: 'test'}) YIELD node RETURN node" ,
314+ "MATCH (a)-[r]->(b) CALL apoc.create.vRelationship(a, 'REL', {}, b) YIELD rel RETURN rel" ,
315+ ],
316+ ids = ["apoc.create.vNode" , "apoc.create.vRelationship" ],
317+ )
318+ def test_execute_read_query_succeeds_with_apoc_virtual_create (
319+ mock_neo4j_session , cypher
320+ ):
321+ mock_graph = MagicMock (spec = neo4j .graph .Graph )
322+ mock_neo4j_session .execute_read .return_value = mock_graph
323+
324+ result = graph_database .execute_read_query (database = "test-db" , cypher = cypher )
325+
326+ assert result is mock_graph
327+
328+
329+ @pytest .mark .parametrize (
330+ "cypher" ,
331+ [
332+ "CALL apoc.create.node(['Label'], {name: 'test'}) YIELD node RETURN node" ,
333+ "MATCH (a), (b) CALL apoc.create.relationship(a, 'REL', {}, b) YIELD rel RETURN rel" ,
334+ ],
335+ ids = ["apoc.create.Node" , "apoc.create.Relationship" ],
336+ )
337+ def test_execute_read_query_rejects_apoc_real_create (mock_neo4j_session , cypher ):
338+ mock_neo4j_session .execute_read .side_effect = _make_neo4j_error (
339+ "There is no procedure with the name `apoc.create.node` registered" ,
340+ "Neo.ClientError.Procedure.ProcedureNotFound" ,
341+ )
342+
343+ with pytest .raises (graph_database .WriteQueryNotAllowedException ):
344+ graph_database .execute_read_query (database = "test-db" , cypher = cypher )
0 commit comments