Skip to content

Commit e19b3e0

Browse files
committed
async
1 parent 81a38d9 commit e19b3e0

File tree

1 file changed

+25
-7
lines changed

1 file changed

+25
-7
lines changed

e2e_batch/test_e2e_batch.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import unittest
2+
import time
23
from utils import (
34
upload_file_to_s3,
45
get_file_content_from_s3,
56
wait_for_ack_file,
67
check_ack_file_content,
78
validate_row_count,
8-
delete_file_from_s3,
99
generate_csv_files,
1010
SeedTestData
1111
)
@@ -60,11 +60,29 @@ def test_create_success(self):
6060
key = upload_file_to_s3(test.file_name, SOURCE_BUCKET, INPUT_PREFIX)
6161
test.key = key
6262

63-
for test in test_datas:
64-
ack_key = wait_for_ack_file(None, test.file_name, ACK_BUCKET, timeout=1200)
65-
self.ack_files.append(ack_key)
63+
process_acks_as_received(test_datas, ACK_BUCKET)
64+
65+
66+
def process_acks_as_received(test_datas, ack_bucket, poll_interval=2, timeout=1200):
67+
"""
68+
Polls for ACK files and processes them as soon as they are available.
69+
"""
70+
start_time = time.time()
71+
pending = {test.file_name: test for test in test_datas}
72+
processed = set()
6673

67-
validate_row_count(test.file_name, ack_key)
74+
while pending and (time.time() - start_time) < timeout:
75+
for file_name in list(pending.keys()):
76+
ack_key = wait_for_ack_file(None, file_name, ack_bucket, timeout=0)
77+
if ack_key:
78+
# Process the ACK immediately
79+
validate_row_count(file_name, ack_key)
80+
ack_content = get_file_content_from_s3(ack_bucket, ack_key)
81+
check_ack_file_content(ack_content, "OK", None, "CREATE")
82+
processed.add(file_name)
83+
del pending[file_name]
84+
if pending:
85+
time.sleep(poll_interval)
6886

69-
ack_content = get_file_content_from_s3(ACK_BUCKET, ack_key)
70-
check_ack_file_content(ack_content, "OK", None, "CREATE")
87+
if pending:
88+
raise TimeoutError(f"Timeout waiting for ACKs: {list(pending.keys())}")

0 commit comments

Comments
 (0)