feat: speed up outgoing multicast queue (#1277) · python-zeroconf/python-zeroconf@a13fd49

@@ -32,9 +32,13 @@

3232

construct_outgoing_multicast_answers,

3333

)

343435+

RAND_INT = random.randint

36+3537

if TYPE_CHECKING:

3638

from .._core import Zeroconf

373940+

_float = float

41+38423943

class MulticastOutgoingQueue:

4044

"""An outgoing queue used to aggregate multicast responses."""

@@ -50,10 +54,13 @@ def __init__(self, zeroconf: 'Zeroconf', additional_delay: int, max_aggregation_

5054

self.additional_delay = additional_delay

5155

self.aggregation_delay = max_aggregation_delay

525653-

def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:

57+

def async_add(self, now: _float, answers: _AnswerWithAdditionalsType) -> None:

5458

"""Add a group of answers with additionals to the outgoing queue."""

55-

assert self.zc.loop is not None

56-

random_delay = random.randint(*MULTICAST_DELAY_RANDOM_INTERVAL) + self.additional_delay

59+

loop = self.zc.loop

60+

if TYPE_CHECKING:

61+

assert loop is not None

62+

random_int = RAND_INT(*MULTICAST_DELAY_RANDOM_INTERVAL)

63+

random_delay = random_int + self.additional_delay

5764

send_after = now + random_delay

5865

send_before = now + self.aggregation_delay + self.additional_delay

5966

if len(self.queue):

@@ -66,7 +73,7 @@ def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:

6673

last_group.answers.update(answers)

6774

return

6875

else:

69-

self.zc.loop.call_later(millis_to_seconds(random_delay), self.async_ready)

76+

loop.call_at(loop.time() + millis_to_seconds(random_delay), self.async_ready)

7077

self.queue.append(AnswerGroup(send_after, send_before, answers))

71787279

def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> None:

@@ -77,13 +84,16 @@ def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> Non

77847885

def async_ready(self) -> None:

7986

"""Process anything in the queue that is ready."""

80-

assert self.zc.loop is not None

87+

zc = self.zc

88+

loop = zc.loop

89+

if TYPE_CHECKING:

90+

assert loop is not None

8191

now = current_time_millis()

82928393

if len(self.queue) > 1 and self.queue[0].send_before > now:

8494

# There is more than one answer in the queue,

8595

# delay until we have to send it (first answer group reaches send_before)

86-

self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_before - now), self.async_ready)

96+

loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_before - now), self.async_ready)

8797

return

88988999

answers: _AnswerWithAdditionalsType = {}

@@ -94,9 +104,9 @@ def async_ready(self) -> None:

94104

if len(self.queue):

95105

# If there are still groups in the queue that are not ready to send

96106

# be sure we schedule them to go out later

97-

self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_after - now), self.async_ready)

107+

loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_after - now), self.async_ready)

9810899109

if answers:

100110

# If we have the same answer scheduled to go out, remove them

101111

self._remove_answers_from_queue(answers)

102-

self.zc.async_send(construct_outgoing_multicast_answers(answers))

112+

zc.async_send(construct_outgoing_multicast_answers(answers))