feat: refactor notify implementation to reduce overhead of adding and… · python-zeroconf/python-zeroconf@ceb92cf

@@ -25,7 +25,7 @@

2525

import sys

2626

import threading

2727

from types import TracebackType

28-

from typing import Awaitable, Dict, List, Optional, Tuple, Type, Union

28+

from typing import Awaitable, Dict, List, Optional, Set, Tuple, Type, Union

29293030

from ._cache import DNSCache

3131

from ._dns import DNSQuestion, DNSQuestionType

@@ -49,11 +49,13 @@

4949

from ._transport import _WrappedTransport

5050

from ._updates import RecordUpdateListener

5151

from ._utils.asyncio import (

52+

_resolve_all_futures_to_none,

5253

await_awaitable,

5354

get_running_loop,

5455

run_coro_with_timeout,

5556

shutdown_loop,

5657

wait_event_or_timeout,

58+

wait_for_future_set_or_timeout,

5759

)

5860

from ._utils.name import service_type_name

5961

from ._utils.net import (

@@ -188,7 +190,7 @@ def __init__(

188190

self.query_handler = QueryHandler(self.registry, self.cache, self.question_history)

189191

self.record_manager = RecordManager(self)

190192191-

self.notify_event: Optional[asyncio.Event] = None

193+

self._notify_futures: Set[asyncio.Future] = set()

192194

self.loop: Optional[asyncio.AbstractEventLoop] = None

193195

self._loop_thread: Optional[threading.Thread] = None

194196

@@ -206,7 +208,6 @@ def start(self) -> None:

206208

"""Start Zeroconf."""

207209

self.loop = get_running_loop()

208210

if self.loop:

209-

self.notify_event = asyncio.Event()

210211

self.engine.setup(self.loop, None)

211212

return

212213

self._start_thread()

@@ -218,7 +219,6 @@ def _start_thread(self) -> None:

218219

def _run_loop() -> None:

219220

self.loop = asyncio.new_event_loop()

220221

asyncio.set_event_loop(self.loop)

221-

self.notify_event = asyncio.Event()

222222

self.engine.setup(self.loop, loop_thread_ready)

223223

self.loop.run_forever()

224224

@@ -245,8 +245,9 @@ def listeners(self) -> List[RecordUpdateListener]:

245245246246

async def async_wait(self, timeout: float) -> None:

247247

"""Calling task waits for a given number of milliseconds or until notified."""

248-

assert self.notify_event is not None

249-

await wait_event_or_timeout(self.notify_event, timeout=millis_to_seconds(timeout))

248+

loop = self.loop

249+

assert loop is not None

250+

await wait_for_future_set_or_timeout(loop, self._notify_futures, timeout)

250251251252

def notify_all(self) -> None:

252253

"""Notifies all waiting threads and notify listeners."""

@@ -255,9 +256,9 @@ def notify_all(self) -> None:

255256256257

def async_notify_all(self) -> None:

257258

"""Schedule an async_notify_all."""

258-

assert self.notify_event is not None

259-

self.notify_event.set()

260-

self.notify_event.clear()

259+

notify_futures = self._notify_futures

260+

if notify_futures:

261+

_resolve_all_futures_to_none(notify_futures)

261262262263

def get_service_info(

263264

self, type_: str, name: str, timeout: int = 3000, question_type: Optional[DNSQuestionType] = None