diff --git a/tests/integ/test_stored_procedure.py b/tests/integ/test_stored_procedure.py index 48c33dc17a..2bdd0f44fc 100644 --- a/tests/integ/test_stored_procedure.py +++ b/tests/integ/test_stored_procedure.py @@ -2456,33 +2456,38 @@ def core_ingestion_logic(session_): import multiprocessing as mp from io import BytesIO import pandas as pd + import queue as queue_module queue = mp.Queue() def worker_process(parquet_queue): - - # Create a sample DataFrame - data = { - "id": [1, 2, 3, 4, 5], - "name": ["Alice", "Bob", "Charlie", "David", "Eve"], - "value": [100, 200, 300, 400, 500], - } - parquet_buffer = BytesIO() - df = pd.DataFrame(data) - df.to_parquet(parquet_buffer) - parquet_buffer.seek(0) - parquet_queue.put(parquet_buffer) + try: + # Create a sample DataFrame + data = { + "id": [1, 2, 3, 4, 5], + "name": ["Alice", "Bob", "Charlie", "David", "Eve"], + "value": [100, 200, 300, 400, 500], + } + parquet_buffer = BytesIO() + df = pd.DataFrame(data) + df.to_parquet(parquet_buffer) + parquet_buffer.seek(0) + parquet_queue.put(parquet_buffer) + except Exception as e: + parquet_queue.put(("error", str(e))) process = mp.Process(target=worker_process, args=(queue,)) process.start() # Wait for the process to complete process.join() - if process.exitcode != 0 or process.is_alive(): - return "failure" # Get the parquet buffer from the queue - parquet_buffer = queue.get() + try: + parquet_buffer = queue.get(timeout=30) + except queue_module.Empty: + return "failure" + parquet_buffer.seek(0) # Create a temporary stage