@@ -2456,33 +2456,38 @@ def core_ingestion_logic(session_):
24562456 import multiprocessing as mp
24572457 from io import BytesIO
24582458 import pandas as pd
2459+ import queue as queue_module
24592460
24602461 queue = mp .Queue ()
24612462
24622463 def worker_process (parquet_queue ):
2463-
2464- # Create a sample DataFrame
2465- data = {
2466- "id" : [1 , 2 , 3 , 4 , 5 ],
2467- "name" : ["Alice" , "Bob" , "Charlie" , "David" , "Eve" ],
2468- "value" : [100 , 200 , 300 , 400 , 500 ],
2469- }
2470- parquet_buffer = BytesIO ()
2471- df = pd .DataFrame (data )
2472- df .to_parquet (parquet_buffer )
2473- parquet_buffer .seek (0 )
2474- parquet_queue .put (parquet_buffer )
2464+ try :
2465+ # Create a sample DataFrame
2466+ data = {
2467+ "id" : [1 , 2 , 3 , 4 , 5 ],
2468+ "name" : ["Alice" , "Bob" , "Charlie" , "David" , "Eve" ],
2469+ "value" : [100 , 200 , 300 , 400 , 500 ],
2470+ }
2471+ parquet_buffer = BytesIO ()
2472+ df = pd .DataFrame (data )
2473+ df .to_parquet (parquet_buffer )
2474+ parquet_buffer .seek (0 )
2475+ parquet_queue .put (parquet_buffer )
2476+ except Exception as e :
2477+ parquet_queue .put (("error" , str (e )))
24752478
24762479 process = mp .Process (target = worker_process , args = (queue ,))
24772480 process .start ()
24782481
24792482 # Wait for the process to complete
24802483 process .join ()
2481- if process .exitcode != 0 or process .is_alive ():
2482- return "failure"
24832484
24842485 # Get the parquet buffer from the queue
2485- parquet_buffer = queue .get ()
2486+ try :
2487+ parquet_buffer = queue .get (timeout = 30 )
2488+ except queue_module .Empty :
2489+ return "failure"
2490+
24862491 parquet_buffer .seek (0 )
24872492
24882493 # Create a temporary stage
0 commit comments