|
14 | 14 | from constants import ( |
15 | 15 | SOURCE_BUCKET, |
16 | 16 | INPUT_PREFIX, |
| 17 | + ACK_BUCKET, |
17 | 18 | environment |
18 | 19 | ) |
19 | 20 |
|
@@ -49,40 +50,29 @@ def test_create_success(self): |
49 | 50 | test.key = key |
50 | 51 |
|
51 | 52 | # dictionary of file name to track whether inf and bus acks have been received |
52 | | - pending = {test.file_name: {DestinationType.INF: True, DestinationType.BUS: True} for test in test_datas} |
53 | | - |
54 | 53 | start_time = time.time() |
55 | 54 | # while there are still pending files, poll for acks and forwarded files |
56 | | - while pending: |
57 | | - for file_name in list(pending.keys()): |
58 | | - test = pending[file_name] |
| 55 | + pending = True |
| 56 | + while pending and (time.time() - start_time) < max_timeout: |
| 57 | + pending = False |
| 58 | + for test_data in test_datas: |
59 | 59 | # loop through keys in test (inf and bus) |
60 | | - for key in test.keys(): |
61 | | - if test[key]: |
62 | | - is_pending = poll_destination(file_name, key) |
63 | | - if is_pending: |
64 | | - test[key] = False |
65 | | - for file_name in list(pending.keys()): |
66 | | - test = pending[file_name] |
67 | | - # if both inf and bus are False, remove from pending |
68 | | - if not test[DestinationType.INF] and not test[DestinationType.BUS]: |
69 | | - del pending[file_name] |
70 | | - |
71 | | - # if max_timeout exceeded, break |
72 | | - if (time.time() - start_time) > max_timeout: |
73 | | - break |
74 | | - |
| 60 | + for ack_key in test_data.ack_keys.keys(): |
| 61 | + if not test_data.ack_keys[ack_key]: |
| 62 | + found_ack_key = poll_destination(test_data.file_name, ack_key) |
| 63 | + if found_ack_key: |
| 64 | + test_data.ack_keys[ack_key] = found_ack_key |
| 65 | + else: |
| 66 | + pending = True |
75 | 67 | if pending: |
76 | 68 | time.sleep(1) |
77 | 69 |
|
78 | 70 | # Now validate all files have been processed correctly |
79 | | - for test in test_datas: |
| 71 | + for test_data in test_datas: |
80 | 72 | # Validate the ACK file |
81 | | - ack_content = get_file_content_from_s3(environment.ACK_BUCKET, test.file_name) |
82 | | - fwd_content = get_file_content_from_s3(environment.FORWARDEDFILE_BUCKET, test.fwd_key) |
| 73 | + inf_ack_content = get_file_content_from_s3(ACK_BUCKET, test_data.ack_keys[DestinationType.INF]) |
| 74 | + bus_ack_content = get_file_content_from_s3(ACK_BUCKET, test_data.ack_keys[DestinationType.BUS]) |
83 | 75 |
|
84 | | - check_ack_file_content(ack_content, "OK", None, test.action) |
85 | | - validate_row_count(test.file_name, test.key) |
86 | | - # Validate the forwarded file |
87 | | - validate_row_count(test.file_name, test.key) |
88 | | - check_ack_file_content(fwd_content, "OK", None, test.action) |
| 76 | + check_ack_file_content(inf_ack_content, "Success", None, test_data.actions) |
| 77 | + validate_row_count(test_data.file_name, test_data.ack_keys[DestinationType.BUS]) |
| 78 | + # how to validate bus ack content? |
0 commit comments