feat: reduce overhead to send responses (#1135) · python-zeroconf/python-zeroconf@c4077dd

@@ -28,7 +28,7 @@

2828

import sys

2929

import threading

3030

from types import TracebackType # noqa # used in type hints

31-

from typing import Awaitable, Dict, List, Optional, Tuple, Type, Union, cast

31+

from typing import Any, Awaitable, Dict, List, Optional, Tuple, Type, Union, cast

32323333

from ._cache import DNSCache

3434

from ._dns import DNSQuestion, DNSQuestionType

105105

_REGISTER_BROADCASTS = 3

106106107107108+

class _WrappedTransport:

109+

"""A wrapper for transports."""

110+111+

__slots__ = (

112+

'transport',

113+

'is_ipv6',

114+

'sock',

115+

'fileno',

116+

'sock_name',

117+

)

118+119+

def __init__(

120+

self,

121+

transport: asyncio.DatagramTransport,

122+

is_ipv6: bool,

123+

sock: socket.socket,

124+

fileno: int,

125+

sock_name: Any,

126+

) -> None:

127+

"""Initialize the wrapped transport.

128+129+

These attributes are used when sending packets.

130+

"""

131+

self.transport = transport

132+

self.is_ipv6 = is_ipv6

133+

self.sock = sock

134+

self.fileno = fileno

135+

self.sock_name = sock_name

136+137+138+

def _make_wrapped_transport(transport: asyncio.DatagramTransport) -> _WrappedTransport:

139+

"""Make a wrapped transport."""

140+

sock: socket.socket = transport.get_extra_info('socket')

141+

return _WrappedTransport(

142+

transport=transport,

143+

is_ipv6=sock.family == socket.AF_INET6,

144+

sock=sock,

145+

fileno=sock.fileno(),

146+

sock_name=sock.getsockname(),

147+

)

148+149+108150

class AsyncEngine:

109151

"""An engine wraps sockets in the event loop."""

110152

@@ -117,8 +159,8 @@ def __init__(

117159

self.loop: Optional[asyncio.AbstractEventLoop] = None

118160

self.zc = zeroconf

119161

self.protocols: List[AsyncListener] = []

120-

self.readers: List[asyncio.DatagramTransport] = []

121-

self.senders: List[asyncio.DatagramTransport] = []

162+

self.readers: List[_WrappedTransport] = []

163+

self.senders: List[_WrappedTransport] = []

122164

self.running_event: Optional[asyncio.Event] = None

123165

self._listen_socket = listen_socket

124166

self._respond_sockets = respond_sockets

@@ -158,9 +200,9 @@ async def _async_create_endpoints(self) -> None:

158200

for s in reader_sockets:

159201

transport, protocol = await loop.create_datagram_endpoint(lambda: AsyncListener(self.zc), sock=s)

160202

self.protocols.append(cast(AsyncListener, protocol))

161-

self.readers.append(cast(asyncio.DatagramTransport, transport))

203+

self.readers.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport)))

162204

if s in sender_sockets:

163-

self.senders.append(cast(asyncio.DatagramTransport, transport))

205+

self.senders.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport)))

164206165207

def _async_cache_cleanup(self) -> None:

166208

"""Periodic cache cleanup."""

@@ -186,8 +228,8 @@ def _async_shutdown(self) -> None:

186228

"""Shutdown transports and sockets."""

187229

assert self.running_event is not None

188230

self.running_event.clear()

189-

for transport in itertools.chain(self.senders, self.readers):

190-

transport.close()

231+

for wrapped_transport in itertools.chain(self.senders, self.readers):

232+

wrapped_transport.transport.close()

191233192234

def close(self) -> None:

193235

"""Close from sync context.

@@ -221,7 +263,7 @@ def __init__(self, zc: 'Zeroconf') -> None:

221263

self.zc = zc

222264

self.data: Optional[bytes] = None

223265

self.last_time: float = 0

224-

self.transport: Optional[asyncio.DatagramTransport] = None

266+

self.transport: Optional[_WrappedTransport] = None

225267

self.sock_description: Optional[str] = None

226268

self._deferred: Dict[str, List[DNSIncoming]] = {}

227269

self._timers: Dict[str, asyncio.TimerHandle] = {}

@@ -309,7 +351,7 @@ def handle_query_or_defer(

309351

msg: DNSIncoming,

310352

addr: str,

311353

port: int,

312-

transport: asyncio.DatagramTransport,

354+

transport: _WrappedTransport,

313355

v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),

314356

) -> None:

315357

"""Deal with incoming query packets. Provides a response if

