Add non-cooperative jobs to new ThreadPool #6245
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
|
CI MESSAGE: [45240584]: BUILD STARTED |
Greptile Summary
This PR templatizes
Key changes:
Confidence Score: 4/5
Important Files Changed
Class Diagram
%%{init: {'theme': 'neutral'}}%%
classDiagram
class ThisThreadIdx {
+static int this_thread_idx()
#static thread_local int this_thread_idx_
}
class JobBaseFields_false {
<<empty specialization>>
}
class JobBaseFields_true {
#std::mutex mtx_
#std::condition_variable cv_
}
class JobBase_cooperative {
<<template bool cooperative>>
#DoWait()
#DoNotify()
#std::atomic_int num_pending_tasks_
#std::atomic_bool running_
#int total_tasks_
#bool wait_started_
#bool wait_completed_
#const void* executor_
}
class JobImpl_cooperative {
<<template bool cooperative>>
+AddTask(runnable, priority)
+Run(ThreadPoolBase, wait)
+Run(Executor, wait)
+Wait()
+Discard()
}
class IncrementalJobImpl_cooperative {
<<template bool cooperative>>
+AddTask(runnable)
+Run(ThreadPoolBase, wait)
+Run(Executor, wait)
+Wait()
+Discard()
}
class ThreadPoolBase {
+Init(num_threads, on_thread_start)
+AddTask(f)
+NumThreads()
+GetThreadIds()
+static this_thread_pool()
}
%% Aliases
class Job["Job = JobImpl<false>"]
class CooperativeJob["CooperativeJob = JobImpl<true>"]
class IncrementalJob["IncrementalJob = IncrementalJobImpl<false>"]
class CooperativeIncrementalJob["CooperativeIncrementalJob = IncrementalJobImpl<true>"]
JobBase_cooperative --|> JobBaseFields_false : false specialization
JobBase_cooperative --|> JobBaseFields_true : true specialization
JobImpl_cooperative --|> JobBase_cooperative
IncrementalJobImpl_cooperative --|> JobBase_cooperative
ThreadPoolBase --|> ThisThreadIdx
Job ..> JobImpl_cooperative : alias false
CooperativeJob ..> JobImpl_cooperative : alias true
IncrementalJob ..> IncrementalJobImpl_cooperative : alias false
CooperativeIncrementalJob ..> IncrementalJobImpl_cooperative : alias true
Last reviewed commit: 8155ff5 |
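The layering shown in the diagram can be sketched roughly as below. This is a simplified illustration rather than the PR's actual code: the member names follow the diagram, while the bodies and the `is_cooperative()` helper are hypothetical and exist only to make the example checkable.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Extra fields needed only by the cooperative flavor (names from the diagram).
template <bool cooperative>
struct JobBaseFields {};  // non-cooperative: empty, adds no state

template <>
struct JobBaseFields<true> {
  std::mutex mtx_;
  std::condition_variable cv_;
};

// Shared state; the mutex/condvar are inherited only when cooperative == true.
template <bool cooperative>
class JobBase : public JobBaseFields<cooperative> {
 protected:
  std::atomic_int num_pending_tasks_{0};
  int total_tasks_ = 0;
};

template <bool cooperative>
class JobImpl : public JobBase<cooperative> {
 public:
  // Hypothetical helper, for illustration only.
  static constexpr bool is_cooperative() { return cooperative; }
};

using Job = JobImpl<false>;
using CooperativeJob = JobImpl<true>;
```

Because the `false` specialization is an empty base class, the non-cooperative alias carries no mutex or condition variable at all.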
| * A cooperative job can be waited for from inside the thread pool the job is running in. While | ||
| * the calling thread executes `Wait` on the job, some scheduled task might be picked up from the | ||
| * thread pool and executed int the contex of the calling thread. | ||
| */ | ||
| using CooperativeJob = JobImpl<true>; | ||
|
|
||
| /** Cooperative incremental job. |
Typo in docstring: "int the contex" should be "in the context".
| * A cooperative job can be waited for from inside the thread pool the job is running in. While | |
| * the calling thread executes `Wait` on the job, some scheduled task might be picked up from the | |
| * thread pool and executed int the contex of the calling thread. | |
| */ | |
| using CooperativeJob = JobImpl<true>; | |
| /** Cooperative incremental job. | |
| * A cooperative job can be waited for from inside the thread pool the job is running in. While | |
| * the calling thread executes `Wait` on the job, some scheduled task might be picked up from the | |
| * thread pool and executed in the context of the calling thread. |
Additional Comments (1)
A test should be added to verify that non-cooperative |
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
|
CI MESSAGE: [45243011]: BUILD STARTED |
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
|
@greptileai There are no existing usages, so there's no backward compatibility issue. |
|
Got it - if there are no existing usages of this interface yet, then backward compatibility isn't a concern here. Thanks for clarifying! |
|
CI MESSAGE: [45243643]: BUILD STARTED |
| }); | ||
|
|
||
| job.Run(tp, false); | ||
| EXPECT_THROW(job.Wait(), std::logic_error); |
Test should verify exception message for robustness:
The non-cooperative reentrant test currently only checks the exception type:
EXPECT_THROW(job.Wait(), std::logic_error);
It doesn't verify the actual error message. Adding a message check would make the test more robust against accidentally catching a different std::logic_error from unrelated code.
Recommendation: Use EXPECT_THAT with a matcher to verify the message:
EXPECT_THROW(
{
try {
job.Wait();
FAIL() << "Expected std::logic_error";
} catch (const std::logic_error &e) {
EXPECT_THAT(e.what(), testing::HasSubstr("Cannot run this job from inside the thread pool"));
throw;
}
},
std::logic_error
);
Or wrap the Wait call and check the message separately.
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
| /** Runs the job in the thread pool. | ||
| * | ||
| * More tasks can be added after this call, but they won't start until another call to Run. | ||
| */ | ||
| void Run(ThreadPoolBase &tp, bool wait); |
IncrementalJobImpl::Run() docstring incompletely describes wait parameter behavior
The docstring says "More tasks can be added after this call, but they won't start until another call to Run." This is only true when wait = false. When wait = true, the method internally calls Wait(), which sets wait_started_ = true and blocks subsequent AddTask() calls.
Fix: Clarify the wait parameter's effect:
| /** Runs the job in the thread pool. | |
| * | |
| * More tasks can be added after this call, but they won't start until another call to Run. | |
| */ | |
| void Run(ThreadPoolBase &tp, bool wait); | |
| /** Runs the job in the thread pool. | |
| * | |
| * If `wait` is false, more tasks can be added after this call, but they won't start | |
| * until another call to Run. If `wait` is true, this call also waits for completion, | |
| * after which no more tasks can be added (equivalent to calling Wait()). | |
| */ | |
| void Run(ThreadPoolBase &tp, bool wait); |
| @@ -36,15 +38,19 @@ void JobBase::DoWait() { | |||
| return; | |||
| } | |||
|
|
|||
| if (executor_ == nullptr) | |||
| if (this->executor_ == nullptr) | |||
| throw std::logic_error("This job hasn't been run - cannot wait for it."); | |||
wait_started_ poisoned when executor_ == nullptr
wait_started_ is set to true at line 34 before the executor_ == nullptr guard at line 41. If Wait() is called on a job that has tasks but has not yet been Run(), the exception "This job hasn't been run - cannot wait for it." is correctly thrown — but wait_started_ remains true while wait_completed_ remains false.
A subsequent call sequence like:
Job job;
job.AddTask([]() {});
try { job.Wait(); } catch (...) {} // forgot to Run first
job.Run(tp, false);
job.Wait(); // ← throws "already been waited for" (wrong!)
…silently fails to wait for the job, and then the destructor throws "The job is not empty, but hasn't been discarded or waited for." because wait_completed_ is never set. The error message seen by the caller is also misleading since the job was never successfully waited for.
Fix: Move all precondition checks (the executor_ == nullptr and non-cooperative reentrance checks) above the wait_started_ = true assignment, so the flag is only mutated when the wait is actually going to proceed.
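A minimal sketch of the suggested ordering, using a toy class rather than the real `JobBase`. `ToyJob`, its `Run()` stub and the `wait_completed()` accessor are hypothetical, and the wording of the first exception message is assumed; only the flag names and the "hasn't been run" message come from the snippets above.

```cpp
#include <stdexcept>

// Toy stand-in for the job's wait state; not the real JobBase.
class ToyJob {
 public:
  // Stub: records that the job was submitted (any non-null marker will do).
  void Run() { executor_ = this; }

  void Wait() {
    // Check every precondition first; none of these checks mutate state.
    if (wait_started_)
      throw std::logic_error("This job has already been waited for.");
    if (executor_ == nullptr)
      throw std::logic_error("This job hasn't been run - cannot wait for it.");

    // Only now commit to waiting.
    wait_started_ = true;
    // ... the actual blocking wait would go here ...
    wait_completed_ = true;
  }

  bool wait_completed() const { return wait_completed_; }

 private:
  const void *executor_ = nullptr;
  bool wait_started_ = false;
  bool wait_completed_ = false;
};
```

With this ordering, a premature `Wait()` throws without touching `wait_started_`, so a later `Run()` followed by `Wait()` still succeeds.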
Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
|
!build |
|
CI MESSAGE: [45262286]: BUILD STARTED |
| /** Returns the ids of the threads in the thread pool */ | ||
| auto GetThreadIds() const { | ||
| int n = threads_.size(); | ||
| std::vector<std::thread::id> ids(n); | ||
| for (int i = 0; i < n; i++) | ||
| ids[i] = threads_[i].get_id(); | ||
| return ids; | ||
| } |
New public API GetThreadIds() has no test coverage
GetThreadIds() is a new public method added in this PR but is not exercised by any test in thread_pool_base_test.cc. Given that it reads threads_ without acquiring mtx_, it would also be useful to document the thread-safety contract (i.e. that it is safe to call after Init() returns and before Shutdown() begins).
Consider adding a simple test — e.g. verifying that GetThreadIds() returns exactly NumThreads() unique IDs after Init().
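The check could look roughly like the sketch below. To stay self-contained it runs against a hypothetical `ToyPool` that only mimics `NumThreads()` and `GetThreadIds()`; in the real test the same comparison would presumably be written with gtest assertions against `ThreadPoolBase` after `Init()`.

```cpp
#include <atomic>
#include <cstddef>
#include <set>
#include <thread>
#include <vector>

// Hypothetical toy pool; it only mirrors the two accessors discussed above.
class ToyPool {
 public:
  explicit ToyPool(int n) {
    for (int i = 0; i < n; i++)
      threads_.emplace_back([this] {
        // Keep the worker alive until the pool is destroyed.
        while (!stop_.load()) std::this_thread::yield();
      });
  }
  ~ToyPool() {
    stop_ = true;
    for (auto &t : threads_) t.join();
  }

  int NumThreads() const { return static_cast<int>(threads_.size()); }

  std::vector<std::thread::id> GetThreadIds() const {
    std::vector<std::thread::id> ids;
    for (const auto &t : threads_) ids.push_back(t.get_id());
    return ids;
  }

 private:
  std::atomic<bool> stop_{false};  // declared before threads_ so it is initialized first
  std::vector<std::thread> threads_;
};

// True if the pool reports exactly NumThreads() distinct thread ids.
bool ThreadIdsAreUnique(const ToyPool &tp) {
  auto ids = tp.GetThreadIds();
  std::set<std::thread::id> unique(ids.begin(), ids.end());
  return ids.size() == static_cast<std::size_t>(tp.NumThreads()) &&
         unique.size() == ids.size();
}
```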
|
CI MESSAGE: [45243643]: BUILD FAILED |
|
CI MESSAGE: [45262286]: BUILD PASSED |
|
CI MESSAGE: [45243643]: BUILD PASSED |
| using CooperativeIncrementalJob = IncrementalJobImpl<true>; | ||
|
|
||
| class DLL_PUBLIC ThreadPoolBase { | ||
| class DLL_PUBLIC ThreadPoolBase : public ThisThreadIdx { |
Nitpick:
| class DLL_PUBLIC ThreadPoolBase : public ThisThreadIdx { | |
| class DLL_PUBLIC ThreadPoolBase : private ThisThreadIdx { |
My understanding is that the thread index is only supposed to be accessed through ThisThreadIdx. Is that correct?
Not really - in derived classes it can be used as a member, with an unqualified name. It also makes sense (to some extent) to use ThreadPoolType::this_thread_idx()
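Both uses mentioned in the reply depend on the base staying public. A small sketch, with hypothetical `PoolBase`, `MyPool` and `IdxVia` names and an assumed default index of -1 for threads outside any pool:

```cpp
class ThisThreadIdx {
 public:
  static int this_thread_idx() { return this_thread_idx_; }

 protected:
  static thread_local int this_thread_idx_;
};

// Assumption: -1 marks a thread that doesn't belong to a pool.
thread_local int ThisThreadIdx::this_thread_idx_ = -1;

// Hypothetical stand-in for ThreadPoolBase with public inheritance.
class PoolBase : public ThisThreadIdx {};

class MyPool : public PoolBase {
 public:
  // Unqualified use inside a derived class.
  int CurrentIdx() const { return this_thread_idx(); }
};

// Access through the pool type, as in ThreadPoolType::this_thread_idx().
template <typename ThreadPoolType>
int IdxVia() {
  return ThreadPoolType::this_thread_idx();
}
```

With `private` inheritance in `PoolBase`, neither `MyPool::CurrentIdx()` nor `IdxVia<MyPool>()` would compile, because the base class would be inaccessible from outside `PoolBase`.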
Category:
New feature (non-breaking change which adds functionality)
Refactoring (Redesign of existing code that doesn't affect functionality)
Description:
This PR adds more lightweight non-cooperative jobs to the new thread pool.
Prior to this change, all jobs had a cooperative Wait option - this enabled Wait to be called from within the same thread pool as the one in which the job is running. To avoid deadlocks, the Wait would pick up tasks for execution until the Job is complete. This cannot be implemented (as of C++20) with atomic_wait and required an extra mutex/condvar. While the mutex would remain unused without cooperative wait, the notification of the condition variable was always necessary.
This change adds a flavor of Job that doesn't allow cooperative wait and therefore can be implemented with just atomic_wait.
Additional information:
Affected modules and functionalities:
Key points relevant for the review:
Tests:
Checklist
Documentation
DALI team only
Requirements
REQ IDs: N/A
JIRA TASK: N/A