Fix callbacks race in SelectorLoop.sock_connect. by 1st1 · Pull Request #366 · python/asyncio

While testing uvloop on recent CPython 3.5.2 I found a regression in loop.sock_connect, introduced in ed17848.

The bug breaks loop.sock_* in a very serious way, making programs that use those methods prone to random hangs after socket is connected.

How to trigger

Let's imagine we have a server, that sends some data (let's say b'hello') to the client immediately after connect. And the client program is the following:

data = await self.recv_all(sock, 5)
assert data == b'hello'
await self.loop.sock_sendall(sock, PAYLOAD)

If the PAYLOAD is big enough, the client program will hang forever.

Explanation

The cause of the hang is a race between callbacks -- one related to loop.sock_connect and one to sock_sendall.

Here's the relevant piece of code from selector_events.py:

    def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        fut = self.create_future()
        if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
            self._sock_connect(fut, sock, address)
        else:
            resolved = base_events._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            resolved.add_done_callback(
                lambda resolved: self._on_resolved(fut, sock, resolved))

        return fut

    def _on_resolved(self, fut, sock, resolved):
        try:
            _, _, _, _, address = resolved.result()[0]
        except Exception as exc:
            fut.set_exception(exc)
        else:
            self._sock_connect(fut, sock, address)

    def _sock_connect(self, fut, sock, address):
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(functools.partial(self._sock_connect_done,
                                                    fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.cancelled():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, 'Connect call failed %s' % (address,))
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

Before ed17848, sock_connect called _sock_connect directly:

  1. sock_connect created a fut Future.
  2. If the address wasn't already resolved it raised an error.
  3. If the address was resolved, it called _sock_connect, which attached a callback to the fut -- _sock_connect_done.
  4. sock_connect then returned fut to the caller.
  5. If the caller is a coroutine, it's wrapped in asyncio.Task. Therefore, fut now have two callbacks attached to it: [_sock_connect_done, Task._wakeup]

After that commit:

  1. sock_connect creates a fut Future.
  2. Then calls _ensure_resolved (linked fut to the result of that call's Future).
  3. sock_connect returns fut to the caller.
  4. If the caller is a coroutine, its Task will add a callback to the fut, eventually resulting in this: [Task._wakeup, _sock_connect_done]

Therefore, after ed17848, _sock_connect_done can be called after await loop.sock_connect() line. If the program calls loop.sock_sendall after sock_connect, _sock_connect_done will remove writer callback that sock_sendall set up.

/cc @gvanrossum @ajdavis @Haypo