diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 6b2ab4b288..5da01dbc6c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -149,6 +149,13 @@ def _try_libev_import(): except DependencyException as e: return (None, e) +def _try_asyncio_import(): + try: + from cassandra.io.asyncioreactor import AsyncioConnection + return (AsyncioConnection, None) + except (ImportError, DependencyException) as e: + return (None, e) + def _try_asyncore_import(): try: from cassandra.io.asyncorereactor import AsyncoreConnection @@ -168,7 +175,7 @@ def _connection_reduce_fn(val,import_fn): log = logging.getLogger(__name__) -conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncore_import) +conn_fns = (_try_gevent_import, _try_eventlet_import, _try_libev_import, _try_asyncio_import, _try_asyncore_import) (conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None,[])) if not conn_class: raise DependencyException("Unable to load a default connection class", excs) @@ -876,25 +883,21 @@ def default_retry_policy(self, policy): This determines what event loop system will be used for managing I/O with Cassandra. These are the current options: - * :class:`cassandra.io.asyncorereactor.AsyncoreConnection` * :class:`cassandra.io.libevreactor.LibevConnection` + * :class:`cassandra.io.asyncioreactor.AsyncioConnection` + * :class:`cassandra.io.asyncorereactor.AsyncoreConnection` (Python < 3.12 only) * :class:`cassandra.io.eventletreactor.EventletConnection` (requires monkey-patching - see doc for details) * :class:`cassandra.io.geventreactor.GeventConnection` (requires monkey-patching - see doc for details) * :class:`cassandra.io.twistedreactor.TwistedConnection` - * EXPERIMENTAL: :class:`cassandra.io.asyncioreactor.AsyncioConnection` - - By default, ``AsyncoreConnection`` will be used, which uses - the ``asyncore`` module in the Python standard library. - - If ``libev`` is installed, ``LibevConnection`` will be used instead. 
- If ``gevent`` or ``eventlet`` monkey-patching is detected, the corresponding - connection class will be used automatically. + The default is selected automatically using the following priority: - ``AsyncioConnection``, which uses the ``asyncio`` module in the Python - standard library, is also available, but currently experimental. Note that - it requires ``asyncio`` features that were only introduced in the 3.4 line - in 3.4.6, and in the 3.5 line in 3.5.1. + 1. If ``gevent`` or ``eventlet`` monkey-patching is detected, the + corresponding connection class will be used. + 2. If the ``libev`` C extension is available, ``LibevConnection`` is used. + 3. ``AsyncioConnection`` is used as the standard-library fallback. This is + the preferred default on Python 3.12+ where ``asyncore`` was removed. + 4. On Python < 3.12, ``AsyncoreConnection`` is used as a last resort. """ control_connection_timeout = 2.0 diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 95f92e26e0..3ac32751e8 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -1,5 +1,6 @@ from cassandra.connection import Connection, ConnectionShutdown +import atexit import asyncio import logging import os @@ -11,19 +12,30 @@ log = logging.getLogger(__name__) -# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and -# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the -# managed coroutines are generator-based, not native coroutines. See PEP 492: -# https://www.python.org/dev/peps/pep-0492/#coroutine-objects +def _cleanup(): + """ + Module-level cleanup called at interpreter shutdown via atexit. + Stops the asyncio event loop and joins the loop thread. 
+ """ + loop = AsyncioConnection._loop + thread = AsyncioConnection._loop_thread + if loop is not None: + try: + loop.call_soon_threadsafe(loop.stop) + except RuntimeError: + # loop may already be closed post-fork or during shutdown + pass + if thread is not None: + thread.join(timeout=1.0) + if thread.is_alive(): + log.warning( + "Event loop thread could not be joined, so shutdown may not be clean. " + "Please call Cluster.shutdown() to avoid this.") + else: + log.debug("Event loop thread was joined") -try: - asyncio.run_coroutine_threadsafe -except AttributeError: - raise ImportError( - 'Cannot use asyncioreactor without access to ' - 'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)' - ) +atexit.register(_cleanup) class AsyncioTimer(object): @@ -67,11 +79,12 @@ def finish(self): class AsyncioConnection(Connection): """ - An experimental implementation of :class:`.Connection` that uses the - ``asyncio`` module in the Python standard library for its event loop. + An implementation of :class:`.Connection` that uses the ``asyncio`` + module in the Python standard library for its event loop. - Note that it requires ``asyncio`` features that were only introduced in the - 3.4 line in 3.4.6, and in the 3.5 line in 3.5.1. + This is the preferred connection class on Python 3.12+ where the + ``asyncore`` module has been removed. It is also used as a fallback + when the libev C extension is not available. 
""" _loop = None @@ -106,18 +119,33 @@ def __init__(self, *args, **kwargs): def initialize_reactor(cls): with cls._lock: if cls._pid != os.getpid(): - cls._loop = None + log.debug("Detected fork, clearing and reinitializing reactor state") + cls.handle_fork() if cls._loop is None: cls._loop = asyncio.new_event_loop() - asyncio.set_event_loop(cls._loop) - if not cls._loop_thread: + if not cls._loop_thread or not cls._loop_thread.is_alive(): # daemonize so the loop will be shut down on interpreter # shutdown cls._loop_thread = Thread(target=cls._loop.run_forever, daemon=True, name="asyncio_thread") cls._loop_thread.start() + @classmethod + def handle_fork(cls): + """ + Called after a fork. Cleans up any reactor state from the parent + process so that a fresh event loop can be started in the child. + """ + if cls._loop is not None: + try: + cls._loop.call_soon_threadsafe(cls._loop.stop) + except RuntimeError: + pass + cls._loop = None + cls._loop_thread = None + cls._pid = os.getpid() + @classmethod def create_timer(cls, timeout, callback): return AsyncioTimer(timeout, callback, loop=cls._loop) @@ -173,7 +201,7 @@ def push(self, data): async def _push_msg(self, chunks): # This lock ensures all chunks of a message are sequential in the Queue - with await self._write_queue_lock: + async with self._write_queue_lock: for chunk in chunks: self._write_queue.put_nowait(chunk) diff --git a/tests/__init__.py b/tests/__init__.py index 7799b51399..8388059826 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -54,7 +54,7 @@ def is_monkey_patched(): return is_gevent_monkey_patched() or is_eventlet_monkey_patched() MONKEY_PATCH_LOOP = bool(os.getenv('MONKEY_PATCH_LOOP', False)) -EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "libev") +EVENT_LOOP_MANAGER = os.getenv('EVENT_LOOP_MANAGER', "asyncio") # If set to to true this will force the Cython tests to run regardless of whether they are installed diff --git a/tests/unit/io/test_asyncioreactor.py 
b/tests/unit/io/test_asyncioreactor.py index 65708d41dc..3fc1067f55 100644 --- a/tests/unit/io/test_asyncioreactor.py +++ b/tests/unit/io/test_asyncioreactor.py @@ -1,19 +1,21 @@ AsyncioConnection, ASYNCIO_AVAILABLE = None, False try: from cassandra.io.asyncioreactor import AsyncioConnection - import asynctest ASYNCIO_AVAILABLE = True except (ImportError, SyntaxError): AsyncioConnection = None ASYNCIO_AVAILABLE = False from tests import is_monkey_patched, connection_class -from tests.unit.io.utils import TimerCallback, TimerTestMixin +from tests.unit.io.utils import TimerCallback, TimerTestMixin, submit_and_wait_for_completion -from unittest.mock import patch +from unittest.mock import patch, MagicMock, Mock, AsyncMock +import asyncio +import socket as stdlib_socket import unittest import time +import threading skip_me = (is_monkey_patched() or (not ASYNCIO_AVAILABLE) or @@ -56,7 +58,7 @@ def setUp(self): socket_patcher.start() old_selector = AsyncioConnection._loop._selector - AsyncioConnection._loop._selector = asynctest.TestSelector() + AsyncioConnection._loop._selector = MagicMock() def reset_selector(): AsyncioConnection._loop._selector = old_selector @@ -65,6 +67,31 @@ def reset_selector(): super(AsyncioTimerTests, self).setUp() + def test_multi_timer_validation(self): + """ + Override with a wider tolerance for asyncio's thread-based scheduling, + which has inherently more jitter than libev's native event loop. 
+ """ + from tests.unit.io.utils import get_timeout + pending_callbacks = [] + completed_callbacks = [] + + for gross_time in range(0, 100, 1): + timeout = get_timeout(gross_time, 0, 100, 100, False) + callback = TimerCallback(timeout) + self.create_timer(timeout, callback.invoke) + pending_callbacks.append(callback) + + while pending_callbacks: + for callback in list(pending_callbacks): # iterate over a snapshot: the body removes items from pending_callbacks + if callback.was_invoked(): + pending_callbacks.remove(callback) + completed_callbacks.append(callback) + time.sleep(.1) + + for callback in completed_callbacks: + self.assertAlmostEqual(callback.expected_wait, callback.get_wait_time(), delta=.25) + def test_timer_cancellation(self): # Various lists for tracking callback stage timeout = .1 @@ -75,3 +102,291 @@ def test_timer_cancellation(self): time.sleep(.2) # Assert that the cancellation was honored self.assertFalse(callback.was_invoked()) + + +@unittest.skipIf(is_monkey_patched(), 'runtime is monkey patched for another reactor') +@unittest.skipIf(connection_class is not AsyncioConnection, + 'not running asyncio tests; current connection_class is {}'.format(connection_class)) +@unittest.skipUnless(ASYNCIO_AVAILABLE, "asyncio is not available for this runtime") +class AsyncioConnectionTest(unittest.TestCase): + """ + Tests for AsyncioConnection covering write, read, close, and error + handling at the reactor level. Unlike the ReactorTestMixin used by + asyncore/libev, these tests exercise the public interface (push/close) + because handle_read/handle_write are async coroutines running inside + the event loop thread. + """ + + @classmethod + def setUpClass(cls): + if skip_me: + return + # Force a fresh reactor so we aren't affected by a previous test + # class that may have stopped the shared event loop. 
+ if AsyncioConnection._loop is not None: + try: + AsyncioConnection._loop.call_soon_threadsafe( + AsyncioConnection._loop.stop) + except RuntimeError: + pass + if AsyncioConnection._loop_thread: + AsyncioConnection._loop_thread.join(timeout=1.0) + AsyncioConnection._loop = None + AsyncioConnection._loop_thread = None + AsyncioConnection.initialize_reactor() + cls._loop = AsyncioConnection._loop + # Save original loop methods so we can restore after each test + cls._orig_sock_recv = cls._loop.sock_recv + cls._orig_sock_sendall = cls._loop.sock_sendall + + @classmethod + def tearDownClass(cls): + if skip_me: + return + cls._loop.sock_recv = cls._orig_sock_recv + cls._loop.sock_sendall = cls._orig_sock_sendall + + def _make_connection(self): + """ + Create an AsyncioConnection with mocked socket and _connect_socket. + Loop socket methods are pre-mocked so that the handle_read/handle_write + coroutines started in __init__ don't hit real I/O. + """ + mock_socket = MagicMock(spec=stdlib_socket.socket) + mock_socket.fileno.return_value = 99 + mock_socket.setblocking = MagicMock() + mock_socket.connect.return_value = None + mock_socket.getsockopt.return_value = 0 + mock_socket.send.side_effect = lambda x: len(x) + + def fake_connect_socket(self_inner): + self_inner._socket = mock_socket + + with patch.object(AsyncioConnection, '_connect_socket', fake_connect_socket): + conn = AsyncioConnection( + host='127.0.0.1', + cql_version='3.0.1', + connect_timeout=5, + ) + return conn + + def setUp(self): + if skip_me: + return + + loop = self._loop + + # Pre-mock sock_recv to block indefinitely (read loop won't spin) + self._recv_unblock = threading.Event() + + async def blocking_recv(sock, bufsize): + while not self._recv_unblock.is_set(): + await asyncio.sleep(0.01) + raise asyncio.CancelledError() + + # Pre-mock sock_sendall to silently consume data (options message, etc.) 
+ self._sent_data = [] + + async def capturing_sendall(sock, data): + self._sent_data.append(bytes(data)) + + loop.sock_recv = blocking_recv + loop.sock_sendall = capturing_sendall + + self.conn = self._make_connection() + # Give the loop a moment to process __init__ tasks (options message) + time.sleep(0.1) + # Clear any data sent during init (options message) + self._sent_data.clear() + + def tearDown(self): + if skip_me: + return + # Unblock the recv so the read loop can exit + self._recv_unblock.set() + try: + self.conn.close() + except Exception: + pass + time.sleep(0.05) + # Restore the original loop socket methods for the next test + self._loop.sock_recv = self._orig_sock_recv + self._loop.sock_sendall = self._orig_sock_sendall + + def test_push_sends_data(self): + """ + Verify that push() enqueues data and the write loop sends it + via sock_sendall on the event loop. + """ + test_data = b'hello world' + self.conn.push(test_data) + + # Wait for the event loop to drain the write queue + time.sleep(0.2) + + self.assertTrue(len(self._sent_data) > 0) + self.assertEqual(b''.join(self._sent_data), test_data) + + def test_push_chunking(self): + """ + Verify that data larger than out_buffer_size is chunked + into multiple pieces before being sent. + """ + buf_size = self.conn.out_buffer_size + # Send data that is 2.5x the buffer size + test_data = b'x' * int(buf_size * 2.5) + self.conn.push(test_data) + + time.sleep(0.2) + + # Should have been broken into at least 3 chunks + self.assertGreaterEqual(len(self._sent_data), 3) + self.assertEqual(b''.join(self._sent_data), test_data) + + def test_write_error_defuncts_connection(self): + """ + Verify that a socket error during write causes the + connection to become defunct. 
+ """ + loop = self._loop + + async def error_sendall(sock, data): + raise stdlib_socket.error(32, "Broken pipe") + + loop.sock_sendall = error_sendall + + self.conn.push(b'trigger error') + time.sleep(0.2) + + self.assertTrue(self.conn.is_defunct) + self.assertIsInstance(self.conn.last_error, stdlib_socket.error) + + def test_read_eof_closes_connection(self): + """ + Verify that receiving an empty buffer (EOF / server close) + causes the connection to close. + """ + loop = self._loop + + # Cancel the existing read watcher so we can start a new one + if self.conn._read_watcher: + self.conn._read_watcher.cancel() + time.sleep(0.05) + + call_count = 0 + async def eof_recv(sock, bufsize): + nonlocal call_count + call_count += 1 + if call_count == 1: + return b'' # EOF + raise asyncio.CancelledError() + + loop.sock_recv = eof_recv + + self.conn._read_watcher = asyncio.run_coroutine_threadsafe( + self.conn.handle_read(), loop=loop + ) + + time.sleep(0.2) + self.assertTrue(self.conn.is_closed) + + def test_read_error_defuncts_connection(self): + """ + Verify that a socket error during read causes the + connection to become defunct. + """ + loop = self._loop + + if self.conn._read_watcher: + self.conn._read_watcher.cancel() + time.sleep(0.05) + + async def error_recv(sock, bufsize): + raise stdlib_socket.error(104, "Connection reset by peer") + + loop.sock_recv = error_recv + + self.conn._read_watcher = asyncio.run_coroutine_threadsafe( + self.conn.handle_read(), loop=loop + ) + + time.sleep(0.2) + self.assertTrue(self.conn.is_defunct) + self.assertIsInstance(self.conn.last_error, stdlib_socket.error) + + def test_read_processes_data(self): + """ + Verify that data received via sock_recv is written to the + IO buffer and process_io_buffer is called. 
+ """ + loop = self._loop + + if self.conn._read_watcher: + self.conn._read_watcher.cancel() + time.sleep(0.05) + + call_count = 0 + async def data_then_eof_recv(sock, bufsize): + nonlocal call_count + call_count += 1 + if call_count == 1: + return b'some data from server' + return b'' + + loop.sock_recv = data_then_eof_recv + + with patch.object(self.conn, 'process_io_buffer') as mock_process: + self.conn._read_watcher = asyncio.run_coroutine_threadsafe( + self.conn.handle_read(), loop=loop + ) + time.sleep(0.2) + mock_process.assert_called() + + def test_close_cancels_watchers(self): + """ + Verify that closing the connection cancels both the + read and write watchers. + """ + read_watcher = self.conn._read_watcher + write_watcher = self.conn._write_watcher + + self.conn.close() + time.sleep(0.2) + + self.assertTrue(self.conn.is_closed) + # The watchers should have been cancelled + if read_watcher: + self.assertTrue(read_watcher.cancelled() or read_watcher.done()) + if write_watcher: + self.assertTrue(write_watcher.cancelled() or write_watcher.done()) + + +@unittest.skipIf(is_monkey_patched(), 'runtime is monkey patched for another reactor') +@unittest.skipUnless(ASYNCIO_AVAILABLE, "asyncio is not available for this runtime") +class AsyncioForkTest(unittest.TestCase): + """ + Test that handle_fork() properly resets reactor state. + """ + + def test_handle_fork_resets_state(self): + """ + Verify handle_fork() clears loop, thread, and updates pid. 
+ """ + AsyncioConnection.initialize_reactor() + self.assertIsNotNone(AsyncioConnection._loop) + self.assertIsNotNone(AsyncioConnection._loop_thread) + + old_loop = AsyncioConnection._loop + old_thread = AsyncioConnection._loop_thread + + AsyncioConnection.handle_fork() + + self.assertIsNone(AsyncioConnection._loop) + self.assertIsNone(AsyncioConnection._loop_thread) + + # Re-initialize for other tests + AsyncioConnection.initialize_reactor() + self.assertIsNotNone(AsyncioConnection._loop) + self.assertIsNotNone(AsyncioConnection._loop_thread) + # Should be a new loop and thread + self.assertIsNot(AsyncioConnection._loop, old_loop) diff --git a/tox.ini b/tox.ini index e77835f0da..6aa24abed6 100644 --- a/tox.ini +++ b/tox.ini @@ -44,3 +44,14 @@ setenv = LIBEV_EMBED=0 changedir = {envtmpdir} commands = pytest -v {toxinidir}/tests/unit/io/test_eventletreactor.py + + +[testenv:asyncio_loop] +deps = {[base]deps} + +setenv = LIBEV_EMBED=0 + CARES_EMBED=0 + EVENT_LOOP_MANAGER=asyncio +changedir = {envtmpdir} +commands = + pytest -v {toxinidir}/tests/unit/io/test_asyncioreactor.py