Skip to content

Commit 2ae79a3

Browse files
authored
Improve ThreadPool and Worker for python 3.14t (#1554)
Signed-off-by: Bala.FA <bala@minio.io>
1 parent b11622f commit 2ae79a3

2 files changed

Lines changed: 51 additions & 46 deletions

File tree

.github/workflows/ci.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ jobs:
3030
runs-on: ${{ matrix.os }}
3131
strategy:
3232
matrix:
33-
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
33+
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14", "3.14t"]
3434
os: [ubuntu-latest, windows-latest, macos-latest]
3535

3636
steps:
37-
- uses: actions/checkout@v5
37+
- uses: actions/checkout@v6
3838
- name: Run spell check on Ubuntu
3939
if: matrix.os == 'ubuntu-latest'
4040
uses: codespell-project/actions-codespell@master
@@ -64,6 +64,8 @@ jobs:
6464
pip install -e .
6565
pytest
6666
- name: Run functional tests on Ubuntu
67+
env:
68+
PYTHON_GIL: ${{ (matrix.python-version == '3.14t') && '0' || '' }}
6769
if: matrix.os == 'ubuntu-latest'
6870
run: |
6971
wget --quiet -O /tmp/minio https://dl.min.io/server/minio/release/linux-amd64/minio

minio/helpers.py

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import re
2323
import urllib.parse
2424
from queue import Queue
25-
from threading import BoundedSemaphore, Lock, Thread
26-
from typing import Mapping, Optional
25+
from threading import BoundedSemaphore, Event, Lock, Thread
26+
from typing import Any, Callable, Mapping, Optional
2727

2828
from . import __title__, __version__
2929
from .compat import HTTPHeaderDict, HTTPQueryDict, quote
@@ -566,86 +566,89 @@ def build(
566566

567567

568568
class Worker(Thread):
569-
""" Thread executing tasks from a given tasks queue """
569+
"""Thread executing tasks from a given tasks queue"""
570570

571571
def __init__(
572-
self,
573-
tasks_queue: Queue,
574-
results_queue: Queue,
575-
exceptions_queue: Queue,
572+
self,
573+
tasks_queue: Queue,
574+
results_queue: Queue,
575+
exceptions_queue: Queue,
576+
abort_event: Event,
576577
):
577-
Thread.__init__(self, daemon=True)
578+
super().__init__(daemon=True)
578579
self._tasks_queue = tasks_queue
579580
self._results_queue = results_queue
580581
self._exceptions_queue = exceptions_queue
582+
self._abort_event = abort_event
581583
self.start()
582584

583-
def run(self):
584-
""" Continuously receive tasks and execute them """
585+
def run(self) -> None:
586+
"""Continuously receive tasks and execute them"""
585587
while True:
586588
task = self._tasks_queue.get()
587-
if not task:
589+
590+
# Poison pill to stop the thread
591+
if task is None:
588592
self._tasks_queue.task_done()
589593
break
590-
func, args, kargs, cleanup_func = task
591-
# No exception detected in any thread,
592-
# continue the execution.
593-
if self._exceptions_queue.empty():
594+
595+
func, args, kwargs, cleanup_func = task
596+
597+
# 3.14t Optimization: Use an Event check instead of Queue.empty().
598+
# This is a thread-safe way to stop processing if another thread
599+
# failed.
600+
if not self._abort_event.is_set():
594601
try:
595-
result = func(*args, **kargs)
602+
result = func(*args, **kwargs)
596603
self._results_queue.put(result)
597604
except Exception as ex: # pylint: disable=broad-except
605+
# Signal all threads to stop executing new tasks
606+
self._abort_event.set()
598607
self._exceptions_queue.put(ex)
599608

600-
# call cleanup i.e. Semaphore.release irrespective of task
601-
# execution to avoid race condition.
609+
# Always cleanup (release semaphore) and mark task done
602610
cleanup_func()
603-
# Mark this task as done, whether an exception happened or not
604611
self._tasks_queue.task_done()
605612

606613

607614
class ThreadPool:
608-
""" Pool of threads consuming tasks from a queue """
609-
_results_queue: Queue
610-
_exceptions_queue: Queue
611-
_tasks_queue: Queue
612-
_sem: BoundedSemaphore
613-
_num_threads: int
615+
"""Pool of threads consuming tasks from a queue"""
614616

615617
def __init__(self, num_threads: int):
616-
self._results_queue = Queue()
617-
self._exceptions_queue = Queue()
618-
self._tasks_queue = Queue()
618+
self._results_queue: Queue[Any] = Queue()
619+
self._exceptions_queue: Queue[Exception] = Queue()
620+
self._tasks_queue: Queue[tuple | None] = Queue()
619621
self._sem = BoundedSemaphore(num_threads)
622+
self._abort_event = Event()
620623
self._num_threads = num_threads
621624

622-
def add_task(self, func, *args, **kargs):
623-
"""
624-
Add a task to the queue. Calling this function can block
625-
until workers have a room for processing new tasks. Blocking
626-
the caller also prevents the latter from allocating a lot of
627-
memory while workers are still busy running their assigned tasks.
628-
"""
625+
def add_task(self, func: Callable, *args: Any, **kwargs: Any) -> None:
626+
"""Add a task to the queue. Blocks if the pool is full"""
629627
self._sem.acquire() # pylint: disable=consider-using-with
630628
cleanup_func = self._sem.release
631-
self._tasks_queue.put((func, args, kargs, cleanup_func))
629+
self._tasks_queue.put((func, args, kwargs, cleanup_func))
632630

633-
def start_parallel(self):
634-
""" Prepare threads to run tasks"""
631+
def start_parallel(self) -> None:
632+
"""Prepare threads to run tasks"""
635633
for _ in range(self._num_threads):
636634
Worker(
637-
self._tasks_queue, self._results_queue, self._exceptions_queue,
635+
self._tasks_queue,
636+
self._results_queue,
637+
self._exceptions_queue,
638+
self._abort_event
638639
)
639640

640641
def result(self) -> Queue:
641-
""" Stop threads and return the result of all called tasks """
642-
# Send None to all threads to cleanly stop them
642+
"""Stop threads and return the results"""
643+
# 1. Send "Poison Pill" to all threads
643644
for _ in range(self._num_threads):
644645
self._tasks_queue.put(None)
645-
# Wait for completion of all the tasks in the queue
646+
647+
# 2. Wait for completion
646648
self._tasks_queue.join()
647-
# Check if one of the thread raised an exception, if yes
648-
# raise it here in the function
649+
650+
# 3. Check for exceptions collected during execution
649651
if not self._exceptions_queue.empty():
650652
raise self._exceptions_queue.get()
653+
651654
return self._results_queue

0 commit comments

Comments
 (0)