feat: speed up incoming data parser by bdraco · Pull Request #1161 · python-zeroconf/python-zeroconf
Expand Up
@@ -353,7 +353,9 @@ def _decode_labels_at_offset(self, off: int, labels: List[str], seen_pointers: S
)
# We have a DNS compression pointer link = (length & 0x3F) * 256 + self.data[off + 1] link_data = self.data[off + 1] link = (length & 0x3F) * 256 + link_data lint_int = int(link) if link > self._data_len: raise IncomingDecodeError( f"DNS compression pointer at {off} points to {link} beyond packet from {self.source}" Expand All @@ -362,15 +364,16 @@ def _decode_labels_at_offset(self, off: int, labels: List[str], seen_pointers: S raise IncomingDecodeError( f"DNS compression pointer at {off} points to itself from {self.source}" ) if link in seen_pointers: if lint_int in seen_pointers: raise IncomingDecodeError( f"DNS compression pointer at {off} was seen again from {self.source}" ) linked_labels = self.name_cache.get(link, []) linked_labels = self.name_cache.get(lint_int) if not linked_labels: seen_pointers.add(link) linked_labels = [] seen_pointers.add(lint_int) self._decode_labels_at_offset(link, linked_labels, seen_pointers) self.name_cache[link] = linked_labels self.name_cache[lint_int] = linked_labels labels.extend(linked_labels) if len(labels) > MAX_DNS_LABELS: raise IncomingDecodeError( Expand Down
# We have a DNS compression pointer link = (length & 0x3F) * 256 + self.data[off + 1] link_data = self.data[off + 1] link = (length & 0x3F) * 256 + link_data lint_int = int(link) if link > self._data_len: raise IncomingDecodeError( f"DNS compression pointer at {off} points to {link} beyond packet from {self.source}" Expand All @@ -362,15 +364,16 @@ def _decode_labels_at_offset(self, off: int, labels: List[str], seen_pointers: S raise IncomingDecodeError( f"DNS compression pointer at {off} points to itself from {self.source}" ) if link in seen_pointers: if lint_int in seen_pointers: raise IncomingDecodeError( f"DNS compression pointer at {off} was seen again from {self.source}" ) linked_labels = self.name_cache.get(link, []) linked_labels = self.name_cache.get(lint_int) if not linked_labels: seen_pointers.add(link) linked_labels = [] seen_pointers.add(lint_int) self._decode_labels_at_offset(link, linked_labels, seen_pointers) self.name_cache[link] = linked_labels self.name_cache[lint_int] = linked_labels labels.extend(linked_labels) if len(labels) > MAX_DNS_LABELS: raise IncomingDecodeError( Expand Down