feat: speed up outgoing multicast queue (#1277) · python-zeroconf/python-zeroconf@a13fd49
@@ -32,9 +32,13 @@
3232construct_outgoing_multicast_answers,
3333)
343435+RAND_INT = random.randint
36+3537if TYPE_CHECKING:
3638from .._core import Zeroconf
373940+_float = float
41+38423943class MulticastOutgoingQueue:
4044"""An outgoing queue used to aggregate multicast responses."""
@@ -50,10 +54,13 @@ def __init__(self, zeroconf: 'Zeroconf', additional_delay: int, max_aggregation_
5054self.additional_delay = additional_delay
5155self.aggregation_delay = max_aggregation_delay
525653-def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:
57+def async_add(self, now: _float, answers: _AnswerWithAdditionalsType) -> None:
5458"""Add a group of answers with additionals to the outgoing queue."""
55-assert self.zc.loop is not None
56-random_delay = random.randint(*MULTICAST_DELAY_RANDOM_INTERVAL) + self.additional_delay
59+loop = self.zc.loop
60+if TYPE_CHECKING:
61+assert loop is not None
62+random_int = RAND_INT(*MULTICAST_DELAY_RANDOM_INTERVAL)
63+random_delay = random_int + self.additional_delay
5764send_after = now + random_delay
5865send_before = now + self.aggregation_delay + self.additional_delay
5966if len(self.queue):
@@ -66,7 +73,7 @@ def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:
6673last_group.answers.update(answers)
6774return
6875else:
69-self.zc.loop.call_later(millis_to_seconds(random_delay), self.async_ready)
76+loop.call_at(loop.time() + millis_to_seconds(random_delay), self.async_ready)
7077self.queue.append(AnswerGroup(send_after, send_before, answers))
71787279def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> None:
@@ -77,13 +84,16 @@ def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> Non
77847885def async_ready(self) -> None:
7986"""Process anything in the queue that is ready."""
80-assert self.zc.loop is not None
87+zc = self.zc
88+loop = zc.loop
89+if TYPE_CHECKING:
90+assert loop is not None
8191now = current_time_millis()
82928393if len(self.queue) > 1 and self.queue[0].send_before > now:
8494# There is more than one answer in the queue,
8595# delay until we have to send it (first answer group reaches send_before)
86-self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_before - now), self.async_ready)
96+loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_before - now), self.async_ready)
8797return
88988999answers: _AnswerWithAdditionalsType = {}
@@ -94,9 +104,9 @@ def async_ready(self) -> None:
94104if len(self.queue):
95105# If there are still groups in the queue that are not ready to send
96106# be sure we schedule them to go out later
97-self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_after - now), self.async_ready)
107+loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_after - now), self.async_ready)
9810899109if answers:
100110# If we have the same answer scheduled to go out, remove them
101111self._remove_answers_from_queue(answers)
102-self.zc.async_send(construct_outgoing_multicast_answers(answers))
112+zc.async_send(construct_outgoing_multicast_answers(answers))