
Use NewThreadPool in dynamic mode. By default use only one instance of ThreadPool per device. #6254

Merged
mzient merged 13 commits into NVIDIA:main from mzient:ndd_new_thread_pool
Mar 19, 2026

Conversation

@mzient (Contributor) commented Mar 13, 2026

Category:

Refactoring (Redesign of existing code that doesn't affect functionality)

Description:

This change exposes Python bindings for NewThreadPool and ThreadPoolFacade (with owning pointer) and uses them in Dynamic Mode. The ThreadPool is no longer thread-local.

Also, since the new thread pool can be shared, it is now possible to pass a thread pool instance to the EvalContext constructor.

Additional information:

Affected modules and functionalities:

Key points relevant for the review:

Tests:

  • Existing tests apply
  • New tests added
    • Python tests
    • GTests
    • Benchmark
    • Other
  • N/A

Checklist

Documentation

  • Existing documentation applies
  • Documentation updated
    • Docstring
    • Doxygen
    • RST
    • Jupyter
    • Other
  • N/A

DALI team only

Requirements

  • Implements new requirements
  • Affects existing requirements
  • N/A

REQ IDs: N/A

JIRA TASK: DALI-4640

@greptile-apps bot commented Mar 13, 2026

Greptile Summary

This PR exposes Python bindings for NewThreadPool and ThreadPoolFacade and migrates the Dynamic Mode away from a per-thread _ThreadPool (old implementation) to a shared, per-device _DefaultThreadPool backed by the new NewThreadPool C++ class. A new thread_pool constructor parameter on EvalContext allows callers to share a single pool across contexts. set_num_threads / get_num_threads are moved to the new _thread_pool module and re-exported from the dynamic namespace.

Key changes:

  • backend_impl.cc: Adds ExposeNewThreadPool (binding for NewThreadPool/ThreadPoolBase) and PyThreadPoolFacade with correct destructor ordering via a private ThreadPoolOwner base class, ensuring shared_tp_ outlives ThreadPoolFacade::~ThreadPoolFacade().
  • _thread_pool.py (new): Introduces ThreadPool (Python subclass of _NewThreadPool), _DefaultThreadPool (per-device lazy pool with double-checked locking), set_num_threads, and get_num_threads. Two issues remain: _CURRENT_DEVICE_ID = -1 clashes with DALI's CPU_ONLY_DEVICE_ID = -1 convention, potentially misdirecting users who pass -1 explicitly; and _DefaultThreadPool.get() only re-creates the pool when tp is None, not when tp exists but is stale (relies entirely on _set_num_threads to have nullified the pool first).
  • _eval_context.py: Removes the _tls thread-local storage and per-thread pool recreation logic; thread_pool is now a regular property delegating to _get_default_thread_pool when no explicit pool is set.
  • _ops.py: ctx.thread_pool._create_facade() creates a fresh PyThreadPoolFacade per operator invocation, giving each run its own job list while sharing the underlying thread pool.
  • Tests add coverage for shared pool construction, num_threads binding, and a concurrent-read/single-write stress test for _DefaultThreadPool.

Confidence Score: 3/5

  • PR is mostly safe but has two logic issues in the new _thread_pool.py that could surface subtle bugs: the _CURRENT_DEVICE_ID=-1 sentinel collision and the stale-pool path in get().
  • The C++ destructor ordering fix is sound, the validation ordering regression from previous review rounds has been reworked correctly, and the dead-code/inverted-guard issues from earlier rounds are resolved. The two remaining issues are in new Python code: (1) the -1 sentinel ambiguity can silently give users a GPU pool when they intended CPU-only, and (2) get() returns a stale pool in the narrow window between _global_num_threads being updated and _set_num_threads nullifying the pool — acceptable only because set_num_threads is documented as not thread-safe. No crashes or data corruption expected in typical usage, but API confusion around device_id=-1 is a real user-facing risk.
  • dali/python/nvidia/dali/experimental/dynamic/_thread_pool.py — _CURRENT_DEVICE_ID sentinel and get() stale-pool guard deserve attention before merge.

Important Files Changed

Filename Overview
dali/python/nvidia/dali/experimental/dynamic/_thread_pool.py New module introducing ThreadPool (Python wrapper over _NewThreadPool), _DefaultThreadPool (per-device lazy pool with double-checked locking), and set_num_threads/get_num_threads. Contains two logic issues: _CURRENT_DEVICE_ID=-1 clashes with DALI's CPU_ONLY_DEVICE_ID=-1 convention, and get() only re-enters the lock when tp is None — missing the stale-but-non-None pool case.
dali/python/nvidia/dali/experimental/dynamic/_eval_context.py Refactored to remove thread-local ThreadPool in favour of a shared ThreadPool passed at construction or resolved via _get_default_thread_pool; adds thread_pool constructor parameter. Previous issues (inverted guard, CPU_ONLY_DEVICE_ID attribute error, global state mutation before validation) appear to have been addressed. Minor docstring clarification needed for device_id=None semantics.
dali/python/backend_impl.cc Adds ExposeNewThreadPool (binding for NewThreadPool/ThreadPoolBase) and PyThreadPoolFacade. The destructor ordering issue from previous review is correctly fixed via private ThreadPoolOwner base class, ensuring shared_tp_ outlives ThreadPoolFacade::~ThreadPoolFacade(). _create_facade correctly returns shared_ptr for use with _Workspace.
dali/python/nvidia/dali/experimental/dynamic/_ops.py Simplified: dead else branch (num_threads=1 fallback) removed, workspace now created with ctx.thread_pool._create_facade() ensuring a fresh ThreadPoolFacade (and its job list) per operator invocation while reusing the underlying thread pool.
dali/test/python/experimental_mode/test_eval_context.py Good new tests for thread pool integration and thread-safety of the default pool under concurrent set_num_threads calls. The thread-safety test runs set_num_threads from the main thread while workers read the pool, which the docstring marks as UB — the test is validating best-effort safety rather than a guaranteed contract.
dali/test/python/experimental_mode/test_ndd_cpu_only.py Adds test_thread_pool for CPU-only ThreadPool construction, but skips when any GPU is present. The CPU-only + GPU-present combination (device_id=None on a GPU machine) has no direct test coverage.
dali/python/nvidia/dali/experimental/dynamic/init.py One-line addition exporting _thread_pool symbols (ThreadPool, set_num_threads, get_num_threads) from the dynamic namespace. Change is correct and complete.

Sequence Diagram

sequenceDiagram
    participant User
    participant EvalContext
    participant ThreadPool as ThreadPool (Python)
    participant DefaultTP as _DefaultThreadPool
    participant NewTP as _NewThreadPool (C++)
    participant Facade as PyThreadPoolFacade (C++)
    participant Workspace as _Workspace (C++)

    User->>EvalContext: EvalContext(thread_pool=tp)
    EvalContext->>EvalContext: validate tp.device_id == device_id
    EvalContext->>EvalContext: self._thread_pool = tp

    alt thread_pool is None (default pool)
        User->>EvalContext: ctx.thread_pool (property)
        EvalContext->>DefaultTP: _get_default_thread_pool(device_id)
        DefaultTP->>DefaultTP: get() — double-checked lock
        DefaultTP->>ThreadPool: ThreadPool(num_threads, device_id)
        ThreadPool->>NewTP: super().__init__(num_threads, device_id)
        NewTP-->>DefaultTP: shared_ptr[ThreadPoolBase]
        DefaultTP-->>EvalContext: ThreadPool instance
    end

    User->>EvalContext: operator.__call__(inputs)
    EvalContext->>ThreadPool: ctx.thread_pool._create_facade()
    ThreadPool->>Facade: PyThreadPoolFacade(shared_ptr[ThreadPoolBase])
    Facade-->>EvalContext: shared_ptr[ThreadPool] (facade)
    EvalContext->>Workspace: _Workspace(facade, cuda_stream)
    Workspace->>Workspace: SetupAndRun(workspace, batch_size)
    Workspace-->>EvalContext: outputs

    User->>EvalContext: set_num_threads(n)
    EvalContext->>DefaultTP: _set_num_threads(n) — nullify stale pools
    DefaultTP->>DefaultTP: self._thread_pool = None

Comments Outside Diff (2)

  1. dali/python/nvidia/dali/experimental/dynamic/_thread_pool.py, lines 22-46

    _CURRENT_DEVICE_ID = -1 collides with DALI's CPU_ONLY_DEVICE_ID

    ThreadPool._CURRENT_DEVICE_ID = -1 is used as the sentinel meaning "use the current CUDA device". However, DALI conventionally uses CPU_ONLY_DEVICE_ID = -1 (as seen throughout the codebase and in _ops.py / _eval_context.py). A user familiar with DALI who explicitly passes device_id=-1 expecting CPU-only behaviour (consistent with CPU_ONLY_DEVICE_ID) will instead get the current GPU device:

    # User intends CPU-only (matching DALI's CPU_ONLY_DEVICE_ID = -1 convention)
    tp = ndd.ThreadPool(4, device_id=-1)
    # Actual result: device_id is mapped to Device.current().device_id (e.g. GPU 0)
    assert tp.device_id == 0   # not None

    The correct way to request CPU-only is device_id=None, but nothing prevents the -1 confusion since the docstring only documents None, not the internal sentinel.

    Consider using a private sentinel object instead of -1 to avoid the collision:

    class _CurrentDevice:
        pass

    _CURRENT_DEVICE_SENTINEL = _CurrentDevice()

    def __init__(self, num_threads=None, *, device_id=_CURRENT_DEVICE_SENTINEL):
        if device_id is _CURRENT_DEVICE_SENTINEL:
            device_id = _device.Device.current().device_id
        ...

    This makes the sentinel unambiguous and eliminates any clash with integer device IDs.

  2. dali/python/nvidia/dali/experimental/dynamic/_thread_pool.py, lines 85-93

    get() skips stale-pool check when pool is non-None

    The outer guard if tp is None: means that when a non-None pool exists but its num_threads no longer matches get_num_threads() (i.e., set_num_threads updated _global_num_threads but has not yet reached the _set_num_threads call in the loop below it), get() returns the stale pool without entering the lock.

    # _global_num_threads just set to 42, but _set_num_threads not yet called
    tp = self._thread_pool   # non-None, but num_threads=4
    if tp is None:           # False → skips recreation
        ...
    return tp                # returns pool with 4 threads while get_num_threads() == 42

    In practice this window is very small and set_num_threads is documented as not thread-safe, but a defensive recreation check for the non-None case would close the gap entirely:

    def get(self):
        tp = self._thread_pool
        num_threads = get_num_threads()
        if tp is None or tp.num_threads != num_threads:
            with self._mutex:
                tp = self._thread_pool
                num_threads = get_num_threads()
                if tp is None or tp.num_threads != num_threads:
                    self._thread_pool = ThreadPool(num_threads, device_id=self._device_id)
                tp = self._thread_pool
        return tp

    This mirrors the existing inner lock check and makes get() self-consistent regardless of when _set_num_threads runs.
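The sentinel-object pattern suggested in the first comment above can be demonstrated with a small stand-alone example. All names here (resolve_device, current_gpu) are hypothetical illustrations of the pattern, not DALI API:

```python
# Hypothetical sketch of the sentinel-object pattern; not actual DALI code.
class _CurrentDevice:
    pass


_CURRENT_DEVICE_SENTINEL = _CurrentDevice()


def resolve_device(device_id=_CURRENT_DEVICE_SENTINEL, current_gpu=0):
    # The sentinel means "use the current CUDA device"; None means CPU-only;
    # any integer value (including -1) passes through unchanged.
    if device_id is _CURRENT_DEVICE_SENTINEL:
        return current_gpu
    return device_id
```

Because the sentinel is a private object with a unique identity, no integer a caller could pass can ever collide with it, so the device_id=-1 ambiguity is ruled out by construction.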

Last reviewed commit: "Improve test. Lint."

@mzient force-pushed the ndd_new_thread_pool branch from be9f2df to 9d223c1 on March 13, 2026, 17:06
@mzient force-pushed the ndd_new_thread_pool branch from 90c4b1d to 1665da1 on March 17, 2026, 08:48
@rostan-t rostan-t self-assigned this Mar 17, 2026
@stiepan stiepan removed their assignment Mar 17, 2026

mod = sys.modules[__name__]

if nenv := os.environ.get("DALI_NUM_THREADS", None):
Collaborator commented:

This should be documented.

@mzient (Contributor, Author) replied:

It's not new (just moved here). We should document it, but that seems a bit out of scope.

_global_default_thread_pool = {}


class _DefaultThreadPool:
Collaborator commented:

The name is a bit confusing, as it's not really a thread pool but a wrapper around a thread pool.

@mzient force-pushed the ndd_new_thread_pool branch from 4cb8eb7 to 74e2353 on March 17, 2026, 13:15
@mzient (Contributor, Author) commented Mar 17, 2026

@greptileai the value of CPU_ONLY_DEVICE_ID is -99999, so there's no collision with _CURRENT_DEVICE_ID at -1.
Regarding the supposed race condition: if user code gets the default thread pool at the same time as another thread calls set_num_threads, that's already a race condition, and the current behavior is better than the alternatives, which would require excessive locking.

@greptile-apps bot commented Mar 17, 2026

You're absolutely right on both points:

  1. Device ID values: Confirmed - CPU_ONLY_DEVICE_ID = -99999 (from include/dali/core/common.h:36) vs _CURRENT_DEVICE_ID = -1, so there's no collision.

  2. Race condition: Fair point. Since set_num_threads is explicitly documented as not thread-safe with a warning that concurrent calls result in undefined behavior, the current implementation is reasonable. Adding locks throughout would add overhead to protect against a scenario that's already documented as UB. The race you describe (getting pool while another thread changes thread count) falls into that "user shouldn't do this" category.

The design trade-off makes sense - optimizing for the correct-use case rather than adding defensive overhead for documented misuse.

@dali-automaton (Collaborator):

CI MESSAGE: [46421321]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [46421321]: BUILD FAILED

…f ThreadPool per device.

Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
mzient and others added 12 commits March 19, 2026 10:40
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Add thread-safety test for set_num_threads and default thread pool.

Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
@mzient force-pushed the ndd_new_thread_pool branch from 74e2353 to 522782e on March 19, 2026, 09:41
@dali-automaton (Collaborator):

CI MESSAGE: [46508447]: BUILD STARTED

@dali-automaton (Collaborator):

CI MESSAGE: [46508447]: BUILD PASSED

@mzient (Contributor, Author) commented Mar 19, 2026

@greptileai CPU_ONLY_DEVICE_ID is -99999; there is no collision with _CURRENT_DEVICE_ID.

@greptile-apps bot commented Mar 19, 2026

Got it, thanks for the clarification. With CPU_ONLY_DEVICE_ID set to -99999, there's indeed no risk of collision with _CURRENT_DEVICE_ID. That resolves my concern.

@mzient mzient merged commit 96915c3 into NVIDIA:main Mar 19, 2026
6 checks passed