Fix callbacks race in SelectorLoop.sock_connect. by 1st1 · Pull Request #366 · python/asyncio
While testing uvloop on recent CPython 3.5.2 I found a regression in loop.sock_connect, introduced in ed17848.
The bug breaks loop.sock_* in a very serious way, making programs that use those methods prone to random hangs after the socket is connected.
How to trigger
Let's imagine we have a server that sends some data (let's say b'hello') to the client immediately after connect. And the client program is the following:
    data = await self.recv_all(sock, 5)
    assert data == b'hello'
    await self.loop.sock_sendall(sock, PAYLOAD)
If the PAYLOAD is big enough, the client program will hang forever.
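A self-contained version of this scenario might look like the sketch below. The server, the recv_all helper, the explicit sock_connect call, the 4 MiB payload size, and the timeouts are all assumptions made for illustration; they are not taken from the test suite. On an asyncio without the regression, main() returns the number of bytes the server received. On an affected build the client would be expected to time out instead.

```python
import asyncio
import socket

# Assumed size: large enough that a single send() is unlikely to accept it all.
PAYLOAD = b"x" * (4 * 1024 * 1024)


async def recv_all(loop, sock, nbytes):
    """Hypothetical helper: read exactly nbytes from a non-blocking socket."""
    buf = b""
    while len(buf) < nbytes:
        chunk = await loop.sock_recv(sock, nbytes - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection early")
        buf += chunk
    return buf


async def client(loop, addr):
    sock = socket.socket()
    sock.setblocking(False)
    try:
        await loop.sock_connect(sock, addr)
        data = await recv_all(loop, sock, 5)
        assert data == b"hello"
        # With the regression, this is the call that can hang.
        await loop.sock_sendall(sock, PAYLOAD)
    finally:
        sock.close()


async def main():
    loop = asyncio.get_running_loop()
    server_done = loop.create_future()

    async def handle(reader, writer):
        # Send the greeting immediately after the client connects.
        writer.write(b"hello")
        received = 0
        while received < len(PAYLOAD):
            chunk = await reader.read(65536)
            if not chunk:
                break
            received += len(chunk)
        writer.close()
        server_done.set_result(received)

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    addr = server.sockets[0].getsockname()
    try:
        await asyncio.wait_for(client(loop, addr), timeout=30)
        return await asyncio.wait_for(server_done, timeout=30)
    finally:
        server.close()
```

Running asyncio.run(main()) drives one client against the local server.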
Explanation
The cause of the hang is a race between callbacks -- one related to loop.sock_connect and one to sock_sendall.
Here's the relevant piece of code from selector_events.py:
    def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        fut = self.create_future()
        if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
            self._sock_connect(fut, sock, address)
        else:
            resolved = base_events._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            resolved.add_done_callback(
                lambda resolved: self._on_resolved(fut, sock, resolved))

        return fut

    def _on_resolved(self, fut, sock, resolved):
        try:
            _, _, _, _, address = resolved.result()[0]
        except Exception as exc:
            fut.set_exception(exc)
        else:
            self._sock_connect(fut, sock, address)

    def _sock_connect(self, fut, sock, address):
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(functools.partial(self._sock_connect_done,
                                                    fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.cancelled():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, 'Connect call failed %s' % (address,))
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)
Before ed17848, sock_connect called _sock_connect directly:
- sock_connect created a fut Future.
- If the address wasn't already resolved, it raised an error.
- If the address was resolved, it called _sock_connect, which attached a callback to fut: _sock_connect_done.
- sock_connect then returned fut to the caller.
- If the caller is a coroutine, it's wrapped in asyncio.Task. Therefore, fut now has two callbacks attached to it: [_sock_connect_done, Task._wakeup]
After that commit:
- sock_connect creates a fut Future.
- Then calls _ensure_resolved (linked fut to the result of that call's Future).
- sock_connect returns fut to the caller.
- If the caller is a coroutine, its Task will add a callback to the fut, eventually resulting in this: [Task._wakeup, _sock_connect_done]
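The two callback lists above only matter because a Future runs its done callbacks in the order they were registered. The sketch below illustrates that with a plain asyncio Future, relying on the current asyncio behaviour of scheduling callbacks in registration order. The strings stand in for the real callbacks, and callback_order is a hypothetical helper, not part of asyncio.

```python
import asyncio


def callback_order(first, second):
    """Return the order in which two done callbacks run on one Future."""

    async def run():
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        calls = []
        fut.add_done_callback(lambda f: calls.append(first))
        fut.add_done_callback(lambda f: calls.append(second))
        fut.set_result(None)
        # Done callbacks are scheduled on the loop rather than run inline,
        # so yield once to let them execute.
        await asyncio.sleep(0)
        return calls

    return asyncio.run(run())


# Registration order before ed17848, then after it.
before = callback_order("_sock_connect_done", "Task._wakeup")
after = callback_order("Task._wakeup", "_sock_connect_done")
```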
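Therefore, after ed17848, _sock_connect_done can be called after the await loop.sock_connect() line has already returned control to the caller. If the program calls loop.sock_sendall right after sock_connect, _sock_connect_done will remove the writer callback that sock_sendall set up. With that writer gone, the rest of the payload is never sent and the sock_sendall future never completes, which is the hang.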
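The effect on the writer registration can be modelled without any sockets. In this sketch a dict stands in for the selector's per-fd writer table, FD is a made-up descriptor number, and the two inner functions are simplified stand-ins for _sock_connect_done and for the Task resuming into sock_sendall. It is a toy model of the race, not the asyncio implementation.

```python
import asyncio

FD = 7  # hypothetical file descriptor number, for illustration only


async def surviving_writer(callback_order):
    """Return the writer left registered for FD after fut's callbacks run."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Toy stand-in for the selector's writer table; the connect writer is
    # registered while the connection is still in progress.
    writers = {FD: "connect writer"}

    def sock_connect_done(f):
        # Mirrors remove_writer(fd): drops whatever writer is registered.
        writers.pop(FD, None)

    def task_wakeup(f):
        # Stands in for the resumed coroutine calling sock_sendall with a
        # payload too large for one send(), which registers a writer.
        writers[FD] = "sendall writer"

    callbacks = {
        "_sock_connect_done": sock_connect_done,
        "Task._wakeup": task_wakeup,
    }
    for name in callback_order:
        fut.add_done_callback(callbacks[name])

    fut.set_result(None)
    await asyncio.sleep(0)  # let the scheduled callbacks run
    return writers.get(FD)


old_order = asyncio.run(surviving_writer(["_sock_connect_done", "Task._wakeup"]))
new_order = asyncio.run(surviving_writer(["Task._wakeup", "_sock_connect_done"]))
```

With the old ordering the sendall writer survives. With the new ordering nothing is left registered for the descriptor, so nothing would ever finish the send.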