Commit 94d7b03

Wip local test

1 parent 5614a78 commit 94d7b03
File tree

3 files changed: +114, -11 lines

3 files changed

+114
-11
lines changed

e2e_batch/test_e2e_batch.py

Lines changed: 3 additions & 0 deletions
@@ -1,4 +1,6 @@
 import time
+from unittest import skipIf
+
 import unittest
 from utils import (
     generate_csv,
@@ -25,6 +27,7 @@
 )


+@skipIf(True, "WIP: skipped while the local batch test is in progress")
 class TestE2EBatch(unittest.TestCase):
     def setUp(self):
         self.uploaded_files = []  # Tracks uploaded input keys
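Note on the decorator added above: unittest's skipIf expects both a condition and a human-readable reason, and works on classes as well as methods (the reason string shown in the hunk is placeholder wording, not part of the original commit). A minimal, self-contained sketch of the same class-level skip pattern:

import unittest
from unittest import skipIf

@skipIf(True, "example: skipped unconditionally while work is in progress")
class ExampleSkippedTests(unittest.TestCase):
    def test_never_runs(self):
        # Skipped at the class level, so this assertion is never evaluated.
        self.fail("unreachable")

if __name__ == "__main__":
    unittest.main()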

e2e_batch/test_simultaneous.py

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+import unittest
+from utils import (
+    generate_csv,
+    upload_file_to_s3,
+    get_file_content_from_s3,
+    wait_for_ack_files,
+    check_ack_file_content,
+    validate_row_count,
+    delete_file_from_s3
+)
+
+from constants import (
+    SOURCE_BUCKET,
+    INPUT_PREFIX,
+    ACK_BUCKET,
+    environment
+)
+
+
+class TestE2EBatch(unittest.TestCase):
+    def setUp(self):
+        self.uploaded_files = []  # Tracks uploaded input keys
+        self.ack_files = []  # Tracks ack keys
+
+    def tearDown(self):
+        for file_key in self.uploaded_files:
+            delete_file_from_s3(SOURCE_BUCKET, file_key)
+        for ack_key in self.ack_files:
+            delete_file_from_s3(ACK_BUCKET, ack_key)
+
+    @unittest.skipIf(environment == "ref", "not run against the ref environment")
+    def test_create_success(self):
+        """Test CREATE scenario."""
+
+        # create 3 files to simulate simultaneous uploads
+        input_files = generate_files("PHYLIS", "0.3", action_flags=["CREATE", "UPDATE", "DELETE"])
+
+        for input_file in input_files:
+            key = upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)
+            self.uploaded_files.append(key)
+
+        self.ack_files = wait_for_ack_files(None, input_file)
+
+        for input_file, ack_key in zip(input_files, self.ack_files):
+            validate_row_count(input_file, ack_key)
+
+            ack_content = get_file_content_from_s3(ACK_BUCKET, ack_key)
+            check_ack_file_content(ack_content, "OK", None, "CREATE")
+
+
+def generate_files(name_prefix, version, action_flags) -> list:
+    """Generate multiple CSV files with different action flags."""
+    files = []
+    for action in action_flags:
+        file = generate_csv(name_prefix, version, action_flag=action)
+        files.append(file)
+    return files
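In the test body above, wait_for_ack_files is called once, after the upload loop, with the loop variable input_file (i.e. the last file uploaded) and the default n_files_expected=1, so at most one ack key is collected before the zip over all three inputs. A minimal sketch of collecting one ack key per uploaded file, using only the helpers added in this commit (the helper name collect_ack_files is hypothetical):

from utils import wait_for_ack_files


def collect_ack_files(input_files):
    """Hypothetical helper: gather one ack key per uploaded input file."""
    ack_keys = []
    for input_file in input_files:
        # A falsy ack_prefix makes wait_for_ack_files search under FORWARDEDFILE_PREFIX.
        ack_keys.extend(wait_for_ack_files(None, input_file, n_files_expected=1))
    return ack_keys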

e2e_batch/utils.py

Lines changed: 54 additions & 11 deletions
@@ -27,7 +27,11 @@
 )


-def generate_csv(fore_name, dose_amount, action_flag, headers="NHS_NUMBER", same_id=False, file_key=False):
+def generate_csv(fore_name, dose_amount,
+                 action_flag, headers="NHS_NUMBER",
+                 same_id=False, file_key=False,
+                 vax_type="RSV", ods="YGM41",
+                 timestamp=None):
     """
     Generate a CSV file with 2 or 3 rows depending on the action_flag.

@@ -85,12 +89,7 @@ def generate_csv(fore_name, dose_amount, action_flag, headers="NHS_NUMBER", same
         data.append(create_row(unique_id, "fore_name", dose_amount, "UPDATE", headers))

     df = pd.DataFrame(data)
-    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")[:-3]
-    file_name = (
-        f"COVID19_Vaccinations_v4_YGM41_{timestamp}.csv"
-        if file_key
-        else f"COVID19_Vaccinations_v5_YGM41_{timestamp}.csv"
-    )
+    file_name = get_file_name(vax_type, ods, file_key, timestamp)
     df.to_csv(file_name, index=False, sep="|", quoting=csv.QUOTE_MINIMAL)
     return file_name
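With the new keyword arguments, the file name is delegated to get_file_name (added at the end of this file), so the vaccine type and ODS code are no longer hard-coded to COVID19 and YGM41. An illustrative call, assuming the defaults from the new signature (argument values borrowed from the tests):

from utils import generate_csv

# vax_type defaults to "RSV" and ods to "YGM41"; file_key=False selects the v5
# naming scheme, so this writes e.g. RSV_Vaccinations_v5_YGM41_<timestamp>.csv
file_name = generate_csv("PHYLIS", "0.3", action_flag="CREATE")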

@@ -138,7 +137,7 @@ def delete_file_from_s3(bucket, key):
         raise Exception(f"Unexpected error during file deletion: {e}")


-def wait_for_ack_file(ack_prefix, input_file_name, timeout=120):
+def wait_for_ack_file(ack_prefix, input_file_name, timeout=1200):
     """Poll the ACK_BUCKET for an ack file that contains the input_file_name as a substring."""

     filename_without_ext = input_file_name[:-4] if input_file_name.endswith(".csv") else input_file_name
@@ -156,6 +155,35 @@ def wait_for_ack_file(ack_prefix, input_file_name, timeout=120):
                 key = obj["Key"]
                 if search_pattern in key:
                     return key
+
+        time.sleep(5)
+    raise AckFileNotFoundError(
+        f"Ack file matching '{search_pattern}' not found in bucket {ACK_BUCKET} within {timeout} seconds."
+    )
+
+
+def wait_for_ack_files(ack_prefix, input_file_name, n_files_expected=1, timeout=1200):
+    """Poll the ACK_BUCKET until n_files_expected ack files containing input_file_name are found."""
+
+    filename_without_ext = input_file_name[:-4] if input_file_name.endswith(".csv") else input_file_name
+    if ack_prefix:
+        search_pattern = f"{ACK_PREFIX}{filename_without_ext}"
+        ack_prefix = ACK_PREFIX
+    else:
+        search_pattern = f"{FORWARDEDFILE_PREFIX}{filename_without_ext}"
+        ack_prefix = FORWARDEDFILE_PREFIX
+    start_time = time.time()
+    matched_files = []
+
+    while time.time() - start_time < timeout:
+        response = s3_client.list_objects_v2(Bucket=ACK_BUCKET, Prefix=ack_prefix)
+        if "Contents" in response:
+            for obj in response["Contents"]:
+                key = obj["Key"]
+                if search_pattern in key:
+                    matched_files.append(key)
+                    if len(matched_files) >= n_files_expected:
+                        return matched_files
         time.sleep(5)
     raise AckFileNotFoundError(
         f"Ack file matching '{search_pattern}' not found in bucket {ACK_BUCKET} within {timeout} seconds."
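wait_for_ack_files mirrors wait_for_ack_file but keeps polling until n_files_expected matching keys have been collected; a falsy ack_prefix switches the search from ACK_PREFIX to FORWARDEDFILE_PREFIX. A short usage sketch, assuming the pipeline writes one forwarded/ack file per uploaded input:

from utils import generate_csv, upload_file_to_s3, wait_for_ack_files
from constants import SOURCE_BUCKET, INPUT_PREFIX

input_file = generate_csv("PHYLIS", "0.3", action_flag="CREATE")
upload_file_to_s3(input_file, SOURCE_BUCKET, INPUT_PREFIX)

# Poll under FORWARDEDFILE_PREFIX (falsy ack_prefix) for up to 20 minutes.
ack_keys = wait_for_ack_files(None, input_file, n_files_expected=1, timeout=1200)
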
@@ -391,7 +419,7 @@ def upload_config_file(value):
     upload_file_to_s3(PERMISSIONS_CONFIG_FILE_KEY, CONFIG_BUCKET, INPUT_PREFIX)


-def generate_csv_with_ordered_100000_rows(file_name=None):
+def generate_csv_with_ordered_100000_rows(vax_type, ods):
     """
     Generate a CSV where:
     - 100 sets of (NEW → UPDATE → DELETE) are created.
@@ -443,8 +471,7 @@ def generate_csv_with_ordered_100000_rows(file_name=None):

     # Convert to DataFrame and save as CSV
     df = pd.DataFrame(full_data)
-    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")[:-3]
-    file_name = f"RSV_Vaccinations_v5_YGM41_{timestamp}.csv" if not file_name else file_name
+    file_name = get_file_name(vax_type, ods, False)
     df.to_csv(file_name, index=False, sep="|", quoting=csv.QUOTE_MINIMAL)
     return file_name
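The 100000-row generator now takes the vaccine type and ODS code explicitly instead of an optional file_name, and names its output through get_file_name. An illustrative call (argument values assumed):

from utils import generate_csv_with_ordered_100000_rows

# Writes a pipe-delimited CSV named via get_file_name and returns the file name.
big_file = generate_csv_with_ordered_100000_rows("RSV", "YGM41")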

@@ -465,3 +492,19 @@ def verify_final_ack_file(file_key):
         f"All values OK: {all_ok}"
     )
     return True
+
+
+def get_file_name(vax_type, ods, file_key, timestamp=None):
+    version = "4" if file_key else "5"
+
+    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")[:-3] if not timestamp else timestamp
+    return f"{vax_type}_Vaccinations_v{version}_{ods}_{timestamp}.csv"
+
+
+def generate_fileset(name_prefix, version, action_flags) -> list:
+    """Generate multiple CSV files with different action flags."""
+    files = []
+    for action in action_flags:
+        file = generate_csv(name_prefix, version, action_flag=action)
+        files.append(file)
+    return files
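get_file_name centralises the naming convention used elsewhere in this module: a truthy file_key selects the v4 scheme, otherwise v5, and the timestamp defaults to the current UTC time in the same millisecond-precision format the deleted inline code used. generate_fileset mirrors the generate_files helper defined in test_simultaneous.py. A quick illustration of the expected output (example values assumed):

from utils import get_file_name, generate_fileset

# e.g. "RSV_Vaccinations_v5_YGM41_20240101T120000123.csv"
print(get_file_name("RSV", "YGM41", False))

# file_key truthy -> v4, and an explicit timestamp is used verbatim:
# "FLU_Vaccinations_v4_X26_20240101T120000123.csv"
print(get_file_name("FLU", "X26", True, timestamp="20240101T120000123"))

# One CSV per action flag, each written and named by generate_csv/get_file_name.
files = generate_fileset("PHYLIS", "0.3", ["CREATE", "UPDATE", "DELETE"])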

0 commit comments