feat: reduce overhead to send responses (#1135) · python-zeroconf/python-zeroconf@c4077dd
@@ -28,7 +28,7 @@
2828import sys
2929import threading
3030from types import TracebackType # noqa # used in type hints
31-from typing import Awaitable, Dict, List, Optional, Tuple, Type, Union, cast
31+from typing import Any, Awaitable, Dict, List, Optional, Tuple, Type, Union, cast
32323333from ._cache import DNSCache
3434from ._dns import DNSQuestion, DNSQuestionType
105105_REGISTER_BROADCASTS = 3
106106107107108+class _WrappedTransport:
109+"""A wrapper for transports."""
110+111+__slots__ = (
112+'transport',
113+'is_ipv6',
114+'sock',
115+'fileno',
116+'sock_name',
117+ )
118+119+def __init__(
120+self,
121+transport: asyncio.DatagramTransport,
122+is_ipv6: bool,
123+sock: socket.socket,
124+fileno: int,
125+sock_name: Any,
126+ ) -> None:
127+"""Initialize the wrapped transport.
128+129+ These attributes are used when sending packets.
130+ """
131+self.transport = transport
132+self.is_ipv6 = is_ipv6
133+self.sock = sock
134+self.fileno = fileno
135+self.sock_name = sock_name
136+137+138+def _make_wrapped_transport(transport: asyncio.DatagramTransport) -> _WrappedTransport:
139+"""Make a wrapped transport."""
140+sock: socket.socket = transport.get_extra_info('socket')
141+return _WrappedTransport(
142+transport=transport,
143+is_ipv6=sock.family == socket.AF_INET6,
144+sock=sock,
145+fileno=sock.fileno(),
146+sock_name=sock.getsockname(),
147+ )
148+149+108150class AsyncEngine:
109151"""An engine wraps sockets in the event loop."""
110152@@ -117,8 +159,8 @@ def __init__(
117159self.loop: Optional[asyncio.AbstractEventLoop] = None
118160self.zc = zeroconf
119161self.protocols: List[AsyncListener] = []
120-self.readers: List[asyncio.DatagramTransport] = []
121-self.senders: List[asyncio.DatagramTransport] = []
162+self.readers: List[_WrappedTransport] = []
163+self.senders: List[_WrappedTransport] = []
122164self.running_event: Optional[asyncio.Event] = None
123165self._listen_socket = listen_socket
124166self._respond_sockets = respond_sockets
@@ -158,9 +200,9 @@ async def _async_create_endpoints(self) -> None:
158200for s in reader_sockets:
159201transport, protocol = await loop.create_datagram_endpoint(lambda: AsyncListener(self.zc), sock=s)
160202self.protocols.append(cast(AsyncListener, protocol))
161-self.readers.append(cast(asyncio.DatagramTransport, transport))
203+self.readers.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport)))
162204if s in sender_sockets:
163-self.senders.append(cast(asyncio.DatagramTransport, transport))
205+self.senders.append(_make_wrapped_transport(cast(asyncio.DatagramTransport, transport)))
164206165207def _async_cache_cleanup(self) -> None:
166208"""Periodic cache cleanup."""
@@ -186,8 +228,8 @@ def _async_shutdown(self) -> None:
186228"""Shutdown transports and sockets."""
187229assert self.running_event is not None
188230self.running_event.clear()
189-for transport in itertools.chain(self.senders, self.readers):
190-transport.close()
231+for wrapped_transport in itertools.chain(self.senders, self.readers):
232+wrapped_transport.transport.close()
191233192234def close(self) -> None:
193235"""Close from sync context.
@@ -221,7 +263,7 @@ def __init__(self, zc: 'Zeroconf') -> None:
221263self.zc = zc
222264self.data: Optional[bytes] = None
223265self.last_time: float = 0
224-self.transport: Optional[asyncio.DatagramTransport] = None
266+self.transport: Optional[_WrappedTransport] = None
225267self.sock_description: Optional[str] = None
226268self._deferred: Dict[str, List[DNSIncoming]] = {}
227269self._timers: Dict[str, asyncio.TimerHandle] = {}
@@ -309,7 +351,7 @@ def handle_query_or_defer(
309351msg: DNSIncoming,
310352addr: str,
311353port: int,
312-transport: asyncio.DatagramTransport,
354+transport: _WrappedTransport,
313355v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
314356 ) -> None:
315357"""Deal with incoming query packets. Provides a response if
@@ -341,7 +383,7 @@ def _respond_query(
341383msg: Optional[DNSIncoming],
342384addr: str,
343385port: int,
344-transport: asyncio.DatagramTransport,
386+transport: _WrappedTransport,
345387v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
346388 ) -> None:
347389"""Respond to a query and reassemble any truncated deferred packets."""
@@ -362,27 +404,25 @@ def error_received(self, exc: Exception) -> None:
362404self.log_exception_once(exc, msg_str, exc)
363405364406def connection_made(self, transport: asyncio.BaseTransport) -> None:
365-self.transport = cast(asyncio.DatagramTransport, transport)
366-sock_name = self.transport.get_extra_info('sockname')
367-sock_fileno = self.transport.get_extra_info('socket').fileno()
368-self.sock_description = f"{sock_fileno} ({sock_name})"
407+wrapped_transport = _make_wrapped_transport(cast(asyncio.DatagramTransport, transport))
408+self.transport = wrapped_transport
409+self.sock_description = f"{wrapped_transport.fileno} ({wrapped_transport.sock_name})"
369410370411def connection_lost(self, exc: Optional[Exception]) -> None:
371412"""Handle connection lost."""
372413373414374415def async_send_with_transport(
375416log_debug: bool,
376-transport: asyncio.DatagramTransport,
417+transport: _WrappedTransport,
377418packet: bytes,
378419packet_num: int,
379420out: DNSOutgoing,
380421addr: Optional[str],
381422port: int,
382423v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
383424) -> None:
384-s = transport.get_extra_info('socket')
385-ipv6_socket = s.family == socket.AF_INET6
425+ipv6_socket = transport.is_ipv6
386426if addr is None:
387427real_addr = _MDNS_ADDR6 if ipv6_socket else _MDNS_ADDR
388428else:
@@ -394,8 +434,8 @@ def async_send_with_transport(
394434'Sending to (%s, %d) via [socket %s (%s)] (%d bytes #%d) %r as %r...',
395435real_addr,
396436port or _MDNS_PORT,
397-s.fileno(),
398-transport.get_extra_info('sockname'),
437+transport.fileno,
438+transport.sock_name,
399439len(packet),
400440packet_num + 1,
401441out,
@@ -404,9 +444,9 @@ def async_send_with_transport(
404444# Get flowinfo and scopeid for the IPV6 socket to create a complete IPv6
405445# address tuple: https://docs.python.org/3.6/library/socket.html#socket-families
406446if ipv6_socket and not v6_flow_scope:
407-_, _, sock_flowinfo, sock_scopeid = s.getsockname()
447+_, _, sock_flowinfo, sock_scopeid = transport.sock_name
408448v6_flow_scope = (sock_flowinfo, sock_scopeid)
409-transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))
449+transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))
410450411451412452class Zeroconf(QuietLogger):
@@ -832,7 +872,7 @@ def handle_assembled_query(
832872packets: List[DNSIncoming],
833873addr: str,
834874port: int,
835-transport: asyncio.DatagramTransport,
875+transport: _WrappedTransport,
836876v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
837877 ) -> None:
838878"""Respond to a (re)assembled query.
@@ -870,7 +910,7 @@ def send(
870910addr: Optional[str] = None,
871911port: int = _MDNS_PORT,
872912v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
873-transport: Optional[asyncio.DatagramTransport] = None,
913+transport: Optional[_WrappedTransport] = None,
874914 ) -> None:
875915"""Sends an outgoing packet threadsafe."""
876916assert self.loop is not None
@@ -882,7 +922,7 @@ def async_send(
882922addr: Optional[str] = None,
883923port: int = _MDNS_PORT,
884924v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
885-transport: Optional[asyncio.DatagramTransport] = None,
925+transport: Optional[_WrappedTransport] = None,
886926 ) -> None:
887927"""Sends an outgoing packet."""
888928if self.done: