Checklist
Steps to reproduce
import asyncio
import mode.threads
class CrashingServiceThread(mode.threads.ServiceThread):
async def on_start(self):
raise RuntimeError('I am here to crash')
async def main():
await CrashingServiceThread().start()
if __name__ == "__main__":
asyncio.run(main())
Expected behavior
Process exits.
Actual behavior
Process hangs.
Full traceback
'CrashingServiceThread' crashed: RuntimeError('I am here to crash')
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 218, in _serve
await self._default_start()
File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 739, in _default_start
await self._actually_start()
File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 756, in _actually_start
await self.on_start()
File "app2.py", line 6, in on_start
raise RuntimeError('I am here to crash')
RuntimeError: I am here to crash
--> Hangs here. ^C is pressed.
Traceback (most recent call last):
File "app2.py", line 12, in <module>
asyncio.run(main())
File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 570, in run_until_complete
self.run_forever()
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 538, in run_forever
self._run_once()
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1746, in _run_once
event_list = self._selector.select(timeout)
File "/usr/local/lib/python3.7/selectors.py", line 468, in select
fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
--> Another ^C.
Exception ignored in: <module 'threading' from '/usr/local/lib/python3.7/threading.py'>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/threading.py", line 1307, in _shutdown
lock.acquire()
KeyboardInterrupt
Versions
- Python 3.7.4
- Mode 4.1.6
- Debian GNU/Linux 10 (buster), Windows 8.1
Additional Info
mode.threads.ServiceThread and mode.threads.WorkerThread locks each other when an exception is raised.
I've dug up the problem, but I don't have sufficient knowledge of this lib to fix this properly.
Hire is what I've found (my comments are denoted by ###):
class WorkerThread(threading.Thread):
def run(self) -> None:
try:
self.service._start_thread()
finally:
### self._is_stopped.set() done hire
### but WorkerThread.stop() is called inside self.service._start_thread()
### and infinitely waits, so we never reach this
self._set_stopped()
def stop(self) -> None:
self._is_stopped.wait() ### we infinitely wait here because run() is still running up the stack
if self.is_alive():
self.join(threading.TIMEOUT_MAX)
class ServiceThread(Service):
def _start_thread(self) -> None:
# set the default event loop for this thread
asyncio.set_event_loop(self.thread_loop)
try:
self.thread_loop.run_until_complete(self._serve())
except Exception:
# if self._serve raises an exception we need to set
# shutdown here, since _shutdown_thread will not execute.
### I think this is not true, _shutdown_thread is executed
### in case of the exception in _serve.
self.set_shutdown()
raise
async def _shutdown_thread(self) -> None:
await self._default_stop_children()
await self.on_thread_stop()
self.set_shutdown()
await self._default_stop_futures()
if self._thread is not None:
### problem is here
### May be we shouldn't call this from inside the thread
self._thread.stop()
await self._default_stop_exit_stacks()
async def _serve(self) -> None:
try:
# start the service
await self._default_start()
# allow ServiceThread.start() to return
# when wait_for_thread is enabled.
await self.on_thread_started()
notify(self._thread_running)
await self.wait_until_stopped()
except asyncio.CancelledError:
raise
except BaseException as exc: # pylint: disable=broad-except
self.on_crash('{0!r} crashed: {1!r}', self.label, exc)
await self.crash(exc)
if self.beacon.root is not None:
await self.beacon.root.data.crash(exc)
raise
finally:
await self._shutdown_thread() ### this is called in case of the exception
It also causes faust to hang as well:
import faust
import mode
app = faust.App(
"some_app",
broker="kafka://localhost:9092",
# agent_supervisor=mode.CrashingSupervisor, # This won't help
)
@app.task
async def some_task():
raise RuntimeError("Some Error")
if __name__ == "__main__":
app.main()
Process will hang in crashed state.
Also, it seems to be causing this faust issue.
Checklist
masterbranch of Mode.Steps to reproduce
Expected behavior
Process exits.
Actual behavior
Process hangs.
Full traceback
'CrashingServiceThread' crashed: RuntimeError('I am here to crash') Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 218, in _serve await self._default_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 739, in _default_start await self._actually_start() File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 756, in _actually_start await self.on_start() File "app2.py", line 6, in on_start raise RuntimeError('I am here to crash') RuntimeError: I am here to crash --> Hangs here. ^C is pressed. Traceback (most recent call last): File "app2.py", line 12, in <module> asyncio.run(main()) File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run return loop.run_until_complete(main) File "/usr/local/lib/python3.7/asyncio/base_events.py", line 570, in run_until_complete self.run_forever() File "/usr/local/lib/python3.7/asyncio/base_events.py", line 538, in run_forever self._run_once() File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1746, in _run_once event_list = self._selector.select(timeout) File "/usr/local/lib/python3.7/selectors.py", line 468, in select fd_event_list = self._selector.poll(timeout, max_ev) KeyboardInterrupt --> Another ^C. Exception ignored in: <module 'threading' from '/usr/local/lib/python3.7/threading.py'> Traceback (most recent call last): File "/usr/local/lib/python3.7/threading.py", line 1307, in _shutdown lock.acquire() KeyboardInterruptVersions
Additional Info
mode.threads.ServiceThreadandmode.threads.WorkerThreadlocks each other when an exception is raised.I've dug up the problem, but I don't have sufficient knowledge of this lib to fix this properly.
Hire is what I've found (my comments are denoted by
###):It also causes faust to hang as well:
Process will hang in crashed state.
Also, it seems to be causing this faust issue.