feat: speed up processing incoming packets by bdraco · Pull Request #1352 · python-zeroconf/python-zeroconf

Expand Up @@ -191,14 +191,16 @@ def _has_mcast_record_in_last_second(self, record: DNSRecord) -> bool: class QueryHandler: """Query the ServiceRegistry."""
__slots__ = ("zc", "registry", "cache", "question_history") __slots__ = ("zc", "registry", "cache", "question_history", "out_queue", "out_delay_queue")
def __init__(self, zc: 'Zeroconf') -> None: """Init the query handler.""" self.zc = zc self.registry = zc.registry self.cache = zc.cache self.question_history = zc.question_history self.out_queue = zc.out_queue self.out_delay_queue = zc.out_delay_queue
def _add_service_type_enumeration_query_answers( self, types: List[str], answer_set: _AnswerWithAdditionalsType, known_answers: DNSRRSet Expand Down Expand Up @@ -301,7 +303,7 @@ def async_response( # pylint: disable=unused-argument """ strategies: List[_AnswerStrategy] = [] for msg in msgs: for question in msg.questions: for question in msg._questions: strategies.extend(self._get_answer_strategies(question))
if not strategies: Expand All @@ -311,7 +313,8 @@ def async_response( # pylint: disable=unused-argument return None
is_probe = False questions = msg.questions msg = msgs[0] questions = msg._questions # Only decode known answers if we are not a probe and we have # at least one answer strategy answers: List[DNSRecord] = [] Expand All @@ -321,7 +324,6 @@ def async_response( # pylint: disable=unused-argument else: answers.extend(msg.answers())
msg = msgs[0] query_res = _QueryResponse(self.cache, questions, is_probe, msg.now) known_answers = DNSRRSet(answers) known_answers_set: Optional[Set[DNSRecord]] = None Expand Down Expand Up @@ -412,13 +414,12 @@ def handle_assembled_query( packet will be in packets. """ first_packet = packets[0] now = first_packet.now ucast_source = port != _MDNS_PORT question_answers = self.async_response(packets, ucast_source) if not question_answers: if question_answers is None: return if question_answers.ucast: questions = first_packet.questions questions = first_packet._questions id_ = first_packet.id out = construct_outgoing_unicast_answers(question_answers.ucast, ucast_source, questions, id_) # When sending unicast, only send back the reply Expand All @@ -428,11 +429,9 @@ def handle_assembled_query( if question_answers.mcast_now: self.zc.async_send(construct_outgoing_multicast_answers(question_answers.mcast_now)) if question_answers.mcast_aggregate: out_queue = self.zc.out_queue out_queue.async_add(now, question_answers.mcast_aggregate) self.out_queue.async_add(first_packet.now, question_answers.mcast_aggregate) if question_answers.mcast_aggregate_last_second: # https://datatracker.ietf.org/doc/html/rfc6762#section-14 # If we broadcast it in the last second, we have to delay # at least a second before we send it again out_delay_queue = self.zc.out_delay_queue out_delay_queue.async_add(now, question_answers.mcast_aggregate_last_second) self.out_delay_queue.async_add(first_packet.now, question_answers.mcast_aggregate_last_second)