@@ -1323,19 +1323,31 @@ def test_case_sensitive_copy_into(session):
13231323)
13241324def test_threading_error_handling_with_stop_event (session ):
13251325 """Test that when one thread fails, it properly sets stop event and cancels other threads."""
1326+
1327+ # Helper function to drain a queue into a list
1328+ def drain_queue (q ):
1329+ """Convert a queue to a list by draining all items."""
1330+ items = []
1331+ while not q .empty ():
1332+ try :
1333+ items .append (q .get_nowait ())
1334+ except queue .Empty :
1335+ break
1336+ return items
1337+
13261338 with tempfile .TemporaryDirectory () as temp_dir :
13271339 dbpath = os .path .join (temp_dir , "testsqlite3.db" )
13281340 table_name , _ , _ , _ = sqlite3_db (dbpath )
13291341
13301342 # Track which partitions have been processed to simulate error on specific partition
1331- processed_partitions = set ()
1343+ processed_partitions = queue . Queue ()
13321344 original_task_fetch = _task_fetch_data_from_source_with_retry
13331345
13341346 def mock_task_fetch_with_selective_error (
13351347 worker , partition , partition_idx , parquet_queue , stop_event = None
13361348 ):
13371349 assert stop_event
1338- processed_partitions .add (partition_idx )
1350+ processed_partitions .put (partition_idx )
13391351
13401352 # Simulate error on the second partition (partition_idx=1)
13411353 if partition_idx == 1 :
@@ -1374,8 +1386,8 @@ def mock_task_fetch_with_selective_error(
13741386 )
13751387
13761388 # Track futures and their cancel calls to verify cancellation behavior
1377- created_futures = []
1378- cancelled_futures = []
1389+ created_futures = queue . Queue ()
1390+ cancelled_futures = queue . Queue ()
13791391 original_submit = ThreadPoolExecutor .submit
13801392
13811393 def track_submit (self , fn , * args , ** kwargs ):
@@ -1385,12 +1397,12 @@ def track_submit(self, fn, *args, **kwargs):
13851397 original_cancel = future .cancel
13861398
13871399 def track_cancel ():
1388- cancelled_futures .append (future )
1400+ cancelled_futures .put (future )
13891401 return original_cancel ()
13901402
13911403 future .cancel = track_cancel
13921404
1393- created_futures .append (future )
1405+ created_futures .put (future )
13941406 return future
13951407
13961408 with mock .patch (
@@ -1413,37 +1425,27 @@ def track_cancel():
14131425 custom_schema = SQLITE3_DB_CUSTOM_SCHEMA_STRING ,
14141426 fetch_with_process = False , # Use threading mode
14151427 )
1416- # Verify that futures were created (confirming threading mode was used)
1417- assert len (created_futures ) == 3 , "Should have created 3 thread futures"
1418-
14191428 # Give threads a moment to be cancelled/complete
14201429 import time
14211430
14221431 time .sleep (0.2 )
14231432
1424- # Verify that at least one partition was processed before error
1425- assert (
1426- len ( processed_partitions ) >= 1
1427- ), "At least one partition should have been processed"
1433+ # Convert queues to lists for assertion checks using helper function
1434+ processed_list = drain_queue ( processed_partitions )
1435+ cancelled_list = drain_queue ( cancelled_futures )
1436+ created_list = drain_queue ( created_futures )
14281437
1429- # Verify that partition 1 was the one that caused the error
1438+ # Verify threading behavior: 3 futures created, partition 1 failed, some (but not all) futures cancelled
14301439 assert (
1431- 1 in processed_partitions
1432- ), "Partition 1 should have been processed and caused error"
1433-
1434- # Verify that cancel() was called on some futures
1435- # Due to timing, at least some futures should have been cancelled
1436- assert (
1437- len (cancelled_futures ) > 0
1438- ), f"Expected some futures to be cancelled, but got { len (cancelled_futures )} "
1439-
1440- # Verify that not all futures were cancelled (the one that errored should complete, not be cancelled)
1441- assert len (cancelled_futures ) < len (
1442- created_futures
1443- ), "Not all futures should be cancelled"
1440+ len (created_list ) == 3
1441+ and len (processed_list ) >= 1
1442+ and 1 in processed_list
1443+ and len (cancelled_list ) > 0
1444+ and len (cancelled_list ) < 3
1445+ ), f"Threading verification failed: created={ len (created_list )} , processed={ processed_list } , cancelled={ len (cancelled_list )} "
14441446
14451447 # Additional verification: check that cancelled futures were indeed not done when cancelled
1446- for future in cancelled_futures :
1448+ for future in cancelled_list :
14471449 # The future should either be cancelled or done by now
14481450 assert (
14491451 future .cancelled () or future .done ()
0 commit comments