
New ThreadPool + thread pool facade#6224

Open
mzient wants to merge 10 commits into NVIDIA:main from mzient:ThreadPoolFacade

Conversation


@mzient mzient commented Feb 23, 2026

Category:

Refactoring (Redesign of existing code that doesn't affect functionality)

Description:

This change does the following:

  • it extracts an interface from the old ThreadPool class; the interface now bears the name ThreadPool
  • it renames the old concrete ThreadPool to OldThreadPool
  • it adds NewThreadPool and a ThreadPoolFacade that aggregates a pointer to a NewThreadPool and a Job object
  • it enables the use of either OldThreadPool or (via the facade) NewThreadPool in the new executor
  • it adds an environment variable DALI_USE_NEW_THREAD_POOL, which can be used to enable the new thread pool
  • it extends the qa tests with running python core tests with the new thread pool
  • it extends the decoder perf test - a 2% margin is allowed for the new thread pool vs the old one in the respective decoders (legacy and nvimgcodec)
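The environment-variable gate described above could look roughly like the sketch below. UseNewThreadPoolFromEnv is a hypothetical helper written for illustration, not the actual DALI code, which may parse DALI_USE_NEW_THREAD_POOL differently:

```cpp
#include <cstdlib>
#include <cstring>

// Hypothetical sketch of an env-var gate for selecting the thread pool.
// Assumption: unset, empty, or "0" means "use the old pool"; anything
// else enables the new pool.
inline bool UseNewThreadPoolFromEnv(
    const char *var = "DALI_USE_NEW_THREAD_POOL") {
  const char *value = std::getenv(var);
  if (!value)
    return false;  // variable not set: keep the old thread pool
  return value[0] != '\0' && std::strcmp(value, "0") != 0;
}
```

The result would typically be cached once per process rather than re-read on every pipeline construction.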

Additional information:

Affected modules and functionalities:

Key points relevant for the review:

Tests:

New qa tests script

  • Existing tests apply
  • New tests added
    • Python tests
    • GTests
    • Benchmark
    • Other
  • N/A

Checklist

Documentation

  • Existing documentation applies
  • Documentation updated
    • Docstring
    • Doxygen
    • RST
    • Jupyter
    • Other
  • N/A

DALI team only

Requirements

  • Implements new requirements
  • Affects existing requirements
  • N/A

REQ IDs: N/A

JIRA TASK: N/A

@dali-automaton
Collaborator

CI MESSAGE: [44656169]: BUILD STARTED

@dali-automaton

CI MESSAGE: [44667089]: BUILD STARTED

@dali-automaton

CI MESSAGE: [44667089]: BUILD FAILED

@dali-automaton

CI MESSAGE: [44656169]: BUILD PASSED

@dali-automaton

CI MESSAGE: [44698015]: BUILD STARTED

@dali-automaton

CI MESSAGE: [44698471]: BUILD STARTED

@dali-automaton

CI MESSAGE: [44717682]: BUILD STARTED

@dali-automaton

CI MESSAGE: [44717682]: BUILD FAILED

@dali-automaton

CI MESSAGE: [44719838]: BUILD STARTED

@dali-automaton

CI MESSAGE: [44698471]: BUILD FAILED

@dali-automaton

CI MESSAGE: [44719838]: BUILD FAILED

@dali-automaton

CI MESSAGE: [44719838]: BUILD PASSED

@mzient mzient force-pushed the ThreadPoolFacade branch 2 times, most recently from 2362e72 to 77c81e3 Compare March 5, 2026 08:56
mzient added 5 commits March 5, 2026 14:36
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
@mzient mzient force-pushed the ThreadPoolFacade branch from 77c81e3 to 193e754 Compare March 5, 2026 13:36
mzient and others added 4 commits March 5, 2026 15:47
----
Signed-off-by: Michał Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
@mzient mzient force-pushed the ThreadPoolFacade branch from 193e754 to e485702 Compare March 5, 2026 14:47
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
@mzient mzient changed the title Thread pool facade New ThreadPool + thread pool facade Mar 5, 2026
@dali-automaton

CI MESSAGE: [45440403]: BUILD STARTED

@mzient mzient marked this pull request as ready for review March 5, 2026 16:34

greptile-apps bot commented Mar 5, 2026

Greptile Summary

This PR introduces a thread-pool abstraction layer: it extracts a ThreadPool interface from the old concrete class (now OldThreadPool), adds a NewThreadPool backed by ThreadPoolBase, and provides a ThreadPoolFacade adapter so either implementation can be plugged into the executor at runtime via DALI_USE_NEW_THREAD_POOL. The vast majority of the change is mechanical renaming of ThreadPool to OldThreadPool across benchmarks, operators, and tests.

Key issues found:

  • Critical: device_id_ is never assigned in the NewThreadPool constructor (new_thread_pool.cc lines 26–43). The parameter is modified in-place and used to gate NVML setup, but is never stored into the member field. As a result OnThreadStart always sees device_id_ == nullopt and never creates a DeviceGuard, so new-pool worker threads run without a bound CUDA device.
  • Debug output: A std::cerr diagnostic ("!!! Forced use of NewThreadPool !!!") was left in exec2.cc and will print to stderr for every pipeline instantiation when the env var is set.
  • Performance test script (GraceHopper branch): The else branch in qa/TL1_decoder_perf/test.sh contains copy-paste errors where LOG1 is overwritten by the new-tp run (losing the baseline) and LOG2 is never written while being used as a reference in two perf_check comparisons; LOG2_TP is written twice (once without new-tp, once with). This would make the GraceHopper performance gate silently evaluate against incorrect or empty data.
  • Test script cosmetic issues: PERF_RESULT1_TP/PERF_RESULT2_TP are computed twice (first assignments are dead code), echo labels are wrong (PERF_RESULT1= instead of PERF_RESULT1_TP=), and CLEAN_AND_EXIT does not remove the new log files.

Confidence Score: 2/5

  • Not safe to merge as-is due to a critical CUDA device-binding bug in NewThreadPool and broken GraceHopper performance gate logic.
  • The device_id_ omission in NewThreadPool means every GPU pipeline using the new thread pool will have worker threads without the correct CUDA device set, which will silently corrupt work or crash. The performance test script bugs mean the GraceHopper benchmark gate is not actually enforcing the stated 2% regression limit for the new pool. Both issues need to be fixed before this is safe to enable for any user.
  • dali/pipeline/util/new_thread_pool.cc (critical device_id bug) and qa/TL1_decoder_perf/test.sh (GraceHopper branch log-file errors)

Important Files Changed

Filename Overview
dali/pipeline/util/new_thread_pool.h New file introducing NewThreadPool (wrapping ThreadPoolBase) and ThreadPoolFacade (adapting it to the ThreadPool interface). Header looks structurally sound; the critical device_id_ initialization bug is in the corresponding .cc.
dali/pipeline/util/new_thread_pool.cc Critical bug: device_id_ member is never assigned from the constructor parameter, so DeviceGuard is never created and new-pool threads run without a bound CUDA device.
dali/pipeline/util/thread_pool_interface.h New abstract interface ThreadPool extracted from the old concrete class; cleanly defines AddWork, RunAll, WaitForWork, NumThreads, and GetThreadIds as pure virtuals.
dali/pipeline/util/thread_pool.h Old ThreadPool renamed to OldThreadPool and made to implement the new ThreadPool interface. Work typedef changed from void(int) to void(), with the thread-indexed variant renamed WorkWithThreadIdx. Migration looks correct.
dali/pipeline/util/thread_pool.cc Implementation updated to match the renamed class and split AddWork overloads. Adds this_thread_idx_ assignment in ThreadMain, and a public no-arg WaitForWork() delegating to the private WaitForWork(bool). Straightforward migration.
dali/pipeline/executor/executor2/exec2.cc Adds runtime switching between OldThreadPool and NewThreadPool+ThreadPoolFacade wrappers. Contains a leftover debug std::cerr statement that will spam users' stderr when the new pool is enabled.
qa/TL1_decoder_perf/test.sh Multiple bugs in the GraceHopper branch: LOG1 baseline is overwritten by new-tp run, LOG2 is never written but later used for comparisons, and LOG2_TP is written twice. Also: redundant variable assignments, wrong echo labels, and missing log cleanup in CLEAN_AND_EXIT.
qa/TL0_python-self-test-core-newtp/test.sh New QA test script that re-runs the standard core tests with DALI_USE_NEW_THREAD_POOL=1 by sourcing the existing test. Simple and correct.
dali/pipeline/util/thread_pool_test.cc Tests updated to use OldThreadPool directly; test names and expected thread-name strings updated consistently. No issues.
dali/python/backend_impl.cc Factory returns shared_ptr<OldThreadPool> which implicitly upcasts to shared_ptr<ThreadPool> (the pybind11-registered type). Change is safe.

Class Diagram

%%{init: {'theme': 'neutral'}}%%
classDiagram
    class ThisThreadIdx {
        +this_thread_idx() int
    }
    class ThreadPool {
        <<interface>>
        +AddWork(void() work, int64 priority)
        +AddWork(void(int) work, int64 priority)
        +RunAll(bool wait)
        +WaitForWork()
        +NumThreads() int
        +GetThreadIds() vector~thread_id~
    }
    class ThreadPoolBase {
        +Init(int n, on_start_fn)
        +NumThreads() int
        +GetThreadIds() vector~thread_id~
        +this_thread_idx() int
    }
    class OldThreadPool {
        -vector~thread~ threads_
        -priority_queue work_queue_
        +AddWork(void() work, int64 priority)
        +AddWork(void(int) work, int64 priority)
        +RunAll(bool wait)
        +WaitForWork()
        +NumThreads() int
        +GetThreadIds() vector~thread_id~
    }
    class NewThreadPool {
        -optional~int~ device_id_
        -string name_
        -NvmlInstance nvml_handle_
        +NewThreadPool(int n, optional~int~ device_id, bool affinity, string name)
        -OnThreadStart(int idx, bool affinity) any
    }
    class ThreadPoolFacade {
        -ThreadPoolBase* tp_
        -optional~Job~ job_
        +AddWork(void() work, int64 priority)
        +AddWork(void(int) work, int64 priority)
        +RunAll(bool wait)
        +WaitForWork()
        +NumThreads() int
        +GetThreadIds() vector~thread_id~
    }

    ThisThreadIdx <|-- ThreadPool
    ThreadPool <|-- OldThreadPool
    ThreadPool <|-- ThreadPoolFacade
    ThreadPoolBase <|-- NewThreadPool
    ThreadPoolFacade --> ThreadPoolBase : delegates to (tp_*)
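The adapter relationship in the diagram above can be illustrated with a minimal sketch. All names and member details here are simplified stand-ins for the PR's classes (the real ThreadPoolFacade builds a Job and delegates to a ThreadPoolBase with real worker threads); this only shows the delegation shape:

```cpp
#include <functional>
#include <utility>
#include <vector>

// Stand-in for the extracted ThreadPool interface (subset of methods).
struct ThreadPoolIface {
  virtual ~ThreadPoolIface() = default;
  virtual void AddWork(std::function<void()> work, long priority) = 0;
  virtual void RunAll(bool wait) = 0;
  virtual int NumThreads() const = 0;
};

// Stand-in for ThreadPoolBase; runs work inline for this sketch.
struct SimplePoolBase {
  int NumThreads() const { return 4; }
  void Run(const std::function<void()> &f) { f(); }
};

// Sketch of the facade: collects work, then hands it to the pool.
class ThreadPoolFacadeSketch : public ThreadPoolIface {
 public:
  explicit ThreadPoolFacadeSketch(SimplePoolBase *tp) : tp_(tp) {}
  void AddWork(std::function<void()> work, long /*priority*/) override {
    pending_.push_back(std::move(work));  // a real facade would build a Job
  }
  void RunAll(bool /*wait*/) override {
    for (auto &w : pending_) tp_->Run(w);
    pending_.clear();
  }
  int NumThreads() const override { return tp_->NumThreads(); }

 private:
  SimplePoolBase *tp_;  // non-owning, mirrors the aggregated pointer
  std::vector<std::function<void()>> pending_;
};
```

Because the facade implements the same interface as OldThreadPool, callers such as the executor need not know which backend is active.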

Comments Outside Diff (1)

  1. General comment

    CLEAN_AND_EXIT does not remove LOG1_TP / LOG2_TP

    The two new log files are never deleted on exit, leaving temporary files behind after every run:

Last reviewed commit: 5799845

Comment on lines +26 to +43
    NewThreadPool::NewThreadPool(
        int num_threads,
        std::optional<int> device_id,
        bool set_affinity,
        std::string name)
        : name_(name) {
      if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID)
        device_id = std::nullopt;
    #if NVML_ENABLED
      // We use NVML only for setting thread affinity
      if (device_id.has_value() && set_affinity) {
        nvml_handle_ = nvml::NvmlInstance::CreateNvmlInstance();
      }
    #endif
      Init(num_threads, [=, this](int thread_idx) {
        return OnThreadStart(thread_idx, set_affinity);
      });
    }

device_id_ never assigned — CUDA device context silently lost

The constructor parameter device_id is modified (nullified for CPU_ONLY_DEVICE_ID) and used to conditionally initialize nvml_handle_, but it is never stored into the member device_id_. Because device_id_ defaults to std::nullopt, OnThreadStart at line 49 will never create the DeviceGuard, meaning threads spawned by NewThreadPool will never bind to the requested CUDA device.

NewThreadPool::NewThreadPool(
      int num_threads,
      std::optional<int> device_id,
      bool set_affinity,
      std::string name)
      : name_(name) {
  if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID)
    device_id = std::nullopt;
  device_id_ = device_id;   // ← add this line
#if NVML_ENABLED
  ...

Without this assignment, any GPU-bound pipeline using the new thread pool will run operator CPU threads without a current CUDA context, which will cause silent failures or hard crashes on any CUDA API call made from those threads.
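The bug pattern and its one-line fix can be reduced to a minimal sketch. PoolBuggy and PoolFixed are illustrative stand-ins, and the CPU_ONLY_DEVICE_ID value here is a placeholder, not DALI's actual constant:

```cpp
#include <optional>

constexpr int CPU_ONLY_DEVICE_ID = -99999;  // placeholder for this sketch

// Reproduces the bug: the constructor normalizes the parameter but never
// stores it, so the member stays at its default std::nullopt.
struct PoolBuggy {
  std::optional<int> device_id_;
  explicit PoolBuggy(std::optional<int> device_id) {
    if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID)
      device_id = std::nullopt;
    // missing: device_id_ = device_id;
  }
};

// With the suggested assignment, the member reflects the request.
struct PoolFixed {
  std::optional<int> device_id_;
  explicit PoolFixed(std::optional<int> device_id) {
    if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID)
      device_id = std::nullopt;
    device_id_ = device_id;  // the one-line fix
  }
};
```

In the buggy variant, any later check such as `if (device_id_) DeviceGuard guard(*device_id_);` is dead code, which matches the failure mode described above.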

Comment on lines +353 to +355
    if (UseNewThreadPool()) {
      std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
      if (graph_info_.num_cpu > 0) {

Debug std::cerr left in production code

This diagnostic print was presumably added during development but should not be shipped:

Suggested change:

    // before
    if (UseNewThreadPool()) {
      std::cerr << "\n!!! Forced use of NewThreadPool !!!" << std::endl;
      if (graph_info_.num_cpu > 0) {

    // after
    if (UseNewThreadPool()) {
      if (graph_info_.num_cpu > 0) {

Remove the std::cerr line entirely — it will spam every user's stderr whenever they set DALI_USE_NEW_THREAD_POOL=1.

Comment on lines +44 to +46
taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_TP}
DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 | tee ${LOG1}
DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py --width_hint 6000 --height_hint 6000 -b 408 -d 0 -g gpu -w 100 -t 100000 -i ${DALI_EXTRA_PATH}/db/single/jpeg -p rn50 -j 72 --hw_load 0.11 --experimental_decoder | tee ${LOG2_TP}

GraceHopper branch: log-file naming errors corrupt baseline data

In the else (GraceHopper) branch, three separate bugs are introduced together:

  1. Line 44 — the non-new-tp experimental_decoder run writes to ${LOG2_TP} instead of ${LOG2}.
  2. Line 45 — the new-tp legacy run writes to ${LOG1}, overwriting the baseline measurement captured on the line above it.
  3. Line 46 — the new-tp experimental_decoder run also writes to ${LOG2_TP}, overwriting the result from line 44.

End state: LOG1 holds new-tp data (not baseline), LOG2 is never written, and LOG2_TP holds new-tp data (not old-tp). Downstream, PERF_RESULT3 (perf_check "${LOG2}" …) and PERF_RESULT2_TP (perf_check "${LOG2_TP}" "$(extract_perf "${LOG2}")" 2) will both silently evaluate against stale/empty values, making the performance gate meaningless for GraceHopper.

The Hopper branch (lines 29–34) follows the correct pattern and should be mirrored:

    taskset --cpu-list 0-71 python hw_decoder_bench.py ... | tee ${LOG1}
    taskset --cpu-list 0-71 python hw_decoder_bench.py ... --experimental_decoder | tee ${LOG2}
    DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py ... | tee ${LOG1_TP}
    DALI_USE_NEW_THREAD_POOL=1 taskset --cpu-list 0-71 python hw_decoder_bench.py ... --experimental_decoder | tee ${LOG2_TP}

Comment on lines +87 to +88
PERF_RESULT1_TP=$(perf_check "${LOG1_TP}" "$MIN_PERF")
PERF_RESULT2_TP=$(perf_check "${LOG2_TP}" "$MIN_PERF2")

Redundant first assignments to PERF_RESULT1_TP / PERF_RESULT2_TP

PERF_RESULT1_TP and PERF_RESULT2_TP are assigned here (absolute-minimum check) and then unconditionally reassigned to relative-comparison results on lines 93–94. The values computed on lines 87–88 are never read and should be removed to avoid confusion.

Suggested change (delete the two redundant assignments, keep the _NDD line):

    - PERF_RESULT1_TP=$(perf_check "${LOG1_TP}" "$MIN_PERF")
    - PERF_RESULT2_TP=$(perf_check "${LOG2_TP}" "$MIN_PERF2")
      PERF_RESULT1_NDD=$(perf_check "${LOG1_NDD}" "$MIN_PERF_NDD")

Comment on lines +99 to +100
echo "PERF_RESULT1=${PERF_RESULT1_TP}"
echo "PERF_RESULT2=${PERF_RESULT2_TP}"

Wrong echo labels for new-thread-pool results

Both lines print PERF_RESULT1= and PERF_RESULT2= rather than PERF_RESULT1_TP= and PERF_RESULT2_TP=. The CI log will show duplicate keys, making it impossible to distinguish old- vs new-thread-pool results at a glance.

Suggested change:

    - echo "PERF_RESULT1=${PERF_RESULT1_TP}"
    - echo "PERF_RESULT2=${PERF_RESULT2_TP}"
    + echo "PERF_RESULT1_TP=${PERF_RESULT1_TP}"
    + echo "PERF_RESULT2_TP=${PERF_RESULT2_TP}"
