11import json
22import unittest
33import os
4+ import time
5+ from decimal import Decimal
6+ from copy import deepcopy
47from unittest import TestCase
58from moto import mock_dynamodb , mock_sqs
69from boto3 import resource as boto3_resource , client as boto3_client
7- from tests .utils_for_converter_tests import ValuesForTests
10+ from tests .utils_for_converter_tests import ValuesForTests , ErrorValuesForTests
811from unittest .mock import patch
912from botocore .config import Config
13+ from pathlib import Path
14+
1015
1116MOCK_ENV_VARS = {
1217 "AWS_SQS_QUEUE_URL" : "https://sqs.eu-west-2.amazonaws.com/123456789012/test-queue" ,
1318 "DELTA_TABLE_NAME" : "immunisation-batch-internal-dev-audit-test-table" ,
1419 "SOURCE" : "test-source" ,
1520}
1621
17- patch .dict (os .environ , MOCK_ENV_VARS ).start ()
18- from delta import handler
22+ with patch .dict ("os.environ" , MOCK_ENV_VARS ):
23+ from delta import handler , Converter
24+ from Converter import imms , ErrorRecords
1925
2026
27+ @patch .dict ("os.environ" , MOCK_ENV_VARS , clear = True )
2128@mock_dynamodb
2229@mock_sqs
23- class TestHandler ( TestCase ):
30+ class TestConvertToFlatJson ( unittest . TestCase ):
2431
2532 def setUp (self ):
2633 """Set up mock DynamoDB table."""
27- self .mock_dynamodb = mock_dynamodb ()
28- self .mock_dynamodb .start ()
2934 self .dynamodb_resource = boto3_resource ("dynamodb" , "eu-west-2" )
3035 self .table = self .dynamodb_resource .create_table (
3136 TableName = "immunisation-batch-internal-dev-audit-test-table" ,
@@ -50,21 +55,14 @@ def setUp(self):
5055 "IndexName" : "PatientGSI" ,
5156 "KeySchema" : [
5257 {"AttributeName" : "Operation" , "KeyType" : "HASH" },
53- {"AttributeName" : "SupplierSystem" , "KeyType" : "HASH " },
58+ {"AttributeName" : "SupplierSystem" , "KeyType" : "RANGE " },
5459 ],
5560 "Projection" : {"ProjectionType" : "ALL" },
5661 "ProvisionedThroughput" : {"ReadCapacityUnits" : 5 , "WriteCapacityUnits" : 5 },
5762 },
5863 ],
5964 )
6065
61- @classmethod
62- def tearDownClass (cls ):
63- dynamodb_resource = boto3_resource ("dynamodb" , "eu-west-2" )
64- table = dynamodb_resource .Table ("immunisation-batch-internal-dev-audit-test-table" )
65- table .delete ()
66- table .wait_until_not_exists ()
67-
6866 @staticmethod
6967 def get_event (event_name = "INSERT" , operation = "operation" , supplier = "EMIS" ):
7068 """Returns test event data."""
@@ -98,51 +96,103 @@ def assert_dynamodb_record(self, operation_flag, items, expected_values, expecte
9896 self .assertIn (key , filtered_items [0 ], f"{ key } is missing" )
9997 self .assertEqual (filtered_items [0 ][key ], expected_value , f"{ key } mismatch" )
10098
101- def test_handler_imms_flat_json (self ):
99+ def test_fhir_converter_json_direct_data (self ):
100+ """it should convert json data to flat fhir"""
101+ imms .clear ()
102+ json_data = json .dumps (ValuesForTests .json_data )
103+
104+ start = time .time ()
105+
106+ FHIRConverter = Converter (json_data )
107+ FlatFile = FHIRConverter .runConversion (False , True )
108+
109+ flatJSON = json .dumps (FlatFile )
110+
111+ expected_imms_value = deepcopy (ValuesForTests .expected_imms ) # UPDATE is currently the default action-flag
112+ expected_imms = json .dumps (expected_imms_value )
113+ self .assertEqual (flatJSON , expected_imms )
114+
115+ errorRecords = FHIRConverter .getErrorRecords ()
116+
117+ if len (errorRecords ) > 0 :
118+ print ("Converted With Errors" )
119+ else :
120+ print ("Converted Successfully" )
121+
122+ end = time .time ()
123+ print (end - start )
124+
125+ def test_fhir_converter_json_error_scenario (self ):
126+ """it should convert json data to flat fhir - error scenarios"""
127+ error_test_cases = [ErrorValuesForTests .missing_json , ErrorValuesForTests .json_dob_error ]
128+
129+ for test_case in error_test_cases :
130+ imms .clear ()
131+ ErrorRecords .clear ()
132+ json_data = json .dumps (test_case )
133+
134+ start = time .time ()
135+
136+ FHIRConverter = Converter (json_data )
137+ FlatFile = FHIRConverter .runConversion (False , True )
138+
139+ flatJSON = json .dumps (FlatFile )
140+
141+ # if len(flatJSON) > 0:
142+ # print(flatJSON)
143+ # Fix error handling
144+ # expected_imms = ErrorValuesForTests.get_expected_imms_error_output
145+ # self.assertEqual(flatJSON, expected_imms)
146+
147+ errorRecords = FHIRConverter .getErrorRecords ()
148+
149+ if len (errorRecords ) > 0 :
150+ print ("Converted With Errors" )
151+ print (f"Error records -error scenario { errorRecords } " )
152+ else :
153+ print ("Converted Successfully" )
154+
155+ end = time .time ()
156+ print (end - start )
157+
158+ def test_handler_imms_convert_to_flat_json (self ):
102159 """Test that the Imms field contains the correct flat JSON data for CREATE, UPDATE, and DELETE operations."""
103- expected_action_flags = {"CREATE" : "NEW" , "UPDATE" : "UPDATE" , "DELETE" : "DELETE" }
160+ expected_action_flags = [
161+ {"Operation" : "CREATE" , "EXPECTED_ACTION_FLAG" : "NEW" },
162+ {"Operation" : "UPDATE" , "EXPECTED_ACTION_FLAG" : "UPDATE" },
163+ {"Operation" : "DELETE" , "EXPECTED_ACTION_FLAG" : "DELETE" },
164+ ]
104165
105- for operation , expected_action_flag in expected_action_flags .items ():
106- with self .subTest (operation = operation ):
166+ for test_case in expected_action_flags :
167+ with self .subTest (test_case ["Operation" ]):
168+ imms .clear ()
107169
108- event = self .get_event (operation = operation )
170+ event = self .get_event (operation = test_case [ "Operation" ] )
109171
110- # Call the handler
111172 response = handler (event , None )
112173
113- # Verify response success
114- # self.assertEqual(response["statusCode"], 200)
115- # self.assertEqual(response["body"], "Records processed successfully")
116-
117174 # Retrieve items from DynamoDB
118175 result = self .table .scan ()
119176 items = result .get ("Items" , [])
120- # print(f"DYNAMO THESE ITEMS : {items}")
121177
122- # Fetch expected values from the module
123178 expected_values = ValuesForTests .expected_static_values
124- expected_imms = ValuesForTests .get_expected_imms (expected_action_flag )
125- # print(f"EXPETECED IMMS : {expected_imms}")
126- # Call the assertion function
127- self . assert_dynamodb_record ( expected_action_flag , items , expected_values , expected_imms , response )
128- self . tearDown ( )
129- # self.tearDown()
179+ expected_imms = ValuesForTests .get_expected_imms (test_case [ "EXPECTED_ACTION_FLAG" ] )
180+
181+ self . assert_dynamodb_record (
182+ test_case [ "EXPECTED_ACTION_FLAG" ] , items , expected_values , expected_imms , response
183+ )
184+
130185 result = self .table .scan ()
131186 items = result .get ("Items" , [])
132- # print(f"AFTER tests runDYNAMO THESE ITEMS : {items}")
133-
134- def tearDown (self ):
135- scan = self .table .scan ()
136- with self .table .batch_writer () as batch :
137- for item in scan .get ("Items" , []):
138- batch .delete_item (Key = {"PK" : item ["PK" ]})
139- result = self .table .scan ()
140- items = result .get ("Items" , [])
141- # print(f"FINAL TABLE ITEMS DYNAMO: {items}")
142-
143- # result = self.table.scan()
144- # items = result.get("Items", [])
145- # print(f"FINAL TABLE ITEMS DYNAMO: {items}")
187+ self .clear_table ()
188+
189+ def clear_table (self ):
190+ scan = self .table .scan ()
191+ with self .table .batch_writer () as batch :
192+ for item in scan .get ("Items" , []):
193+ batch .delete_item (Key = {"PK" : item ["PK" ]})
194+ result = self .table .scan ()
195+ items = result .get ("Items" , [])
146196
147197 if __name__ == "__main__" :
148198 unittest .main ()
0 commit comments