1+ import errno
12import threading
23
34from cassandra .connection import Connection , ConnectionShutdown
1213
1314log = logging .getLogger (__name__ )
1415
16+ # Errno values that indicate the remote peer has disconnected.
17+ _PEER_DISCONNECT_ERRNOS = frozenset ((
18+ errno .ENOTCONN , errno .ESHUTDOWN ,
19+ errno .ECONNRESET , errno .ECONNABORTED ,
20+ errno .EBADF ,
21+ ))
22+
23+ # Windows winerror codes for the same conditions:
24+ # 10053 = WSAECONNABORTED, 10054 = WSAECONNRESET
25+ _PEER_DISCONNECT_WINERRORS = frozenset ((10053 , 10054 ))
26+
27+
28+ def _is_peer_disconnect (err ):
29+ """Return True if *err* indicates the remote peer closed the connection."""
30+ return (isinstance (err , ConnectionError )
31+ or getattr (err , 'winerror' , None ) in _PEER_DISCONNECT_WINERRORS
32+ or getattr (err , 'errno' , None ) in _PEER_DISCONNECT_ERRNOS )
33+
1534
1635# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
1736# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
@@ -140,8 +159,7 @@ def close(self):
140159 return
141160 self .is_closed = True
142161
143- # close from the loop thread to avoid races when removing file
144- # descriptors
162+ # Schedule async cleanup (cancel watchers, error pending requests)
145163 asyncio .run_coroutine_threadsafe (
146164 self ._close (), loop = self ._loop
147165 )
@@ -153,11 +171,46 @@ async def _close(self):
153171 if self ._read_watcher :
154172 self ._read_watcher .cancel ()
155173 if self ._socket :
156- self ._loop .remove_writer (self ._socket .fileno ())
157- self ._loop .remove_reader (self ._socket .fileno ())
158- self ._socket .close ()
159-
160- log .debug ("Closed socket to %s" % (self .endpoint ,))
174+ fd = self ._socket .fileno ()
175+ if fd >= 0 :
176+ try :
177+ self ._loop .remove_writer (fd )
178+ except NotImplementedError :
179+ # NotImplementedError: remove_reader/remove_writer are not
180+ # supported on Windows ProactorEventLoop (default since
181+ # Python 3.10). ProactorEventLoop uses completion-based
182+ # IOCP, which has no concept of "watching a fd for
183+ # readiness" to remove.
184+ pass
185+ except Exception :
186+ # It is not critical if it fails, driver can keep working,
187+ # but it should not be happening, so logged as error
188+ log .error ("Unexpected error removing writer for %s" ,
189+ self .endpoint , exc_info = True )
190+ try :
191+ self ._loop .remove_reader (fd )
192+ except NotImplementedError :
193+ # NotImplementedError: remove_reader/remove_writer are not
194+ # supported on Windows ProactorEventLoop (default since
195+ # Python 3.10). ProactorEventLoop uses completion-based
196+ # IOCP, which has no concept of "watching a fd for
197+ # readiness" to remove.
198+ pass
199+ except Exception :
200+ # It is not critical if it fails, driver can keep working,
201+ # but it should not be happening, so logged as error
202+ log .error ("Unexpected error removing reader for %s" ,
203+ self .endpoint , exc_info = True )
204+
205+ try :
206+ self ._socket .close ()
207+ except OSError :
208+ # Ignore if socket is already closed
209+ pass
210+ except Exception :
211+ log .debug ("Unexpected error closing socket to %s" ,
212+ self .endpoint , exc_info = True )
213+ log .debug ("Closed socket to %s" % (self .endpoint ,))
161214
162215 if not self .is_defunct :
163216 msg = "Connection to %s was closed" % self .endpoint
@@ -168,6 +221,9 @@ async def _close(self):
168221 self .connected_event .set ()
169222
170223 def push (self , data ):
224+ if self .is_closed or self .is_defunct :
225+ raise ConnectionShutdown (
226+ "Connection to %s is already closed" % self .endpoint )
171227 buff_size = self .out_buffer_size
172228 if len (data ) > buff_size :
173229 chunks = []
@@ -196,43 +252,61 @@ async def _push_msg(self, chunks):
196252
197253
198254 async def handle_write (self ):
199- while True :
200- try :
255+ exc = None
256+ try :
257+ while True :
201258 next_msg = await self ._write_queue .get ()
202259 if next_msg :
203260 await self ._loop .sock_sendall (self ._socket , next_msg )
204- except socket .error as err :
261+ except asyncio .CancelledError :
262+ pass
263+ except Exception as err :
264+ if _is_peer_disconnect (err ):
265+ log .debug ("Connection %s closed by peer during write: %s" ,
266+ self , err )
267+ else :
268+ exc = err
205269 log .debug ("Exception in send for %s: %s" , self , err )
206- self .defunct (err )
207- return
208- except asyncio .CancelledError :
209- return
270+ finally :
271+ self .defunct (exc or ConnectionShutdown (
272+ "Connection to %s was closed" % self .endpoint ))
210273
211274 async def handle_read (self ):
212- while True :
213- try :
214- buf = await self ._loop .sock_recv (self ._socket , self .in_buffer_size )
215- self ._iobuf .write (buf )
216- # sock_recv expects EWOULDBLOCK if socket provides no data, but
217- # nonblocking ssl sockets raise these instead, so we handle them
218- # ourselves by yielding to the event loop, where the socket will
219- # get the reading/writing it "wants" before retrying
220- except (ssl .SSLWantWriteError , ssl .SSLWantReadError ):
221- # Apparently the preferred way to yield to the event loop from within
222- # a native coroutine based on https://github.com/python/asyncio/issues/284
223- await asyncio .sleep (0 )
224- continue
225- except socket .error as err :
226- log .debug ("Exception during socket recv for %s: %s" ,
275+ exc = None
276+ try :
277+ while True :
278+ try :
279+ buf = await self ._loop .sock_recv (self ._socket , self .in_buffer_size )
280+ self ._iobuf .write (buf )
281+ # sock_recv expects EWOULDBLOCK if socket provides no data, but
282+ # nonblocking ssl sockets raise these instead, so we handle them
283+ # ourselves by yielding to the event loop, where the socket will
284+ # get the reading/writing it "wants" before retrying
285+ except (ssl .SSLWantWriteError , ssl .SSLWantReadError ):
286+ # Apparently the preferred way to yield to the event loop from within
287+ # a native coroutine based on https://github.com/python/asyncio/issues/284
288+ await asyncio .sleep (0 )
289+ continue
290+
291+ if buf and self ._iobuf .tell ():
292+ self .process_io_buffer ()
293+ else :
294+ log .debug ("Connection %s closed by server" , self )
295+ exc = ConnectionShutdown (
296+ "Connection to %s was closed by server" % self .endpoint )
297+ return
298+ except asyncio .CancelledError :
299+ # Task cancellation is treated as a normal connection shutdown;
300+ # cleanup and marking the connection as defunct are handled in finally.
301+ pass
302+ except Exception as err :
303+ if _is_peer_disconnect (err ):
304+ log .debug ("Connection %s closed by peer during read: %s" ,
227305 self , err )
228- self .defunct (err )
229- return # leave the read loop
230- except asyncio .CancelledError :
231- return
232-
233- if buf and self ._iobuf .tell ():
234- self .process_io_buffer ()
235306 else :
236- log .debug ("Connection %s closed by server" , self )
237- self .close ()
238- return
307+ exc = err
308+ log .debug ("Exception during socket recv for %s: %s" ,
309+ self , err )
310+ finally :
311+ self .defunct (exc or ConnectionShutdown (
312+ "Connection to %s was closed" % self .endpoint ))
0 commit comments