feat: speed up outgoing multicast queue by bdraco · Pull Request #1277 · python-zeroconf/python-zeroconf
Expand Up
@@ -32,9 +32,13 @@
construct_outgoing_multicast_answers,
)
RAND_INT = random.randint
if TYPE_CHECKING: from .._core import Zeroconf
_float = float
class MulticastOutgoingQueue: """An outgoing queue used to aggregate multicast responses.""" Expand All @@ -50,10 +54,13 @@ def __init__(self, zeroconf: 'Zeroconf', additional_delay: int, max_aggregation_ self.additional_delay = additional_delay self.aggregation_delay = max_aggregation_delay
def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None: def async_add(self, now: _float, answers: _AnswerWithAdditionalsType) -> None: """Add a group of answers with additionals to the outgoing queue.""" assert self.zc.loop is not None random_delay = random.randint(*MULTICAST_DELAY_RANDOM_INTERVAL) + self.additional_delay loop = self.zc.loop if TYPE_CHECKING: assert loop is not None random_int = RAND_INT(*MULTICAST_DELAY_RANDOM_INTERVAL) random_delay = random_int + self.additional_delay send_after = now + random_delay send_before = now + self.aggregation_delay + self.additional_delay if len(self.queue): Expand All @@ -66,7 +73,7 @@ def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None: last_group.answers.update(answers) return else: self.zc.loop.call_later(millis_to_seconds(random_delay), self.async_ready) loop.call_at(loop.time() + millis_to_seconds(random_delay), self.async_ready) self.queue.append(AnswerGroup(send_after, send_before, answers))
def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> None: Expand All @@ -77,13 +84,16 @@ def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> Non
def async_ready(self) -> None: """Process anything in the queue that is ready.""" assert self.zc.loop is not None zc = self.zc loop = zc.loop if TYPE_CHECKING: assert loop is not None now = current_time_millis()
if len(self.queue) > 1 and self.queue[0].send_before > now: # There is more than one answer in the queue, # delay until we have to send it (first answer group reaches send_before) self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_before - now), self.async_ready) loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_before - now), self.async_ready) return
answers: _AnswerWithAdditionalsType = {} Expand All @@ -94,9 +104,9 @@ def async_ready(self) -> None: if len(self.queue): # If there are still groups in the queue that are not ready to send # be sure we schedule them to go out later self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_after - now), self.async_ready) loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_after - now), self.async_ready)
if answers: # If we have the same answer scheduled to go out, remove them self._remove_answers_from_queue(answers) self.zc.async_send(construct_outgoing_multicast_answers(answers)) zc.async_send(construct_outgoing_multicast_answers(answers))
RAND_INT = random.randint
if TYPE_CHECKING: from .._core import Zeroconf
_float = float
class MulticastOutgoingQueue: """An outgoing queue used to aggregate multicast responses.""" Expand All @@ -50,10 +54,13 @@ def __init__(self, zeroconf: 'Zeroconf', additional_delay: int, max_aggregation_ self.additional_delay = additional_delay self.aggregation_delay = max_aggregation_delay
def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None: def async_add(self, now: _float, answers: _AnswerWithAdditionalsType) -> None: """Add a group of answers with additionals to the outgoing queue.""" assert self.zc.loop is not None random_delay = random.randint(*MULTICAST_DELAY_RANDOM_INTERVAL) + self.additional_delay loop = self.zc.loop if TYPE_CHECKING: assert loop is not None random_int = RAND_INT(*MULTICAST_DELAY_RANDOM_INTERVAL) random_delay = random_int + self.additional_delay send_after = now + random_delay send_before = now + self.aggregation_delay + self.additional_delay if len(self.queue): Expand All @@ -66,7 +73,7 @@ def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None: last_group.answers.update(answers) return else: self.zc.loop.call_later(millis_to_seconds(random_delay), self.async_ready) loop.call_at(loop.time() + millis_to_seconds(random_delay), self.async_ready) self.queue.append(AnswerGroup(send_after, send_before, answers))
def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> None: Expand All @@ -77,13 +84,16 @@ def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> Non
def async_ready(self) -> None: """Process anything in the queue that is ready.""" assert self.zc.loop is not None zc = self.zc loop = zc.loop if TYPE_CHECKING: assert loop is not None now = current_time_millis()
if len(self.queue) > 1 and self.queue[0].send_before > now: # There is more than one answer in the queue, # delay until we have to send it (first answer group reaches send_before) self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_before - now), self.async_ready) loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_before - now), self.async_ready) return
answers: _AnswerWithAdditionalsType = {} Expand All @@ -94,9 +104,9 @@ def async_ready(self) -> None: if len(self.queue): # If there are still groups in the queue that are not ready to send # be sure we schedule them to go out later self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_after - now), self.async_ready) loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_after - now), self.async_ready)
if answers: # If we have the same answer scheduled to go out, remove them self._remove_answers_from_queue(answers) self.zc.async_send(construct_outgoing_multicast_answers(answers)) zc.async_send(construct_outgoing_multicast_answers(answers))