From 43c49afc54161cf2074632d7b76998f77aee4223 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 14 May 2026 12:45:04 +0000 Subject: [PATCH 01/10] Fix the flaky immediate shutdown. The test was trying to ensure that when shutdown is called, it happens "immediately", and not wait until the timeout. The problem in our current code is that we don't know whether another thread is in "spin" during the shutdown. If it is, then there is some possibility that we deleted the shutdown guard condition *before* the spin built the waitset, in which case we would spin until the timeout, and thus fail the test. The fix is to mark when a thread is in spin, and then have shutdown wait for the spin to complete before destroying resources. If spin is waiting, then it will be woken by the guard condition immediately, and the shutdown will proceed quickly. If the spin is stuck in a long-running user callback, then we still need a timeout because we don't want shutdown to take forever. While I was in here, I also enabled this test for the MultiThreadedExecutor since it should succeed for that now as well. Signed-off-by: Chris Lalancette --- rclpy/rclpy/executors.py | 35 +++++++++++++++++++++++------------ rclpy/test/test_executor.py | 3 +-- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index d03d770d6..5501cefa7 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -19,9 +19,7 @@ from functools import partial import inspect import os -from threading import Condition -from threading import Lock -from threading import RLock +import threading import time from types import TracebackType from typing import Any @@ -89,7 +87,7 @@ class _WorkTracker: def __init__(self) -> None: # Number of tasks that are being executed self._num_work_executing = 0 - self._work_condition = Condition() + self._work_condition = threading.Condition() def __enter__(self) -> None: """Increment the amount of executing work by 1.""" @@ -212,12 +210,12 @@ def __init__(self, *, context: Optional[Context] = None) -> None: super().__init__() self._context = get_default_context() if context is None else context self._nodes: Set[Node] = set() - self._nodes_lock = RLock() + self._nodes_lock = threading.RLock() # all tasks that are not complete or canceled self._pending_tasks: Dict[Task[Any], TaskData] = {} # tasks that are ready to execute self._ready_tasks: Deque[Task[Any]] = deque() - self._tasks_lock = Lock() + self._tasks_lock = threading.Lock() # This is triggered when wait_for_ready_callbacks should rebuild the wait list self._guard: Optional[GuardCondition] = GuardCondition( callback=None, callback_group=None, context=self._context) @@ -225,7 +223,7 @@ def __init__(self, *, context: Optional[Context] = None) -> None: self._is_shutdown = False self._work_tracker = _WorkTracker() # Protect against shutdown() being called in parallel in two threads - self._shutdown_lock = Lock() + self._shutdown_lock = threading.Lock() # State for wait_for_ready_callbacks to reuse generator self._cb_iter: Optional[YieldedCallback] = None self._last_args: Optional[tuple[object, ...]] = None @@ -238,24 +236,28 @@ def __init__(self, *, context: Optional[Context] = None) -> None: # True when the executor is spinning self._is_spinning = False # Protects access to _is_spinning - self._is_spinning_lock = Lock() + self._is_spinning_cond = threading.Condition() + self._spinning_thread: Optional[threading.Thread] = None def _enter_spin(self) -> None: """Mark the executor as spinning and prevent concurrent spins.""" - with self._is_spinning_lock: + with self._is_spinning_cond: if self._is_spinning: raise RuntimeError('Executor is already spinning') self._is_spinning = True + self._spinning_thread = threading.current_thread() def _exit_spin(self) -> None: """Clear the spinning flag.""" - with self._is_spinning_lock: + with self._is_spinning_cond: self._is_spinning = False + self._spinning_thread = None + self._is_spinning_cond.notify_all() @property def is_spinning(self) -> bool: """Return whether the executor is currently spinning.""" - with self._is_spinning_lock: + with self._is_spinning_cond: return self._is_spinning @property @@ -328,6 +330,15 @@ def shutdown(self, timeout_sec: Optional[float] = None) -> bool: with self._nodes_lock: self._nodes = set() + with self._is_spinning_cond: + if self._spinning_thread is not threading.current_thread(): + # Cap the wait to avoid hanging the process if a spinner + # is stuck in a callback that never returns. + wait_timeout = timeout_sec if ( + timeout_sec is not None and timeout_sec > 0) else 5.0 + self._is_spinning_cond.wait_for( + lambda: not self._is_spinning, timeout=wait_timeout) + with self._shutdown_lock: if self._guard: self._guard.destroy() @@ -1087,7 +1098,7 @@ def __init__( 'Use the SingleThreadedExecutor instead.') self._futures: List[Future[Any]] = [] self._executor = ThreadPoolExecutor(num_threads) - self._futures_lock = Lock() + self._futures_lock = threading.Lock() def _spin_once_impl( self, diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 18472109d..54a3313b3 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -80,10 +80,9 @@ def test_single_threaded_executor_executes(self) -> None: finally: executor.shutdown() - @unittest.skip('Flaky on CI - see issue #1648') def test_executor_immediate_shutdown(self) -> None: self.assertIsNotNone(self.node.handle) - for cls in [SingleThreadedExecutor, EventsExecutor]: + for cls in [SingleThreadedExecutor, MultiThreadedExecutor, EventsExecutor]: with self.subTest(cls=cls): executor = cls(context=self.context) try: From 6ca0ddb66d4c57f0a60abc3a4898cc39101a262a Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 14 May 2026 15:41:14 +0000 Subject: [PATCH 02/10] Fix some dead code in executor shutdown. Previous to this commit, the shutdown had code like this: with self._shutdown_lock: if not self._is_shutdown: self._is_shutdown = True ... if not self._is_shutdown: if not self._work_tracker.wait(timeout_sec): return False But that doesn't make much sense; that second "if not self._is_shutdown" is always False, since we set self._is_shutdown to True right above. The easy fix here is to save off whether we were the ones doing the shutdown, and then only do the wait if that is true. But that then deadlocks if this shutdown is being called from within a callback. Since that is behavior we want to preserve, we need more fixes. The fixes in this PR track when a thread enters and exits doing work. We then modify "wait" to not wait for the thread that is ourselves if we are in a callback. We go a bit further and remove the second "if not self._is_shutdown" check as well, and unconditionally wait on work to be completed. This is to support the case where the user calls shutdown with a timeout, and the timeout expires before the work finishes. In that case, the user could call shutdown again and have it succeed. The result is that all of this should now work together, not deadlock, and actually do the waiting. Signed-off-by: Chris Lalancette --- rclpy/rclpy/executors.py | 40 ++++++++++++++++++++++---- rclpy/test/test_executor.py | 56 +++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 5501cefa7..1e2a511cf 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -88,33 +88,51 @@ def __init__(self) -> None: # Number of tasks that are being executed self._num_work_executing = 0 self._work_condition = threading.Condition() + # Per-thread reentrant counter of in-flight work, so callers can + # tell whether the calling thread is itself currently inside + # __enter__/__exit__ — used by Executor.shutdown() to avoid + # self-deadlock when shutdown is invoked from inside a callback. + self._executing_thread_counts: Dict[threading.Thread, int] = {} def __enter__(self) -> None: """Increment the amount of executing work by 1.""" with self._work_condition: self._num_work_executing += 1 + t = threading.current_thread() + self._executing_thread_counts[t] = ( + self._executing_thread_counts.get(t, 0) + 1) def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exctb: Optional[TracebackType]) -> None: """Decrement the amount of work executing by 1.""" with self._work_condition: self._num_work_executing -= 1 + t = threading.current_thread() + count = self._executing_thread_counts[t] - 1 + if count == 0: + del self._executing_thread_counts[t] + else: + self._executing_thread_counts[t] = count self._work_condition.notify_all() def wait(self, timeout_sec: Optional[float] = None) -> bool: """ Wait until all work completes. + Work being executed by the calling thread is excluded from the count, + since that work is necessarily blocked on this call returning. + :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None :rtype: bool True if all work completed """ if timeout_sec is not None and timeout_sec < 0.0: timeout_sec = None - # Wait for all work to complete with self._work_condition: if not self._work_condition.wait_for( - lambda: self._num_work_executing == 0, timeout_sec): + lambda: self._num_work_executing == self._executing_thread_counts.get( + threading.current_thread(), 0), + timeout_sec): return False return True @@ -317,14 +335,24 @@ def shutdown(self, timeout_sec: Optional[float] = None) -> bool: timeout expires before all outstanding work is done. """ with self._shutdown_lock: - if not self._is_shutdown: + initiated_shutdown = not self._is_shutdown + if initiated_shutdown: self._is_shutdown = True # Tell executor it's been shut down if self._guard: self._guard.trigger() - if not self._is_shutdown: - if not self._work_tracker.wait(timeout_sec): - return False + # Wait for any in-flight callbacks on OTHER threads to drain. Done + # unconditionally (not just for the initiating call) so that: + # - concurrent shutdown() calls don't race past the wait and start + # destroying state while callbacks are still running, and + # - a caller who got False back from a timed-out shutdown() can + # simply call shutdown() again (with a longer or no timeout) and + # have the second call actually wait + finish cleanup. + # _work_tracker.wait excludes work being executed by the calling + # thread, so this is safe from inside a callback — it will not + # self-deadlock. + if not self._work_tracker.wait(timeout_sec): + return False # Clean up stuff that won't be used anymore with self._nodes_lock: diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 54a3313b3..4e7a83731 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -787,6 +787,62 @@ def timer_callback() -> None: self.assertTrue(shutdown_event.wait(120)) self.node.destroy_timer(tmr) + def test_shutdown_timeout_then_retry(self) -> None: + """ + Verify that every shutdown() call waits for callbacks to drain. + + A shutdown() call that times out while a callback is in flight + must leave the executor in a state where calling shutdown() again + will also wait for the callback. The wait must NOT be gated on + "this call initiated shutdown" -- otherwise a caller who got + False back from a timed-out shutdown() and retries will skip the + wait entirely on the second call and race cleanup against the + still-running callback. + """ + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + + callback_started = threading.Event() + callback_should_finish = threading.Event() + + def long_callback() -> None: + callback_started.set() + # Bound the wait so a broken test fails rather than hangs. + callback_should_finish.wait(timeout=10) + + tmr = self.node.create_timer(0.1, long_callback) + executor.add_node(self.node) + spin_thread = threading.Thread(target=executor.spin, daemon=True) + spin_thread.start() + + try: + # Wait for the callback to be running so the work_tracker count + # is guaranteed to be non-zero when shutdown's wait runs. + self.assertTrue(callback_started.wait(timeout=5)) + + # First shutdown: this is the one that flips _is_shutdown to + # True. Times out because the callback is still in flight. + self.assertFalse(executor.shutdown(timeout_sec=0.1)) + + # Second shutdown: _is_shutdown is already True. The callback + # is STILL in flight. The wait must run again (regardless of + # who initiated) and time out -- if it incorrectly skipped + # the wait, this would return True and race cleanup against + # the running callback. + self.assertFalse(executor.shutdown(timeout_sec=0.1)) + + # Release the callback. + callback_should_finish.set() + + # Final shutdown: callbacks have drained (or will momentarily). + # The wait should now succeed and cleanup should complete. + self.assertTrue(executor.shutdown(timeout_sec=5)) + finally: + # Guard rails in case an assertion above interrupts the flow. + callback_should_finish.set() + spin_thread.join(timeout=5) + self.node.destroy_timer(tmr) + def test_context_manager(self) -> None: self.assertIsNotNone(self.node.handle) From 396f00fc4dc9c9b5d4d72607342a693f7f3a667d Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 14 May 2026 17:49:40 +0000 Subject: [PATCH 03/10] Make executor shutdown honor the timeout_sec. Previous to this change, internally in shutdown() timeout_sec was set to 5 seconds in all cases except where the user passed in a positive number. Even then it wasn't completely honored, and since there is more than one wait in there, it could take multiple times the timeout_sec. This change fixes all of that by honoring the documentation: None - wait forever negative - wait forever 0 - don't wait at all positive - wait for this amount of time That last one also only cumulatively waits as well. Signed-off-by: Chris Lalancette --- rclpy/rclpy/executors.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 1e2a511cf..723ee8388 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -341,6 +341,20 @@ def shutdown(self, timeout_sec: Optional[float] = None) -> bool: # Tell executor it's been shut down if self._guard: self._guard.trigger() + # The timeout applies to the whole shutdown operation — both the + # callback drain and the spinner exit — not to each wait + # individually. Convert it into a deadline once; each wait below + # gets only the time remaining against that deadline. + if timeout_sec is None or timeout_sec < 0: + deadline: Optional[float] = None # block forever + else: + deadline = time.monotonic() + timeout_sec + + def remaining_timeout() -> Optional[float]: + if deadline is None: + return None + return max(0.0, deadline - time.monotonic()) + # Wait for any in-flight callbacks on OTHER threads to drain. Done # unconditionally (not just for the initiating call) so that: # - concurrent shutdown() calls don't race past the wait and start @@ -351,7 +365,7 @@ def shutdown(self, timeout_sec: Optional[float] = None) -> bool: # _work_tracker.wait excludes work being executed by the calling # thread, so this is safe from inside a callback — it will not # self-deadlock. - if not self._work_tracker.wait(timeout_sec): + if not self._work_tracker.wait(remaining_timeout()): return False # Clean up stuff that won't be used anymore @@ -360,12 +374,15 @@ def shutdown(self, timeout_sec: Optional[float] = None) -> bool: with self._is_spinning_cond: if self._spinning_thread is not threading.current_thread(): - # Cap the wait to avoid hanging the process if a spinner - # is stuck in a callback that never returns. - wait_timeout = timeout_sec if ( - timeout_sec is not None and timeout_sec > 0) else 5.0 - self._is_spinning_cond.wait_for( - lambda: not self._is_spinning, timeout=wait_timeout) + # Wait for the spin thread to acknowledge shutdown and + # exit before we destroy the guards (which the spinner + # may still be holding in its wait_set). If the wait + # times out, return False per the contract — don't + # destroy resources that the spinner might still touch. + if not self._is_spinning_cond.wait_for( + lambda: not self._is_spinning, + timeout=remaining_timeout()): + return False with self._shutdown_lock: if self._guard: From dec34908f1fcbb89169a467648c114f117137ad7 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 20 May 2026 09:53:37 -0400 Subject: [PATCH 04/10] Fix concurrent shutdown deadlock from worker callbacks. When two callbacks running on different MultiThreadedExecutor worker threads both call executor.shutdown() concurrently, both calls land in _work_tracker.wait(). The previous predicate only excluded the calling thread's own in-flight work, so each call counted the other caller's still-running callback as work it had to wait for. Since both callbacks are blocked in shutdown(), neither can finish, and both waits block until the timeout expires. This commit teaches _WorkTracker to track which threads are currently parked in wait() and exclude any in-flight work owned by those threads from every waiter's drain check. Once a thread is inside wait() it cannot be making progress on its callback, so counting that work as pending only ever causes spurious blocking. While in there, simplify the bookkeeping: _num_work_executing was a cached sum of _executing_thread_counts.values(), but the drain check doesn't need a sum -- only whether each executing thread is also a waiter. The predicate is now a single subset check, and the redundant counter is gone. Adds test_concurrent_shutdown_from_two_callbacks as a regression test. Pre-fix, both shutdowns time out and return False. Signed-off-by: Chris Lalancette Co-Authored-By: Claude Opus 4.7 (1M context) --- rclpy/rclpy/executors.py | 53 ++++++++++++++++++++---------- rclpy/test/test_executor.py | 65 +++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 17 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 723ee8388..db34d44a3 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -85,28 +85,30 @@ class _WorkTracker: """Track the amount of work that is in progress.""" def __init__(self) -> None: - # Number of tasks that are being executed - self._num_work_executing = 0 self._work_condition = threading.Condition() - # Per-thread reentrant counter of in-flight work, so callers can - # tell whether the calling thread is itself currently inside - # __enter__/__exit__ — used by Executor.shutdown() to avoid - # self-deadlock when shutdown is invoked from inside a callback. + # Per-thread reentrant counter of in-flight work. A thread has + # an entry iff it is currently inside __enter__/__exit__; the + # value is the reentrance depth. The set of keys is exactly the + # set of threads currently running a callback. self._executing_thread_counts: Dict[threading.Thread, int] = {} + # Threads currently parked inside wait(). A thread in this set + # cannot be making progress on its own callback, so its + # in-flight work (if any) should not block other waiters -- + # otherwise two callbacks on different worker threads both + # calling Executor.shutdown() would deadlock on each other. + self._waiting_threads: Set[threading.Thread] = set() def __enter__(self) -> None: - """Increment the amount of executing work by 1.""" + """Mark the calling thread as executing a callback.""" with self._work_condition: - self._num_work_executing += 1 t = threading.current_thread() self._executing_thread_counts[t] = ( self._executing_thread_counts.get(t, 0) + 1) def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exctb: Optional[TracebackType]) -> None: - """Decrement the amount of work executing by 1.""" + """Mark the calling thread as having finished a callback.""" with self._work_condition: - self._num_work_executing -= 1 t = threading.current_thread() count = self._executing_thread_counts[t] - 1 if count == 0: @@ -119,8 +121,12 @@ def wait(self, timeout_sec: Optional[float] = None) -> bool: """ Wait until all work completes. - Work being executed by the calling thread is excluded from the count, - since that work is necessarily blocked on this call returning. + Work being executed by the calling thread is excluded from the wait, + since that work is necessarily blocked on this call returning. Work + being executed by any other thread that is itself currently parked in + wait() is also excluded, so concurrent shutdown() calls from inside + callbacks on different worker threads don't deadlock waiting for + each other. :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None @@ -128,12 +134,25 @@ def wait(self, timeout_sec: Optional[float] = None) -> bool: """ if timeout_sec is not None and timeout_sec < 0.0: timeout_sec = None + current = threading.current_thread() + + def other_work_drained() -> bool: + # True once every thread with in-flight work is itself parked + # here in wait() and thus making no further progress on its + # callback. Equivalently: no thread is actively executing. + return self._executing_thread_counts.keys() <= self._waiting_threads + with self._work_condition: - if not self._work_condition.wait_for( - lambda: self._num_work_executing == self._executing_thread_counts.get( - threading.current_thread(), 0), - timeout_sec): - return False + self._waiting_threads.add(current) + # A new waiter may have just satisfied an existing waiter's + # condition (its in-flight work is now excluded). + self._work_condition.notify_all() + try: + if not self._work_condition.wait_for(other_work_drained, timeout_sec): + return False + finally: + self._waiting_threads.discard(current) + self._work_condition.notify_all() return True diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 4e7a83731..b67e69967 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -17,6 +17,7 @@ import threading import time from typing import Generator +from typing import List from typing import Optional from typing import Protocol from typing import Set @@ -24,6 +25,7 @@ import warnings import rclpy +from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.context import Context from rclpy.executors import Executor @@ -843,6 +845,69 @@ def long_callback() -> None: spin_thread.join(timeout=5) self.node.destroy_timer(tmr) + def test_concurrent_shutdown_from_two_callbacks(self) -> None: + """ + Two callbacks on different MultiThreadedExecutor workers calling + shutdown() concurrently must not deadlock on each other. + + _work_tracker.wait excludes the calling thread's own in-flight work + from its predicate, but if it counted other waiters' in-flight work + as still pending, two callbacks both inside shutdown() would each + wait for the other's callback to finish -- which can't happen, + because both callbacks are blocked in shutdown(). + """ + self.assertIsNotNone(self.node.handle) + executor = MultiThreadedExecutor(num_threads=2, context=self.context) + + # Distinct callback groups so the two timers can be dispatched to + # separate worker threads concurrently. (The default callback + # group is MutuallyExclusive at the node level.) + cb_group_a = MutuallyExclusiveCallbackGroup() + cb_group_b = MutuallyExclusiveCallbackGroup() + + # Use a barrier to make both callbacks reach shutdown() at the + # same time, so they are both inside _work_tracker.wait + # simultaneously -- the scenario the regression guards against. + barrier = threading.Barrier(2) + results: List[bool] = [] + results_lock = threading.Lock() + + def shutdown_from_callback() -> None: + try: + barrier.wait(timeout=5) + except threading.BrokenBarrierError: + return + ok = executor.shutdown(timeout_sec=5, wait_for_threads=False) + with results_lock: + results.append(ok) + + tmr_a = self.node.create_timer( + 0.05, shutdown_from_callback, callback_group=cb_group_a) + tmr_b = self.node.create_timer( + 0.05, shutdown_from_callback, callback_group=cb_group_b) + + executor.add_node(self.node) + spin_thread = threading.Thread(target=executor.spin, daemon=True) + spin_thread.start() + + try: + # Each shutdown() has an internal 5s timeout; allow both + # callbacks plus the spinner to wind down with margin. + spin_thread.join(timeout=15) + self.assertFalse( + spin_thread.is_alive(), + 'spin thread did not exit -- concurrent shutdown deadlocked') + with results_lock: + self.assertEqual(len(results), 2) + self.assertTrue( + all(results), + f'shutdown() returned False (timed out): {results}') + finally: + barrier.abort() + spin_thread.join(timeout=5) + self.node.destroy_timer(tmr_a) + self.node.destroy_timer(tmr_b) + def test_context_manager(self) -> None: self.assertIsNotNone(self.node.handle) From 3bc975748a6569249d453ad817d8170f89be31c7 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 20 May 2026 09:54:09 -0400 Subject: [PATCH 05/10] Fix MultiThreadedExecutor.shutdown() called from worker callback. Calling executor.shutdown() from inside a callback running on one of MultiThreadedExecutor's own worker threads previously raised RuntimeError("cannot join current thread"). The wrapper called self._executor.shutdown(wait=wait_for_threads), which (when wait_for_threads is True, the default) makes concurrent.futures.ThreadPoolExecutor join every worker via t.join(). If the caller is itself one of those workers, that loop hits t.join() on the current thread and Python refuses. Do the join loop ourselves instead. Mark the pool shut down with wait=False, then iterate self._executor._threads and skip the current thread. The current worker can't be waited on from itself anyway -- the rest of the callback will finish after shutdown() returns and the worker will exit naturally. Adds test_shutdown_from_multithreaded_executor_callback as a regression test. Pre-fix, shutdown() raises out of the callback. Signed-off-by: Chris Lalancette Co-Authored-By: Claude Opus 4.7 (1M context) --- rclpy/rclpy/executors.py | 19 +++++++++++++++-- rclpy/test/test_executor.py | 41 +++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index db34d44a3..e80520c80 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -1232,10 +1232,25 @@ def shutdown( :param timeout_sec: Seconds to wait. Block forever if ``None`` or negative. Don't wait if 0. :param wait_for_threads: If true, this function will block until all executor threads - have joined. + have joined. When shutdown() is called from inside a callback running on one of + this executor's worker threads, the *current* thread is necessarily excluded from + that join (Python cannot join a thread with itself) -- the rest of the callback + will finish after this returns and the worker will exit naturally. :return: ``True`` if all outstanding callbacks finished executing, or ``False`` if the timeout expires before all outstanding work is done. """ success: bool = super().shutdown(timeout_sec) - self._executor.shutdown(wait=wait_for_threads) + # Always tell the pool to shut down without waiting: if shutdown() + # was called from inside a callback running on one of these + # workers, letting ThreadPoolExecutor.shutdown(wait=True) join the + # current thread would raise RuntimeError. We do the joins below + # ourselves so we can skip the current thread. + self._executor.shutdown(wait=False) + if wait_for_threads: + current = threading.current_thread() + # Snapshot before iterating; _threads is stable post-shutdown + # (no new workers are spawned) but we copy defensively. + for t in list(self._executor._threads): + if t is not current: + t.join() return success diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index b67e69967..73deb2c44 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -845,6 +845,47 @@ def long_callback() -> None: spin_thread.join(timeout=5) self.node.destroy_timer(tmr) + def test_shutdown_from_multithreaded_executor_callback(self) -> None: + """ + MultiThreadedExecutor.shutdown() called from inside a callback + running on one of its own worker threads must not raise. + + ThreadPoolExecutor.shutdown(wait=True) joins every worker; the + worker that's currently executing the callback would otherwise + be joined to itself, raising RuntimeError("cannot join current + thread"). The wrapper must skip the current thread. + """ + self.assertIsNotNone(self.node.handle) + executor = MultiThreadedExecutor(num_threads=2, context=self.context) + + shutdown_returned = threading.Event() + shutdown_error: List[BaseException] = [] + + def timer_callback() -> None: + try: + # Default wait_for_threads=True is what triggers the bug. + executor.shutdown(timeout_sec=5) + except BaseException as e: + shutdown_error.append(e) + finally: + shutdown_returned.set() + + tmr = self.node.create_timer(0.1, timer_callback) + executor.add_node(self.node) + spin_thread = threading.Thread(target=executor.spin, daemon=True) + spin_thread.start() + + try: + self.assertTrue( + shutdown_returned.wait(timeout=15), + 'shutdown() never returned from inside the callback') + self.assertFalse( + shutdown_error, + f'shutdown() raised: {shutdown_error!r}') + finally: + spin_thread.join(timeout=5) + self.node.destroy_timer(tmr) + def test_concurrent_shutdown_from_two_callbacks(self) -> None: """ Two callbacks on different MultiThreadedExecutor workers calling From 5fbb0928e539d83302c166d8331b701b6bed1ad7 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 20 May 2026 10:21:23 -0400 Subject: [PATCH 06/10] Keep concurrent shutdown waiters in the drain set until the callback ends. The previous _WorkTracker fix added the calling thread to a _waiting_threads set on entry to wait() and removed it on exit. That ensured two concurrent waiters wouldn't deadlock on each other's still-running callback, but it created a different race: the second waiter's predicate becomes True the instant it adds itself, returns immediately, and its finally block removes it again -- which flips the first waiter's predicate back to False before the first waiter has had a chance to re-evaluate after the notify. The first waiter then ends up waiting until the second waiter's __exit__ runs (i.e., until the second waiter's callback ends, after the rest of its shutdown completes), serializing the two shutdowns instead of letting them proceed in parallel. Fix it by holding the waiter membership stable for as long as the thread's callback is still in flight: wait() does not remove the entry in its finally block when the thread is still inside the work_tracker; __exit__ removes the entry when the callback ends. External callers (no in-flight callback, never call __enter__) still get removed in wait()'s finally, since no __exit__ will run for them. A regression in test_concurrent_shutdown_from_two_callbacks exposed the serialization: the test gated on spin_thread.join(), but the spinner exits as soon as _is_shutdown is set, which can happen before either callback has finished its shutdown call -- so the test read results too early and saw only the faster callback's entry. Switch the test to wait on an explicit event signaled once both callbacks have appended their result. Signed-off-by: Chris Lalancette Co-Authored-By: Claude Opus 4.7 (1M context) --- rclpy/rclpy/executors.py | 53 +++++++++++++++++++++++++------------ rclpy/test/test_executor.py | 50 +++++++++------------------------- 2 files changed, 48 insertions(+), 55 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index e80520c80..b4ba6d90a 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -88,14 +88,19 @@ def __init__(self) -> None: self._work_condition = threading.Condition() # Per-thread reentrant counter of in-flight work. A thread has # an entry iff it is currently inside __enter__/__exit__; the - # value is the reentrance depth. The set of keys is exactly the + # value is the reentrance depth. The set of keys is the # set of threads currently running a callback. self._executing_thread_counts: Dict[threading.Thread, int] = {} - # Threads currently parked inside wait(). A thread in this set - # cannot be making progress on its own callback, so its - # in-flight work (if any) should not block other waiters -- + # Threads whose in-flight callback (if any) is committed to + # finishing rather than making further progress -- i.e., the + # thread is parked in wait() or has already returned from + # wait() and is finishing the surrounding shutdown. Their + # callback work should not block other waiters' drain checks; # otherwise two callbacks on different worker threads both # calling Executor.shutdown() would deadlock on each other. + # An entry is removed only when the owning callback ends + # (__exit__) or, for external callers with no in-flight + # callback, immediately when their wait() returns. self._waiting_threads: Set[threading.Thread] = set() def __enter__(self) -> None: @@ -113,6 +118,9 @@ def __exit__(self, exc_type: Optional[Type[BaseException]], count = self._executing_thread_counts[t] - 1 if count == 0: del self._executing_thread_counts[t] + # The thread's callback has ended, so it's no longer + # "committed to finishing" -- drop its waiter membership. + self._waiting_threads.discard(t) else: self._executing_thread_counts[t] = count self._work_condition.notify_all() @@ -123,10 +131,9 @@ def wait(self, timeout_sec: Optional[float] = None) -> bool: Work being executed by the calling thread is excluded from the wait, since that work is necessarily blocked on this call returning. Work - being executed by any other thread that is itself currently parked in - wait() is also excluded, so concurrent shutdown() calls from inside - callbacks on different worker threads don't deadlock waiting for - each other. + being executed by any other thread that has itself entered wait() is + also excluded, so concurrent shutdown() calls from inside callbacks + on different worker threads don't deadlock waiting for each other. :param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0 :type timeout_sec: float or None @@ -137,22 +144,34 @@ def wait(self, timeout_sec: Optional[float] = None) -> bool: current = threading.current_thread() def other_work_drained() -> bool: - # True once every thread with in-flight work is itself parked - # here in wait() and thus making no further progress on its - # callback. Equivalently: no thread is actively executing. + # True once every thread with in-flight work is itself in the + # waiting set, i.e., committed to finishing rather than making + # progress on its callback. return self._executing_thread_counts.keys() <= self._waiting_threads with self._work_condition: - self._waiting_threads.add(current) - # A new waiter may have just satisfied an existing waiter's - # condition (its in-flight work is now excluded). - self._work_condition.notify_all() + added_self = current not in self._waiting_threads + if added_self: + self._waiting_threads.add(current) + # A new waiter may have just satisfied an existing + # waiter's condition (its in-flight work is now excluded). + self._work_condition.notify_all() try: if not self._work_condition.wait_for(other_work_drained, timeout_sec): return False finally: - self._waiting_threads.discard(current) - self._work_condition.notify_all() + # Keep the waiter membership while a callback is still + # in flight on this thread -- removing it now would let + # other concurrent waiters' predicates flip back to + # False and re-block until our callback ends, even + # though our callback is committed to finishing (we are + # past wait() and the rest of shutdown is cleanup). + # __exit__ will drop the membership when the callback + # ends. For external callers with no in-flight + # callback, no __exit__ will run, so discard here. + if added_self and current not in self._executing_thread_counts: + self._waiting_threads.discard(current) + self._work_condition.notify_all() return True diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 73deb2c44..f27b9b3d0 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -768,7 +768,6 @@ def timer_callback() -> None: self.node.destroy_timer(tmr) def test_shutdown_executor_from_callback(self) -> None: - """https://github.com/ros2/rclpy/issues/944: allow for executor shutdown from callback.""" self.assertIsNotNone(self.node.handle) timer_period = 0.1 # TODO(bmartin427) This seems like an invalid test to me? executor.shutdown() is @@ -790,17 +789,6 @@ def timer_callback() -> None: self.node.destroy_timer(tmr) def test_shutdown_timeout_then_retry(self) -> None: - """ - Verify that every shutdown() call waits for callbacks to drain. - - A shutdown() call that times out while a callback is in flight - must leave the executor in a state where calling shutdown() again - will also wait for the callback. The wait must NOT be gated on - "this call initiated shutdown" -- otherwise a caller who got - False back from a timed-out shutdown() and retries will skip the - wait entirely on the second call and race cleanup against the - still-running callback. - """ self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) @@ -846,15 +834,6 @@ def long_callback() -> None: self.node.destroy_timer(tmr) def test_shutdown_from_multithreaded_executor_callback(self) -> None: - """ - MultiThreadedExecutor.shutdown() called from inside a callback - running on one of its own worker threads must not raise. - - ThreadPoolExecutor.shutdown(wait=True) joins every worker; the - worker that's currently executing the callback would otherwise - be joined to itself, raising RuntimeError("cannot join current - thread"). The wrapper must skip the current thread. - """ self.assertIsNotNone(self.node.handle) executor = MultiThreadedExecutor(num_threads=2, context=self.context) @@ -887,16 +866,6 @@ def timer_callback() -> None: self.node.destroy_timer(tmr) def test_concurrent_shutdown_from_two_callbacks(self) -> None: - """ - Two callbacks on different MultiThreadedExecutor workers calling - shutdown() concurrently must not deadlock on each other. - - _work_tracker.wait excludes the calling thread's own in-flight work - from its predicate, but if it counted other waiters' in-flight work - as still pending, two callbacks both inside shutdown() would each - wait for the other's callback to finish -- which can't happen, - because both callbacks are blocked in shutdown(). - """ self.assertIsNotNone(self.node.handle) executor = MultiThreadedExecutor(num_threads=2, context=self.context) @@ -912,6 +881,7 @@ def test_concurrent_shutdown_from_two_callbacks(self) -> None: barrier = threading.Barrier(2) results: List[bool] = [] results_lock = threading.Lock() + all_done = threading.Event() def shutdown_from_callback() -> None: try: @@ -921,6 +891,8 @@ def shutdown_from_callback() -> None: ok = executor.shutdown(timeout_sec=5, wait_for_threads=False) with results_lock: results.append(ok) + if len(results) == 2: + all_done.set() tmr_a = self.node.create_timer( 0.05, shutdown_from_callback, callback_group=cb_group_a) @@ -932,14 +904,16 @@ def shutdown_from_callback() -> None: spin_thread.start() try: - # Each shutdown() has an internal 5s timeout; allow both - # callbacks plus the spinner to wind down with margin. - spin_thread.join(timeout=15) - self.assertFalse( - spin_thread.is_alive(), - 'spin thread did not exit -- concurrent shutdown deadlocked') + # Wait for both callbacks to finish their shutdown calls -- + # don't gate on the spinner exiting, since the spinner can + # exit before either callback has appended its result. + self.assertTrue( + all_done.wait(timeout=15), + f'only {len(results)}/2 shutdowns completed -- ' + 'concurrent shutdown deadlocked') + spin_thread.join(timeout=5) + self.assertFalse(spin_thread.is_alive(), 'spin thread did not exit') with results_lock: - self.assertEqual(len(results), 2) self.assertTrue( all(results), f'shutdown() returned False (timed out): {results}') From c92bbb87c7e93dbabf8aa6beedadb48cfd74d0fb Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 20 May 2026 17:04:52 -0400 Subject: [PATCH 07/10] Fix await coroutine unblocking during context manager. Signed-off-by: Chris Lalancette --- rclpy/rclpy/executors.py | 57 ++++++++++++++++++++++------------- rclpy/test/test_executor.py | 60 +++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 21 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index b4ba6d90a..bc5f7ed8d 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -14,6 +14,7 @@ from collections import deque from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager from contextlib import ExitStack from dataclasses import dataclass from functools import partial @@ -103,27 +104,41 @@ def __init__(self) -> None: # callback, immediately when their wait() returns. self._waiting_threads: Set[threading.Thread] = set() - def __enter__(self) -> None: - """Mark the calling thread as executing a callback.""" - with self._work_condition: - t = threading.current_thread() - self._executing_thread_counts[t] = ( - self._executing_thread_counts.get(t, 0) + 1) - - def __exit__(self, exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], exctb: Optional[TracebackType]) -> None: - """Mark the calling thread as having finished a callback.""" + @contextmanager + def track_callback(self) -> Generator[None, None, None]: + """ + Track an in-flight callback for the duration of the context. + + The owning thread is captured at enter time and used to decrement + the per-thread count when the context exits -- even if the exit + runs on a different thread. That happens when a coroutine using + this context manager is suspended at an inner ``await`` and then + closed via GC (e.g. during executor teardown): ``coro.close()`` + raises ``GeneratorExit`` at the suspension point on whatever + thread the GC happened to run on, and the ``with`` block's + unwinding -- including this finally -- runs on that thread, not + the original worker thread. Using ``threading.current_thread()`` + in the finally would either lose the decrement (best case) or + ``KeyError`` (current case). + """ + owner = threading.current_thread() with self._work_condition: - t = threading.current_thread() - count = self._executing_thread_counts[t] - 1 - if count == 0: - del self._executing_thread_counts[t] - # The thread's callback has ended, so it's no longer - # "committed to finishing" -- drop its waiter membership. - self._waiting_threads.discard(t) - else: - self._executing_thread_counts[t] = count - self._work_condition.notify_all() + self._executing_thread_counts[owner] = ( + self._executing_thread_counts.get(owner, 0) + 1) + try: + yield + finally: + with self._work_condition: + count = self._executing_thread_counts[owner] - 1 + if count == 0: + del self._executing_thread_counts[owner] + # The thread's callback has ended, so it's no longer + # "committed to finishing" -- drop its waiter + # membership. + self._waiting_threads.discard(owner) + else: + self._executing_thread_counts[owner] = count + self._work_condition.notify_all() def wait(self, timeout_sec: Optional[float] = None) -> bool: """ @@ -762,7 +777,7 @@ async def handler(entity: 'EntityT', gc: GuardCondition, is_shutdown: bool, entity._executor_event = False gc.trigger() return - with work_tracker: + with work_tracker.track_callback(): # The take_from_wait_list method here is expected to return either an async def # method or None if there is no work to do. call_coroutine = take_from_wait_list(entity) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index f27b9b3d0..d6eca117c 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -833,6 +833,66 @@ def long_callback() -> None: spin_thread.join(timeout=5) self.node.destroy_timer(tmr) + def test_work_tracker_coroutine_closed_on_different_thread(self) -> None: + """ + A coroutine using _WorkTracker.track_callback() that gets closed + from a thread other than the one that started it must not raise. + + This is the Windows GC scenario: an executor's handler coroutine + suspends at an inner ``await``, the executor is torn down, and + Python GC calls ``coro.close()`` later -- ``GeneratorExit`` is + raised at the suspension point and the ``with`` block unwinds + on whatever thread the GC ran on, not the original worker + thread. The bookkeeping must still clean up correctly. + """ + from rclpy.executors import _WorkTracker + + class YieldOnce: + def __await__(self) -> Generator[None, None, None]: + yield None + return None + + wt = _WorkTracker() + + async def callback_like() -> None: + with wt.track_callback(): + await YieldOnce() + + coro = callback_like() + + # Start the coroutine on thread A: runs through track_callback's + # __enter__ (incrementing the count for thread A) and suspends + # at ``await YieldOnce()``. + def thread_a() -> None: + coro.send(None) + + ta = threading.Thread(target=thread_a, name='WorkerA') + ta.start() + ta.join(timeout=5) + self.assertFalse(ta.is_alive()) + + # Close the coroutine from thread B (a different thread, mimicking + # the GC thread). Pre-fix this raises KeyError because the + # __exit__ looked up _executing_thread_counts[current_thread] + # and current_thread is thread B, not the thread that entered. + errors: List[BaseException] = [] + + def thread_b() -> None: + try: + coro.close() + except BaseException as e: + errors.append(e) + + tb = threading.Thread(target=thread_b, name='GCThread') + tb.start() + tb.join(timeout=5) + self.assertFalse(tb.is_alive()) + + self.assertFalse(errors, f'close() raised: {errors!r}') + self.assertFalse( + wt._executing_thread_counts, + f'work tracker not cleaned up: {wt._executing_thread_counts!r}') + def test_shutdown_from_multithreaded_executor_callback(self) -> None: self.assertIsNotNone(self.node.handle) executor = MultiThreadedExecutor(num_threads=2, context=self.context) From 7033d8a79995604b04361dbf5be613e87c6727a4 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 21 May 2026 01:25:34 +0000 Subject: [PATCH 08/10] Lint. Signed-off-by: Chris Lalancette --- rclpy/test/test_executor.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index d6eca117c..a7a3ce1c6 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -28,6 +28,7 @@ from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.context import Context +from rclpy.executors import _WorkTracker from rclpy.executors import Executor from rclpy.executors import MultiThreadedExecutor from rclpy.executors import ShutdownException @@ -834,20 +835,8 @@ def long_callback() -> None: self.node.destroy_timer(tmr) def test_work_tracker_coroutine_closed_on_different_thread(self) -> None: - """ - A coroutine using _WorkTracker.track_callback() that gets closed - from a thread other than the one that started it must not raise. - - This is the Windows GC scenario: an executor's handler coroutine - suspends at an inner ``await``, the executor is torn down, and - Python GC calls ``coro.close()`` later -- ``GeneratorExit`` is - raised at the suspension point and the ``with`` block unwinds - on whatever thread the GC ran on, not the original worker - thread. The bookkeeping must still clean up correctly. - """ - from rclpy.executors import _WorkTracker - class YieldOnce: + def __await__(self) -> Generator[None, None, None]: yield None return None From 33c77c9d58ec9d8e9ea3246fbcfe51ec8dcd3bbf Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Tue, 2 Jun 2026 08:11:47 -0500 Subject: [PATCH 09/10] Test waiter leak on timeout (#1674) * Add test case for _WorkTracker waiter membership leak on timeout Signed-off-by: Michael Carroll * Fix waiter membership leak in _WorkTracker.wait on timeout Signed-off-by: Michael Carroll --------- Signed-off-by: Michael Carroll --- rclpy/rclpy/executors.py | 15 +++++---- rclpy/test/test_executor.py | 62 +++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index bc5f7ed8d..9eb93732f 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -171,9 +171,10 @@ def other_work_drained() -> bool: # A new waiter may have just satisfied an existing # waiter's condition (its in-flight work is now excluded). self._work_condition.notify_all() + drained = False try: - if not self._work_condition.wait_for(other_work_drained, timeout_sec): - return False + drained = self._work_condition.wait_for(other_work_drained, timeout_sec) + return drained finally: # Keep the waiter membership while a callback is still # in flight on this thread -- removing it now would let @@ -184,10 +185,12 @@ def other_work_drained() -> bool: # __exit__ will drop the membership when the callback # ends. For external callers with no in-flight # callback, no __exit__ will run, so discard here. - if added_self and current not in self._executing_thread_counts: - self._waiting_threads.discard(current) - self._work_condition.notify_all() - return True + # However, if we timed out (not drained), we are NOT committed + # to finishing successfully, so we must discard the membership. + if not drained or (added_self and current not in self._executing_thread_counts): + if added_self: + self._waiting_threads.discard(current) + self._work_condition.notify_all() @overload diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index a7a3ce1c6..635c007a9 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -972,6 +972,68 @@ def shutdown_from_callback() -> None: self.node.destroy_timer(tmr_a) self.node.destroy_timer(tmr_b) + def test_work_tracker_waiter_leak_on_timeout(self) -> None: + wt = _WorkTracker() + + worker_a_running = threading.Event() + worker_a_should_exit = threading.Event() + worker_b_running = threading.Event() + + errors_a = [] + wait_returned_a = [] + + def worker_a_thread(): + try: + with wt.track_callback(): + worker_a_running.set() + # Call wait with a short timeout. This times out because + # Worker B is executing a callback and not waiting. + res = wt.wait(timeout_sec=0.1) + wait_returned_a.append(res) + # Stay alive inside the callback context + worker_a_should_exit.wait() + except Exception as e: + errors_a.append(e) + + def worker_b_thread(): + with wt.track_callback(): + worker_b_running.set() + # Run until Worker A's wait times out + time.sleep(0.5) + + tb = threading.Thread(target=worker_b_thread, name='WorkerB') + tb.start() + + ta = threading.Thread(target=worker_a_thread, name='WorkerA') + ta.start() + + self.assertTrue(worker_a_running.wait(timeout=2.0)) + self.assertTrue(worker_b_running.wait(timeout=2.0)) + + # Wait for Worker B to finish + tb.join(timeout=2.0) + + self.assertFalse(errors_a) + self.assertEqual(wait_returned_a, [False]) + + # MainThread calls wait(). Since Worker B finished, only Worker A is active. + # However, Worker A leaked into _waiting_threads from the timeout. + # MainThread's wait() will prematurely evaluate to True and exit instantly. + start_time = time.monotonic() + res_main = wt.wait(timeout_sec=0.2) + elapsed = time.monotonic() - start_time + + try: + # Under the bug, res_main is True and elapsed is ~0.0s. + # In the corrected code, it correctly blocks/returns False after 0.2s. + self.assertFalse( + res_main, + 'MainThread wait should have timed out because WorkerA is still running') + self.assertGreaterEqual(elapsed, 0.15) + finally: + worker_a_should_exit.set() + ta.join(timeout=2.0) + def test_context_manager(self) -> None: self.assertIsNotNone(self.node.handle) From 403dcb97cb26621d882ac5d6f6aec09e3f764b77 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 2 Jun 2026 13:43:43 +0000 Subject: [PATCH 10/10] Feedback from review. Signed-off-by: Chris Lalancette --- rclpy/rclpy/executors.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 9eb93732f..db51a8cee 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -187,10 +187,9 @@ def other_work_drained() -> bool: # callback, no __exit__ will run, so discard here. # However, if we timed out (not drained), we are NOT committed # to finishing successfully, so we must discard the membership. - if not drained or (added_self and current not in self._executing_thread_counts): - if added_self: - self._waiting_threads.discard(current) - self._work_condition.notify_all() + if added_self and (not drained or current not in self._executing_thread_counts): + self._waiting_threads.discard(current) + self._work_condition.notify_all() @overload