11import json
22from datetime import datetime
3- from unittest .mock import Mock , patch , MagicMock , call
3+ from unittest .mock import Mock , patch , MagicMock , call , mock_open
44
55import apsw
6+ import logging
67
8+ from nettacker .api .helpers import structure
79from nettacker .database .db import (
810 db_inputs ,
911 create_connection ,
@@ -767,6 +769,39 @@ def test_find_events_sqlite(self, mock_create_conn):
767769 expected = ['{"event1": "data1"}' , '{"event2": "data2"}' ]
768770 assert result == expected
769771
772+ @patch ("nettacker.database.db.logging.warn" )
773+ @patch ("nettacker.database.db.create_connection" )
774+ def test_find_events_sqlite_exception (self , mock_create_conn , mock_warn ):
775+ mock_connection = Mock ()
776+ mock_cursor = Mock ()
777+ mock_create_conn .return_value = (mock_connection , mock_cursor )
778+
779+ mock_cursor .execute .side_effect = Exception ("DB error" )
780+ result = find_events ("192.168.1.1" , "http" , "scan_123" )
781+
782+ assert result == []
783+ mock_warn .assert_called_once_with ("Database query failed..." )
784+
785+ @patch ("nettacker.database.db.create_connection" )
786+ def test_find_events_sqlalchemy (self , mock_create_conn ):
787+ mock_session = Mock ()
788+ mock_create_conn .return_value = mock_session
789+
790+ mock_row1 = Mock ()
791+ mock_row2 = Mock ()
792+ mock_row1 .json_event = '{"event": "scan started"}'
793+ mock_row2 .json_event = '{"event": "port open"}'
794+ mock_session .query .return_value .filter .return_value .all .return_value = [mock_row1 , mock_row2 ]
795+
796+ result = find_events ("192.168.1.1" , "http" , "scan_123" )
797+ assert result == [
798+ '{"event": "scan started"}' ,
799+ '{"event": "port open"}'
800+ ]
801+
802+ mock_session .query .assert_called_once ()
803+ mock_session .query .return_value .filter .return_value .all .assert_called_once ()
804+
770805 # -------------------------------------------------------
771806 # tests for select_reports
772807 # -------------------------------------------------------
@@ -782,7 +817,7 @@ def test_select_reports_sqlite(self, mock_create_conn):
782817 (1 , "2024-01-01" , "scan_123" , "/tmp/report.json" , '{"target": "192.168.1.1"}' )
783818 ]
784819
785- result = select_reports (1 )
820+ result = select_reports (self . page )
786821
787822 mock_cursor .execute .assert_called_with (
788823 """
@@ -805,6 +840,50 @@ def test_select_reports_sqlite(self, mock_create_conn):
805840 ]
806841 assert result == expected
807842
843+ @patch ("nettacker.database.db.logging.warn" )
844+ @patch ("nettacker.database.db.create_connection" )
845+ def test_select_reports_sqlite_exception (self , mock_create_conn , mock_warn ):
846+ mock_connection = Mock ()
847+ mock_cursor = Mock ()
848+ mock_create_conn .return_value = (mock_connection , mock_cursor )
849+ mock_cursor .query .side_effect = Exception ("DB Error" )
850+
851+ result = select_reports (self .page )
852+ assert result == structure (status = "error" , msg = "database error!" )
853+ mock_warn .assert_called_once_with ("Could not retrieve report..." )
854+
855+
856+ @patch ("nettacker.database.db.create_connection" )
857+ def test_select_reports_sqlalchemy (self , mock_create_conn ):
858+ mock_session = Mock ()
859+ mock_create_conn .return_value = mock_session
860+
861+ mock_report = Mock ()
862+ mock_report .id = 1
863+ mock_report .date = "2024-01-01"
864+ mock_report .scan_unique_id = "scan_123"
865+ mock_report .report_path_filename = "/tmp/report.json"
866+ mock_report .options = json .dumps ({"target" : "192.168.1.1" })
867+
868+ mock_session .query .return_value .order_by .return_value .offset .return_value .limit .return_value = [mock_report ]
869+ result = select_reports (self .page )
870+
871+ assert result == [{
872+ "id" : 1 ,
873+ "date" : "2024-01-01" ,
874+ "scan_id" : "scan_123" ,
875+ "report_path_filename" : "/tmp/report.json" ,
876+ "options" : {"target" : "192.168.1.1" }
877+ }]
878+
879+
880+ @patch ("nettacker.database.db.create_connection" )
881+ def test_select_reports_sqlalchemy_exception (self , mock_create_conn ):
882+ mock_session = Mock ()
883+ mock_create_conn .return_value = mock_session
884+ mock_session .query .side_effect = Exception ("DB Error" )
885+ result = select_reports (self .page )
886+ assert result == structure (status = "error" , msg = "database error!" )
808887 # -------------------------------------------------------
809888 # tests for get_scan_result
810889 # -------------------------------------------------------
@@ -836,6 +915,22 @@ def test_get_scan_result_sqlite(self, mock_open, mock_create_conn):
836915 assert filename == "/tmp/report.json"
837916 assert content == b'{"result": "data"}'
838917
918+ @patch ("nettacker.database.db.create_connection" )
919+ @patch ("builtins.open" , new_callable = mock_open , read_data = b"mock file content" )
920+ def test_get_scan_result_sqlalchemy (self , mock_open_builtin , mock_create_conn ):
921+ mock_session = Mock ()
922+ mock_create_conn .return_value = mock_session
923+
924+ mock_report = Mock ()
925+ mock_report .report_path_filename = "/tmp/mock_report.json"
926+
927+ mock_session .query .return_value .filter_by .return_value .first .return_value = mock_report
928+
929+ filename , content = get_scan_result (1 )
930+ assert filename == "/tmp/mock_report.json"
931+ assert content == b"mock file content"
932+
933+ mock_open_builtin .assert_called_once_with ("/tmp/mock_report.json" , "rb" )
839934 # -------------------------------------------------------
840935 # tests for last_host_logs
841936 # -------------------------------------------------------
@@ -849,7 +944,7 @@ def test_last_host_logs_sqlite(self, mock_create_conn):
849944
850945 # Mock the sequence of database calls
851946 mock_cursor .fetchall .side_effect = [
852- [("192.168.1.1" ,)], # targets
947+ [(self . target ,)], # targets
853948 [("port_scan" ,)], # module_names for target
854949 [("port_scan" ,), ("vuln_scan" ,)], # events for target
855950 ]
0 commit comments