[3.7] bpo-32622: Native sendfile on windows (GH-5565) by miss-islington · Pull Request #5890 · python/cpython

Expand Up @@ -6,11 +6,14 @@
__all__ = 'BaseProactorEventLoop',
import io import os import socket import warnings
from . import base_events from . import constants from . import events from . import futures from . import protocols from . import sslproto Expand Down Expand Up @@ -107,6 +110,11 @@ def _fatal_error(self, exc, message='Fatal error on pipe transport'): self._force_close(exc)
def _force_close(self, exc): if self._empty_waiter is not None: if exc is None: self._empty_waiter.set_result(None) else: self._empty_waiter.set_exception(exc) if self._closing: return self._closing = True Expand Down Expand Up @@ -327,13 +335,19 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
_start_tls_compatible = True
def __init__(self, *args, **kw): super().__init__(*args, **kw) self._empty_waiter = None
def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): raise TypeError( f"data argument must be a bytes-like object, " f"not {type(data).__name__}") if self._eof_written: raise RuntimeError('write_eof() already called') if self._empty_waiter is not None: raise RuntimeError('unable to write; sendfile is in progress')
if not data: return Expand Down Expand Up @@ -393,6 +407,8 @@ def _loop_writing(self, f=None, data=None): self._maybe_pause_protocol() else: self._write_fut.add_done_callback(self._loop_writing) if self._empty_waiter is not None and self._write_fut is None: self._empty_waiter.set_result(None) except ConnectionResetError as exc: self._force_close(exc) except OSError as exc: Expand All @@ -407,6 +423,17 @@ def write_eof(self): def abort(self): self._force_close(None)
def _make_empty_waiter(self): if self._empty_waiter is not None: raise RuntimeError("Empty waiter is already set") self._empty_waiter = self._loop.create_future() if self._write_fut is None: self._empty_waiter.set_result(None) return self._empty_waiter
def _reset_empty_waiter(self): self._empty_waiter = None

class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport): def __init__(self, *args, **kw): Expand Down Expand Up @@ -447,7 +474,7 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport, transports.Transport): """Transport for connected sockets."""
_sendfile_compatible = constants._SendfileMode.FALLBACK _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
def _set_extra(self, sock): self._extra['socket'] = sock Expand Down Expand Up @@ -556,6 +583,47 @@ async def sock_connect(self, sock, address): async def sock_accept(self, sock): return await self._proactor.accept(sock)
async def _sock_sendfile_native(self, sock, file, offset, count): try: fileno = file.fileno() except (AttributeError, io.UnsupportedOperation) as err: raise events.SendfileNotAvailableError("not a regular file") try: fsize = os.fstat(fileno).st_size except OSError as err: raise events.SendfileNotAvailableError("not a regular file") blocksize = count if count else fsize if not blocksize: return 0 # empty file
blocksize = min(blocksize, 0xffff_ffff) end_pos = min(offset + count, fsize) if count else fsize offset = min(offset, fsize) total_sent = 0 try: while True: blocksize = min(end_pos - offset, blocksize) if blocksize <= 0: return total_sent await self._proactor.sendfile(sock, file, offset, blocksize) offset += blocksize total_sent += blocksize finally: if total_sent > 0: file.seek(offset)
async def _sendfile_native(self, transp, file, offset, count): resume_reading = transp.is_reading() transp.pause_reading() await transp._make_empty_waiter() try: return await self.sock_sendfile(transp._sock, file, offset, count, fallback=False) finally: transp._reset_empty_waiter() if resume_reading: transp.resume_reading()
def _close_self_pipe(self): if self._self_reading_future is not None: self._self_reading_future.cancel() Expand Down