
Commit 5128864

debug & async
1 parent 33f3189

4 files changed (+44, -18 lines)

e2e_batch/test_e2e_batch.py

Lines changed: 31 additions & 13 deletions
@@ -39,7 +39,9 @@ async def asyncTearDown(self):
     if environment != "ref":
         async def test_create_success(self):
             """Test CREATE scenario."""
-            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE", offset=1)
+            input_file = generate_csv("PHYLIS", "0.3",
+                                      action_flag="CREATE", offset=1,
+                                      vax_type="COVID19", ods="8HA94")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -55,7 +57,9 @@ async def test_create_success(self):
         async def test_duplicate_create(self):
             """Test DUPLICATE scenario."""
 
-            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE", same_id=True, offset=2)
+            input_file = generate_csv("PHYLIS", "0.3",
+                                      action_flag="CREATE", same_id=True, offset=2,
+                                      vax_type="FLU", ods="8HK48")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -70,7 +74,8 @@ async def test_duplicate_create(self):
 
         async def test_update_success(self):
             """Test UPDATE scenario."""
-            input_file = generate_csv("PHYLIS", "0.5", action_flag="UPDATE", offset=3)
+            input_file = generate_csv("PHYLIS", "0.5", action_flag="UPDATE",
+                                      offset=3, vax_type="MMR", ods="V0V8L")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -85,7 +90,9 @@ async def test_update_success(self):
 
         async def test_reinstated_success(self):
             """Test REINSTATED scenario."""
-            input_file = generate_csv("PHYLIS", "0.5", action_flag="REINSTATED", offset=4)
+            input_file = generate_csv("PHYLIS", "0.5",
+                                      action_flag="REINSTATED", offset=4,
+                                      vax_type="HPV", ods="DPSREDUCED")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -100,7 +107,9 @@ async def test_reinstated_success(self):
 
         async def test_update_reinstated_success(self):
             """Test UPDATE-REINSTATED scenario."""
-            input_file = generate_csv("PHYLIS", "0.5", action_flag="UPDATE-REINSTATED", offset=5)
+            input_file = generate_csv("PHYLIS", "0.5",
+                                      action_flag="UPDATE-REINSTATED", offset=5,
+                                      vax_type="MENACWY", ods="DPSFULL")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -115,7 +124,9 @@ async def test_update_reinstated_success(self):
 
         async def test_delete_success(self):
             """Test DELETE scenario."""
-            input_file = generate_csv("PHYLIS", "0.8", action_flag="DELETE", offset=6)
+            input_file = generate_csv("PHYLIS", "0.8",
+                                      action_flag="DELETE", offset=6,
+                                      vax_type="MMR", ods="V0V8L")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -130,7 +141,8 @@ async def test_delete_success(self):
 
         async def test_pre_validation_error(self):
             """Test PRE-VALIDATION error scenario."""
-            input_file = generate_csv("PHYLIS", "TRUE", action_flag="CREATE", offset=7)
+            input_file = generate_csv("PHYLIS", "TRUE", action_flag="CREATE",
+                                      offset=7, vax_type="MMR", ods="X8E5B")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -145,7 +157,8 @@ async def test_pre_validation_error(self):
 
         async def test_post_validation_error(self):
             """Test POST-VALIDATION error scenario."""
-            input_file = generate_csv("", "0.3", action_flag="CREATE", offset=8)
+            input_file = generate_csv("", "0.3", action_flag="CREATE",
+                                      offset=8, vax_type="3IN1", ods="YGJ")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -158,8 +171,9 @@ async def test_post_validation_error(self):
 
         async def test_file_name_validation_error(self):
             """Test FILE-NAME-VALIDATION error scenario."""
-            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE", file_key=True, offset=9)
-
+            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE",
+                                      file_key=True, offset=9,
+                                      vax_type="HPV", ods="YGA")
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
 
@@ -171,7 +185,9 @@ async def test_file_name_validation_error(self):
 
         async def test_header_name_validation_error(self):
             """Test HEADER-NAME-VALIDATION error scenario."""
-            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE", headers="NH_NUMBER", offset=10)
+            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE",
+                                      headers="NH_NUMBER", offset=10,
+                                      vax_type="3IN1", ods="YGMYW")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -191,7 +207,8 @@ async def test_invalid_permission(self):
             await upload_config_file("MMR_FULL") # permissions_config.json is updated here
             await asyncio.sleep(20)
 
-            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE", offset=11)
+            input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE",
+                                      offset=11, vax_type="PINNACLE", ods="8J1100001")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
@@ -208,7 +225,8 @@ async def test_invalid_permission(self):
     else:
         async def test_end_to_end_speed_test_with_100000_rows(self):
             """Test end_to_end_speed_test_with_100000_rows scenario with full integration"""
-            input_file = generate_csv_with_ordered_100000_rows(12)
+            input_file = generate_csv_with_ordered_100000_rows(12,
+                                                               vax_type="COVID19", ods="DPSFULL")
 
             key = await upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
             self.uploaded_files.append(key)
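
Context for the hunks above: they live inside an async test case (the first hunk header references asyncTearDown, and each test appends its upload to self.uploaded_files). A minimal sketch of the scaffolding these tests appear to assume; the class name, base class, environment source and teardown body are assumptions, since only asyncTearDown, environment and self.uploaded_files are visible in the diff:

    import os
    import unittest

    environment = os.getenv("ENVIRONMENT")  # assumed source of the "ref" flag checked above


    class TestE2EBatch(unittest.IsolatedAsyncioTestCase):  # hypothetical class name and base class
        async def asyncSetUp(self):
            # each test appends the S3 key it uploads so teardown can clean up
            self.uploaded_files = []

        async def asyncTearDown(self):
            # the real teardown presumably deletes each uploaded S3 object;
            # the list is simply cleared here as a placeholder
            self.uploaded_files.clear()

        if environment != "ref":
            async def test_create_success(self):
                ...  # scenario bodies as in the diff above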

e2e_batch/utils.py

Lines changed: 8 additions & 4 deletions
@@ -38,7 +38,11 @@ def get_timestamp_with_offset(offset):
     return dt.strftime("%Y%m%dT%H%M%S%f")[:-3]
 
 
-def generate_csv(fore_name, dose_amount, action_flag, offset, headers="NHS_NUMBER", same_id=False, file_key=False):
+def generate_csv(fore_name, dose_amount,
+                 action_flag, offset,
+                 headers="NHS_NUMBER",
+                 same_id=False, file_key=False,
+                 vax_type="RSV", ods="YGM41"):
     """
     Generate a CSV file with 2 or 3 rows depending on the action_flag.
 
@@ -103,7 +107,7 @@ def generate_csv(fore_name, dose_amount, action_flag, offset, headers="NHS_NUMBE
     # if file_key
     # else f"RSV_Vaccinations_v5_YGM41_{timestamp}.csv"
     # )
-    file_name = get_file_name("RSV", "YGM41", file_key, offset)
+    file_name = get_file_name(vax_type, ods, file_key, offset)
     # if test_name == file_name:
     #     print("SAW> File name generation is consistent.")
 
@@ -410,7 +414,7 @@ async def upload_config_file(value):
     upload_file_to_s3(PERMISSIONS_CONFIG_FILE_KEY, CONFIG_BUCKET, INPUT_PREFIX)
 
 
-def generate_csv_with_ordered_100000_rows(offset):
+def generate_csv_with_ordered_100000_rows(vax_type, ods, offset):
     """
     Generate a CSV where:
     - 100 sets of (NEW → UPDATE → DELETE) are created.
@@ -464,7 +468,7 @@ def generate_csv_with_ordered_100000_rows(offset):
     df = pd.DataFrame(full_data)
     # timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")[:-3]
     # file_name = f"RSV_Vaccinations_v5_YGM41_{timestamp}.csv" if not file_name else file_name
-    file_name = get_file_name("RSV", "YGM41", "5", day_offset=offset)
+    file_name = get_file_name(vax_type, ods, "5", day_offset=offset)
 
     # file_name = get_file_name("RSV", "YGM41", "5") if not file_name else file_name
     df.to_csv(file_name, index=False, sep="|", quoting=csv.QUOTE_MINIMAL)
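
With these changes, generate_csv and generate_csv_with_ordered_100000_rows forward vax_type and ods to get_file_name instead of hard-coding "RSV" and "YGM41". A hedged sketch of the file name this likely produces; the format string is only inferred from the commented-out f"RSV_Vaccinations_v5_YGM41_{timestamp}.csv" above, and the file_key/day_offset handling is omitted, so treat all of it as an assumption rather than the real helper:

    from datetime import datetime, timezone


    def get_file_name(vax_type, ods, file_key, day_offset):
        # Sketch only: assumed to mirror the commented-out format above, e.g.
        # COVID19_Vaccinations_v5_DPSFULL_20250101T120000000.csv
        # (the real helper presumably also varies the name when file_key is set
        # and shifts the timestamp by day_offset; both are ignored here)
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")[:-3]
        return f"{vax_type}_Vaccinations_v5_{ods}_{timestamp}.csv"


    # Per the new signature, the bulk generator now takes the vaccine type and
    # ODS code positionally before the offset, e.g.:
    # input_file = generate_csv_with_ordered_100000_rows("COVID19", "DPSFULL", 12)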

filenameprocessor/src/send_sqs_message.py

Lines changed: 2 additions & 1 deletion
@@ -14,12 +14,13 @@ def send_to_supplier_queue(message_body: dict, vaccine_type: str, supplier: str)
         logger.error(error_message)
         raise InvalidSupplierError(error_message)
 
-    try:
+    try:
         queue_url = os.getenv("QUEUE_URL")
         sqs_client.send_message(
             QueueUrl=queue_url, MessageBody=json_dumps(message_body), MessageGroupId=f"{supplier}_{vaccine_type}"
         )
         logger.info("Message sent to SQS queue for supplier: %s", supplier)
+        logger.info("MessageGroupId: %s", f"{supplier}_{vaccine_type}")
     except Exception as error:  # pylint: disable=broad-exception-caught
         error_message = f"An unexpected error occurred whilst sending to SQS: {error}"
         logger.error(error_message)
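
The added log line surfaces the MessageGroupId that partitions the FIFO queue by supplier and vaccine type; messages sharing a group id are delivered in order within that group. A self-contained sketch of the same send path, with the queue URL and payload treated as placeholders (it assumes QUEUE_URL points at a .fifo queue):

    import json
    import logging
    import os

    import boto3

    logger = logging.getLogger(__name__)
    sqs_client = boto3.client("sqs")


    def send_example(supplier: str, vaccine_type: str, message_body: dict) -> None:
        """Sketch: mirrors the send path above for a FIFO queue."""
        queue_url = os.getenv("QUEUE_URL")  # assumed to reference a .fifo queue
        group_id = f"{supplier}_{vaccine_type}"
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body),
            MessageGroupId=group_id,  # messages with the same group id keep their order
        )
        logger.info("Message sent to SQS queue for supplier: %s", supplier)
        logger.info("MessageGroupId: %s", group_id)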

recordprocessor/src/batch_processor.py

Lines changed: 3 additions & 0 deletions
@@ -110,6 +110,9 @@ def main(event: str) -> None:
     """Process each row of the file"""
     logger.info("task started")
     start = time.time()
+    # dump the event to logs
+    logger.info("Batch Processor")
+    logger.info("event")
     n_rows_processed = 0
     try:
         n_rows_processed = process_csv_to_fhir(incoming_message_body=json.loads(event))
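
As written, the two new logger.info calls emit the fixed strings "Batch Processor" and "event". If the intent behind the comment is to dump the incoming payload, a sketch of that (assuming event is the raw JSON string that process_csv_to_fhir parses below) would be:

    # Sketch only: log the payload itself rather than the literal string "event".
    logger.info("Batch Processor")
    logger.info("event: %s", event)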
