From cd1385ff6faebea1c33f0a4ada0b852452f45c4b Mon Sep 17 00:00:00 2001 From: Nadav Elkabets <32939935+nadavelkabets@users.noreply.github.com> Date: Sat, 18 Apr 2026 21:04:35 +0300 Subject: [PATCH 1/3] Bugfix: executor doesn't propagate exception from task that awaited a future (#1643) * Schedule the original task when task awaits a future Signed-off-by: Nadav Elkabets * Add MultiThreadedExecutor to test Signed-off-by: Nadav Elkabets * Add tests for awaiting a done future and task cancellation during await Signed-off-by: Nadav Elkabets * Removed unused variable Signed-off-by: Nadav Elkabets --------- Signed-off-by: Nadav Elkabets (cherry picked from commit aac0ebb424a3045b20b52d6e62575f3af2fabfc8) # Conflicts: # rclpy/rclpy/task.py # rclpy/test/test_executor.py --- rclpy/rclpy/executors.py | 4 ++ rclpy/rclpy/task.py | 39 +++++++++++++++-- rclpy/test/test_executor.py | 84 +++++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 4 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 453664c8e..0a2e9d87c 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -661,6 +661,10 @@ def _wait_for_ready_callbacks( ready_tasks_count = len(self._ready_tasks) for _ in range(ready_tasks_count): task = self._ready_tasks.popleft() + # Skip tasks that were cancelled or set done while awaiting a + # future and got rescheduled when the future completed + if task.cancelled() or task.done(): + continue task_data = self._pending_tasks[task] node = task_data.source_node if node is None or node in nodes_to_use: diff --git a/rclpy/rclpy/task.py b/rclpy/rclpy/task.py index 1686da0e2..b5640ea90 100644 --- a/rclpy/rclpy/task.py +++ b/rclpy/rclpy/task.py @@ -44,8 +44,13 @@ def __init__(self, *, executor=None): # An exception raised by the handler when called self._exception = None self._exception_fetched = False +<<<<<<< HEAD # callbacks to be scheduled after this task completes self._callbacks = [] +======= + # callbacks or tasks to be scheduled after this task completes + self._callbacks: List[Union[Callable[['Future[T]'], None], 'Task[Any]']] = [] +>>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)) # Lock for threadsafety self._lock = threading.Lock() # An executor to use when scheduling done callbacks @@ -157,10 +162,18 @@ def _schedule_or_invoke_done_callbacks(self): if executor is not None: # Have the executor take care of the callbacks for callback in callbacks: - executor.create_task(callback, self) + if isinstance(callback, Task): + executor._call_task_in_next_spin(callback) + else: + executor.create_task(callback, self) else: # No executor, call right away for callback in callbacks: + if isinstance(callback, Task): + warnings.warn( + 'Dropping task awaiting future: ' + 'executor reference could not be resolved') + continue try: callback(self) except Exception as e: @@ -201,7 +214,26 @@ def add_done_callback(self, callback): if invoke: callback(self) +<<<<<<< HEAD def remove_done_callback(self, callback: Callable[['Future'], None]) -> bool: +======= + def _add_waiting_task(self, task: 'Task[Any]') -> None: + """Schedule a task to resume when this future completes.""" + with self._lock: + if not self._pending(): + assert self._executor is not None + executor = self._executor() + if executor is not None: + executor._call_task_in_next_spin(task) + else: + warnings.warn( + 'Dropping task awaiting future: ' + 'executor reference could not be resolved') + else: + self._callbacks.append(task) + + def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool: +>>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)) """ Remove a previously-added done callback. @@ -317,9 +349,8 @@ def _add_resume_callback(self, future: Future, executor) -> None: elif future_executor is not executor: raise RuntimeError('A task can only await futures associated with the same executor') - # The future is associated with the same executor, so we can resume the task directly - # in the done callback - future.add_done_callback(lambda _: self.__call__()) + # Register the task to resume when the future is done or cancelled + future._add_waiting_task(self) def _complete_task(self) -> None: """Cleanup after task finished.""" diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 19218223e..79575afc9 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -414,7 +414,91 @@ async def coro2(): self.assertTrue(future1.done()) self.assertEqual('Sentinel Result 1', future1.result()) +<<<<<<< HEAD def test_create_task_during_spin(self): +======= + def test_coroutine_exception_after_await(self) -> None: + """Exception in a coroutine after awaiting a future must propagate.""" + self.assertIsNotNone(self.node.handle) + # EventsExecutor excluded - segfaults on exception propagation (#1641) + for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) + + first_fut = executor.create_future() + second_fut = executor.create_future() + + async def coro_that_raises() -> None: + first_fut.set_result(None) + await second_fut + raise RuntimeError('Expected error after await') + + task = executor.create_task(coro_that_raises) + + executor.spin_until_future_complete(first_fut, timeout_sec=5) + self.assertFalse(task.done()) + # Resolve the inner future — triggers resume + second_fut.set_result(None) + + with self.assertRaises(RuntimeError) as cm: + executor.spin_until_future_complete(task, timeout_sec=5) + self.assertIn('Expected error after await', str(cm.exception)) + + def test_cancel_task_while_awaiting_future(self) -> None: + """Cancelling a task parked on a future must not crash the dispatch loop.""" + self.assertIsNotNone(self.node.handle) + # EventsExecutor excluded - see #1641 + for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) + + first_fut = executor.create_future() + second_fut = executor.create_future() + third_fut = executor.create_future() + + async def coro() -> None: + first_fut.set_result(None) + await second_fut + third_fut.set_result(None) + + task = executor.create_task(coro) + + executor.spin_until_future_complete(first_fut, timeout_sec=5) + self.assertFalse(task.done()) + + task.cancel() + self.assertTrue(task.cancelled()) + + second_fut.set_result(None) + + executor.spin_until_future_complete(first_fut, timeout_sec=5) + self.assertFalse(third_fut.done()) + + def test_await_already_completed_future(self) -> None: + """Awaiting an already-completed future must resume and return its result.""" + self.assertIsNotNone(self.node.handle) + # EventsExecutor excluded - see #1641 + for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) + + fut: Future[str] = executor.create_future() + fut.set_result('done') # complete before the task runs + + async def coro() -> str: + return await fut # type: ignore[return-value] + + task = executor.create_task(coro) + + executor.spin_until_future_complete(task, timeout_sec=5) + self.assertTrue(task.done()) + self.assertEqual('done', task.result()) + + def test_create_task_during_spin(self) -> None: +>>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)) self.assertIsNotNone(self.node.handle) for cls in [SingleThreadedExecutor, EventsExecutor]: with self.subTest(cls=cls): From 6aa7dcc91739b154af251b5f268505a163549c8e Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Tue, 28 Apr 2026 20:09:43 -0500 Subject: [PATCH 2/3] Resolve conflicts for Jazzy backport of #1643 --- rclpy/rclpy/task.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/rclpy/rclpy/task.py b/rclpy/rclpy/task.py index b5640ea90..2782cfd87 100644 --- a/rclpy/rclpy/task.py +++ b/rclpy/rclpy/task.py @@ -44,13 +44,8 @@ def __init__(self, *, executor=None): # An exception raised by the handler when called self._exception = None self._exception_fetched = False -<<<<<<< HEAD - # callbacks to be scheduled after this task completes - self._callbacks = [] -======= # callbacks or tasks to be scheduled after this task completes - self._callbacks: List[Union[Callable[['Future[T]'], None], 'Task[Any]']] = [] ->>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)) + self._callbacks = [] # Lock for threadsafety self._lock = threading.Lock() # An executor to use when scheduling done callbacks From e9b78006d7932edf4c6a6a76b1f8b6f7a221e022 Mon Sep 17 00:00:00 2001 From: Alejandro Hernandez Cordero Date: Tue, 23 Jun 2026 11:33:06 +0200 Subject: [PATCH 3/3] Fixed merge Signed-off-by: Alejandro Hernandez Cordero --- rclpy/rclpy/task.py | 4 ---- rclpy/test/test_executor.py | 4 ---- 2 files changed, 8 deletions(-) diff --git a/rclpy/rclpy/task.py b/rclpy/rclpy/task.py index 2782cfd87..ebd81321b 100644 --- a/rclpy/rclpy/task.py +++ b/rclpy/rclpy/task.py @@ -209,9 +209,6 @@ def add_done_callback(self, callback): if invoke: callback(self) -<<<<<<< HEAD - def remove_done_callback(self, callback: Callable[['Future'], None]) -> bool: -======= def _add_waiting_task(self, task: 'Task[Any]') -> None: """Schedule a task to resume when this future completes.""" with self._lock: @@ -228,7 +225,6 @@ def _add_waiting_task(self, task: 'Task[Any]') -> None: self._callbacks.append(task) def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool: ->>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)) """ Remove a previously-added done callback. diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 79575afc9..77b8ae766 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -414,9 +414,6 @@ async def coro2(): self.assertTrue(future1.done()) self.assertEqual('Sentinel Result 1', future1.result()) -<<<<<<< HEAD - def test_create_task_during_spin(self): -======= def test_coroutine_exception_after_await(self) -> None: """Exception in a coroutine after awaiting a future must propagate.""" self.assertIsNotNone(self.node.handle) @@ -498,7 +495,6 @@ async def coro() -> str: self.assertEqual('done', task.result()) def test_create_task_during_spin(self) -> None: ->>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)) self.assertIsNotNone(self.node.handle) for cls in [SingleThreadedExecutor, EventsExecutor]: with self.subTest(cls=cls):