bpo-30064: Refactor sock_* asyncio API by asvetlov · Pull Request #10419 · python/cpython
Expand Up
@@ -358,26 +358,29 @@ async def sock_recv(self, sock, n):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
try:
return sock.recv(n)
except (BlockingIOError, InterruptedError):
pass
fut = self.create_future()
self._sock_recv(fut, None, sock, n)
fd = sock.fileno()
self.add_reader(fd, self._sock_recv, fut, sock, n)
fut.add_done_callback(
functools.partial(self._sock_read_done, fd))
return await fut
def _sock_recv(self, fut, registered_fd, sock, n): def _sock_read_done(self, fd, fut): self.remove_reader(fd)
def _sock_recv(self, fut, sock, n): # _sock_recv() can add itself as an I/O callback if the operation can't # be done immediately. Don't use it directly, call sock_recv(). if registered_fd is not None: # Remove the callback early. It should be rare that the # selector says the fd is ready but the call still returns # EAGAIN, and I am willing to take a hit in that case in # order to simplify the common case. self.remove_reader(registered_fd) if fut.cancelled(): if fut.done(): return try: data = sock.recv(n) except (BlockingIOError, InterruptedError): fd = sock.fileno() self.add_reader(fd, self._sock_recv, fut, fd, sock, n) return # try again next time except Exception as exc: fut.set_exception(exc) else: Expand All @@ -391,27 +394,27 @@ async def sock_recv_into(self, sock, buf): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") try: return sock.recv_into(buf) except (BlockingIOError, InterruptedError): pass fut = self.create_future() self._sock_recv_into(fut, None, sock, buf) fd = sock.fileno() self.add_reader(fd, self._sock_recv_into, fut, sock, buf) fut.add_done_callback( functools.partial(self._sock_read_done, fd)) return await fut
def _sock_recv_into(self, fut, registered_fd, sock, buf): def _sock_recv_into(self, fut, sock, buf): # _sock_recv_into() can add itself as an I/O callback if the operation # can't be done immediately. Don't use it directly, call # sock_recv_into(). if registered_fd is not None: # Remove the callback early. It should be rare that the # selector says the FD is ready but the call still returns # EAGAIN, and I am willing to take a hit in that case in # order to simplify the common case. self.remove_reader(registered_fd) if fut.cancelled(): if fut.done(): return try: nbytes = sock.recv_into(buf) except (BlockingIOError, InterruptedError): fd = sock.fileno() self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf) return # try again next time except Exception as exc: fut.set_exception(exc) else: Expand All @@ -428,34 +431,40 @@ async def sock_sendall(self, sock, data): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") fut = self.create_future() if data: self._sock_sendall(fut, None, sock, data) try: n = sock.send(data) except (BlockingIOError, InterruptedError): n = 0
if n == len(data): # all data sent return else: fut.set_result(None) data = bytearray(memoryview(data)[n:])
fut = self.create_future() fd = sock.fileno() fut.add_done_callback( functools.partial(self._sock_write_done, fd)) self.add_writer(fd, self._sock_sendall, fut, sock, data) return await fut
def _sock_sendall(self, fut, registered_fd, sock, data): if registered_fd is not None: self.remove_writer(registered_fd) if fut.cancelled(): def _sock_sendall(self, fut, sock, data): if fut.done(): # Future cancellation can be scheduled on previous loop iteration return
try: n = sock.send(data) except (BlockingIOError, InterruptedError): n = 0 return except Exception as exc: fut.set_exception(exc) return
if n == len(data): fut.set_result(None) else: if n: data = data[n:] fd = sock.fileno() self.add_writer(fd, self._sock_sendall, fut, fd, sock, data) del data[:n]
async def sock_connect(self, sock, address): """Connect to a remote socket at address. Expand Down Expand Up @@ -484,18 +493,18 @@ def _sock_connect(self, fut, sock, address): # becomes writable to be notified when the connection succeed or # fails. fut.add_done_callback( functools.partial(self._sock_connect_done, fd)) functools.partial(self._sock_write_done, fd)) self.add_writer(fd, self._sock_connect_cb, fut, sock, address) except Exception as exc: fut.set_exception(exc) else: fut.set_result(None)
def _sock_connect_done(self, fd, fut): def _sock_write_done(self, fd, fut): self.remove_writer(fd)
def _sock_connect_cb(self, fut, sock, address): if fut.cancelled(): if fut.done(): return
try: Expand Down Expand Up @@ -529,7 +538,7 @@ def _sock_accept(self, fut, registered, sock): fd = sock.fileno() if registered: self.remove_reader(fd) if fut.cancelled(): if fut.done(): return try: conn, address = sock.accept() Expand Down
def _sock_recv(self, fut, registered_fd, sock, n): def _sock_read_done(self, fd, fut): self.remove_reader(fd)
def _sock_recv(self, fut, sock, n): # _sock_recv() can add itself as an I/O callback if the operation can't # be done immediately. Don't use it directly, call sock_recv(). if registered_fd is not None: # Remove the callback early. It should be rare that the # selector says the fd is ready but the call still returns # EAGAIN, and I am willing to take a hit in that case in # order to simplify the common case. self.remove_reader(registered_fd) if fut.cancelled(): if fut.done(): return try: data = sock.recv(n) except (BlockingIOError, InterruptedError): fd = sock.fileno() self.add_reader(fd, self._sock_recv, fut, fd, sock, n) return # try again next time except Exception as exc: fut.set_exception(exc) else: Expand All @@ -391,27 +394,27 @@ async def sock_recv_into(self, sock, buf): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") try: return sock.recv_into(buf) except (BlockingIOError, InterruptedError): pass fut = self.create_future() self._sock_recv_into(fut, None, sock, buf) fd = sock.fileno() self.add_reader(fd, self._sock_recv_into, fut, sock, buf) fut.add_done_callback( functools.partial(self._sock_read_done, fd)) return await fut
def _sock_recv_into(self, fut, registered_fd, sock, buf): def _sock_recv_into(self, fut, sock, buf): # _sock_recv_into() can add itself as an I/O callback if the operation # can't be done immediately. Don't use it directly, call # sock_recv_into(). if registered_fd is not None: # Remove the callback early. It should be rare that the # selector says the FD is ready but the call still returns # EAGAIN, and I am willing to take a hit in that case in # order to simplify the common case. self.remove_reader(registered_fd) if fut.cancelled(): if fut.done(): return try: nbytes = sock.recv_into(buf) except (BlockingIOError, InterruptedError): fd = sock.fileno() self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf) return # try again next time except Exception as exc: fut.set_exception(exc) else: Expand All @@ -428,34 +431,40 @@ async def sock_sendall(self, sock, data): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") fut = self.create_future() if data: self._sock_sendall(fut, None, sock, data) try: n = sock.send(data) except (BlockingIOError, InterruptedError): n = 0
if n == len(data): # all data sent return else: fut.set_result(None) data = bytearray(memoryview(data)[n:])
fut = self.create_future() fd = sock.fileno() fut.add_done_callback( functools.partial(self._sock_write_done, fd)) self.add_writer(fd, self._sock_sendall, fut, sock, data) return await fut
def _sock_sendall(self, fut, registered_fd, sock, data): if registered_fd is not None: self.remove_writer(registered_fd) if fut.cancelled(): def _sock_sendall(self, fut, sock, data): if fut.done(): # Future cancellation can be scheduled on previous loop iteration return
try: n = sock.send(data) except (BlockingIOError, InterruptedError): n = 0 return except Exception as exc: fut.set_exception(exc) return
if n == len(data): fut.set_result(None) else: if n: data = data[n:] fd = sock.fileno() self.add_writer(fd, self._sock_sendall, fut, fd, sock, data) del data[:n]
async def sock_connect(self, sock, address): """Connect to a remote socket at address. Expand Down Expand Up @@ -484,18 +493,18 @@ def _sock_connect(self, fut, sock, address): # becomes writable to be notified when the connection succeed or # fails. fut.add_done_callback( functools.partial(self._sock_connect_done, fd)) functools.partial(self._sock_write_done, fd)) self.add_writer(fd, self._sock_connect_cb, fut, sock, address) except Exception as exc: fut.set_exception(exc) else: fut.set_result(None)
def _sock_connect_done(self, fd, fut): def _sock_write_done(self, fd, fut): self.remove_writer(fd)
def _sock_connect_cb(self, fut, sock, address): if fut.cancelled(): if fut.done(): return
try: Expand Down Expand Up @@ -529,7 +538,7 @@ def _sock_accept(self, fut, registered, sock): fd = sock.fileno() if registered: self.remove_reader(fd) if fut.cancelled(): if fut.done(): return try: conn, address = sock.accept() Expand Down