|
3 | 3 | from utils import ( |
4 | 4 | upload_file_to_s3, |
5 | 5 | get_file_content_from_s3, |
6 | | - wait_for_ack_file, |
7 | 6 | check_ack_file_content, |
8 | 7 | validate_row_count, |
9 | 8 | generate_csv_files, |
10 | | - SeedTestData |
| 9 | + SeedTestData, |
| 10 | + TestData, |
| 11 | + poll_destination |
11 | 12 | ) |
12 | 13 |
|
13 | 14 | from constants import ( |
14 | 15 | SOURCE_BUCKET, |
15 | 16 | INPUT_PREFIX, |
16 | | - ACK_BUCKET, |
17 | 17 | environment |
18 | 18 | ) |
19 | 19 |
|
|
22 | 22 | DELETE = "DELETE" |
23 | 23 |
|
24 | 24 |
|
25 | | -ods_vaccines = { |
26 | | - "DPSFULL": ["3IN1", "COVID19", "FLU", "HPV", "MENACWY", "MMR", "RSV"], |
27 | | - "DPSREDUCED": ["3IN1", "COVID19", "FLU", "HPV", "MENACWY", "MMR", "RSV"], |
28 | | - "V0V8L": ["3IN1", "FLU", "HPV", "MENACWY", "MMR"], |
29 | | - "8HK48": ["FLU"], |
30 | | - "8HA94": ["COVID19"], |
31 | | - "X26": ["MMR", "RSV"], |
32 | | - "X8E5B": ["MMR", "RSV"], |
33 | | - "YGM41": ["3IN1", "COVID19", "HPV", "MENACWY", "MMR", "RSV"], |
34 | | - "YGJ": ["3IN1", "COVID19", "HPV", "MENACWY", "MMR", "RSV"], |
35 | | - "YGA": ["3IN1", "HPV", "MENACWY", "MMR", "RSV"], |
36 | | - "YGMYW": ["3IN1", "HPV", "MENACWY", "MMR", "RSV"], |
37 | | -} |
38 | | - |
39 | 25 | seed_datas = [ |
40 | 26 | SeedTestData("Create", "V0V8L", [CREATE]), |
41 | 27 | SeedTestData("Update", "8HK48", [CREATE, UPDATE]), |
|
50 | 36 |
|
51 | 37 | class TestE2EBatch(unittest.TestCase): |
52 | 38 |
|
53 | | - @unittest.skipIf(environment == "ref") |
| 39 | + @unittest.skipIf(environment == "ref", "Skip for ref") |
54 | 40 | def test_create_success(self): |
55 | 41 | """Test CREATE scenario.""" |
| 42 | + max_timeout = 1200 # seconds |
56 | 43 |
|
57 | | - test_datas = generate_csv_files(seed_datas) |
| 44 | + test_datas: list[TestData] = generate_csv_files(seed_datas) |
58 | 45 |
|
59 | 46 | for test in test_datas: |
| 47 | + |
60 | 48 | key = upload_file_to_s3(test.file_name, SOURCE_BUCKET, INPUT_PREFIX) |
61 | 49 | test.key = key |
62 | 50 |
|
63 | | - process_acks_as_received(test_datas, ACK_BUCKET) |
64 | | - |
65 | | - |
66 | | -def process_acks_as_received(test_datas, ack_bucket, poll_interval=2, timeout=1200): |
67 | | - """ |
68 | | - Polls for ACK files and processes them as soon as they are available. |
69 | | - """ |
70 | | - start_time = time.time() |
71 | | - pending = {test.file_name: test for test in test_datas} |
72 | | - processed = set() |
73 | | - |
74 | | - while pending and (time.time() - start_time) < timeout: |
75 | | - for file_name in list(pending.keys()): |
76 | | - ack_key = wait_for_ack_file(None, file_name, ack_bucket, timeout=1) |
77 | | - if ack_key: |
78 | | - # Process the ACK immediately |
79 | | - validate_row_count(file_name, ack_key) |
80 | | - ack_content = get_file_content_from_s3(ack_bucket, ack_key) |
81 | | - check_ack_file_content(ack_content, "OK", None, "CREATE") |
82 | | - processed.add(file_name) |
83 | | - del pending[file_name] |
84 | | - if pending: |
85 | | - time.sleep(poll_interval) |
86 | | - |
87 | | - if pending: |
88 | | - raise TimeoutError(f"Timeout waiting for ACKs: {list(pending.keys())}") |
| 51 | + # dictionary of file name to track whether inf and bus acks have been received |
| 52 | + pending = {test.file_name: {"inf": True, "bus": True} for test in test_datas} |
| 53 | + |
| 54 | + start_time = time.time() |
| 55 | + # while there are still pending files, poll for acks and forwarded files |
| 56 | + while pending: |
| 57 | + for file_name in list(pending.keys()): |
| 58 | + test = pending[file_name] |
| 59 | + for key in ["inf", "bus"]: |
| 60 | + if test[key]: |
| 61 | + inf_key = poll_destination(file_name, check_ack=test.check_ack) |
| 62 | + if inf_key: |
| 63 | + test[key] = False |
| 64 | + for file_name in list(pending.keys()): |
| 65 | + test = pending[file_name] |
| 66 | + # if both inf and bus are False, remove from pending |
| 67 | + if not test["inf"] and not test["bus"]: |
| 68 | + del pending[file_name] |
| 69 | + |
| 70 | + # if max_timeout exceeded, break |
| 71 | + if (time.time() - start_time) > max_timeout: |
| 72 | + break |
| 73 | + |
| 74 | + if pending: |
| 75 | + time.sleep(5) |
| 76 | + |
| 77 | + # Now validate all files have been processed correctly |
| 78 | + for test in test_datas: |
| 79 | + # Validate the ACK file |
| 80 | + ack_content = get_file_content_from_s3(environment.ACK_BUCKET, test.file_name) |
| 81 | + fwd_content = get_file_content_from_s3(environment.FORWARDEDFILE_BUCKET, test.fwd_key) |
| 82 | + |
| 83 | + check_ack_file_content(ack_content, "OK", None, test.action) |
| 84 | + validate_row_count(test.file_name, test.key) |
| 85 | + # Validate the forwarded file |
| 86 | + validate_row_count(test.file_name, test.key) |
| 87 | + check_ack_file_content(fwd_content, "OK", None, test.action) |
0 commit comments