@@ -341,7 +383,7 @@ def _respond_query(

341383

msg: Optional[DNSIncoming],

342384

addr: str,

343385

port: int,

344-

transport: asyncio.DatagramTransport,

386+

transport: _WrappedTransport,

345387

v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),

346388

) -> None:

347389

"""Respond to a query and reassemble any truncated deferred packets."""

@@ -362,27 +404,25 @@ def error_received(self, exc: Exception) -> None:

362404

self.log_exception_once(exc, msg_str, exc)

363405364406

def connection_made(self, transport: asyncio.BaseTransport) -> None:

365-

self.transport = cast(asyncio.DatagramTransport, transport)

366-

sock_name = self.transport.get_extra_info('sockname')

367-

sock_fileno = self.transport.get_extra_info('socket').fileno()

368-

self.sock_description = f"{sock_fileno} ({sock_name})"

407+

wrapped_transport = _make_wrapped_transport(cast(asyncio.DatagramTransport, transport))

408+

self.transport = wrapped_transport

409+

self.sock_description = f"{wrapped_transport.fileno} ({wrapped_transport.sock_name})"

369410370411

def connection_lost(self, exc: Optional[Exception]) -> None:

371412

"""Handle connection lost."""

372413373414374415

def async_send_with_transport(

375416

log_debug: bool,

376-

transport: asyncio.DatagramTransport,

417+

transport: _WrappedTransport,

377418

packet: bytes,

378419

packet_num: int,

379420

out: DNSOutgoing,

380421

addr: Optional[str],

381422

port: int,

382423

v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),

383424

) -> None:

384-

s = transport.get_extra_info('socket')

385-

ipv6_socket = s.family == socket.AF_INET6

425+

ipv6_socket = transport.is_ipv6

386426

if addr is None:

387427

real_addr = _MDNS_ADDR6 if ipv6_socket else _MDNS_ADDR

388428

else:

@@ -394,8 +434,8 @@ def async_send_with_transport(

394434

'Sending to (%s, %d) via [socket %s (%s)] (%d bytes #%d) %r as %r...',

395435

real_addr,

396436

port or _MDNS_PORT,

397-

s.fileno(),

398-

transport.get_extra_info('sockname'),

437+

transport.fileno,

438+

transport.sock_name,

399439

len(packet),

400440

packet_num + 1,

401441

out,

@@ -404,9 +444,9 @@ def async_send_with_transport(

404444

# Get flowinfo and scopeid for the IPV6 socket to create a complete IPv6

405445

# address tuple: https://docs.python.org/3.6/library/socket.html#socket-families

406446

if ipv6_socket and not v6_flow_scope:

407-

_, _, sock_flowinfo, sock_scopeid = s.getsockname()

447+

_, _, sock_flowinfo, sock_scopeid = transport.sock_name

408448

v6_flow_scope = (sock_flowinfo, sock_scopeid)

409-

transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))

449+

transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))

410450411451412452

class Zeroconf(QuietLogger):

@@ -832,7 +872,7 @@ def handle_assembled_query(

832872

packets: List[DNSIncoming],

833873

addr: str,

834874

port: int,

835-

transport: asyncio.DatagramTransport,

875+

transport: _WrappedTransport,

836876

v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),

837877

) -> None:

838878

"""Respond to a (re)assembled query.

@@ -870,7 +910,7 @@ def send(

870910

addr: Optional[str] = None,

871911

port: int = _MDNS_PORT,

872912

v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),

873-

transport: Optional[asyncio.DatagramTransport] = None,

913+

transport: Optional[_WrappedTransport] = None,

874914

) -> None:

875915

"""Sends an outgoing packet threadsafe."""

876916

assert self.loop is not None

@@ -882,7 +922,7 @@ def async_send(

882922

addr: Optional[str] = None,

883923

port: int = _MDNS_PORT,

884924

v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),

885-

transport: Optional[asyncio.DatagramTransport] = None,

925+

transport: Optional[_WrappedTransport] = None,

886926

) -> None:

887927

"""Sends an outgoing packet."""

888928

if self.done: