sentry_sdk.transport
from abc import ABC, abstractmethod import asyncio import io import os import gzip import socket import ssl import time import warnings from datetime import datetime, timedelta, timezone from collections import defaultdict from urllib.request import getproxies try: import brotli # type: ignore except ImportError: brotli = None try: import httpcore except ImportError: httpcore = None # type: ignore[assignment,unused-ignore] try: import h2 # noqa: F401 HTTP2_ENABLED = httpcore is not None except ImportError: HTTP2_ENABLED = False try: import anyio # noqa: F401 ASYNC_TRANSPORT_AVAILABLE = httpcore is not None except ImportError: ASYNC_TRANSPORT_AVAILABLE = False import urllib3 import certifi import sentry_sdk from sentry_sdk.consts import EndpointType from sentry_sdk.utils import ( Dsn, logger, capture_internal_exceptions, mark_sentry_task_internal, ) from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING, cast, List, Dict if TYPE_CHECKING: from typing import Any from typing import Callable from typing import DefaultDict from typing import Iterable from typing import Mapping from typing import Optional from typing import Self from typing import Tuple from typing import Type from typing import Union from urllib3.poolmanager import PoolManager from urllib3.poolmanager import ProxyManager from sentry_sdk._types import Event, EventDataCategory KEEP_ALIVE_SOCKET_OPTIONS = [] for option in [ (socket.SOL_SOCKET, lambda: getattr(socket, "SO_KEEPALIVE"), 1), # noqa: B009 (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPIDLE"), 45), # noqa: B009 (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPINTVL"), 10), # noqa: B009 (socket.SOL_TCP, lambda: getattr(socket, "TCP_KEEPCNT"), 6), # noqa: B009 ]: try: KEEP_ALIVE_SOCKET_OPTIONS.append((option[0], option[1](), option[2])) except AttributeError: # a specific option might not be available on specific systems, # e.g. TCP_KEEPIDLE doesn't exist on macOS pass def _get_httpcore_header_value(response: "Any", header: str) -> "Optional[str]": """Case-insensitive header lookup for httpcore-style responses.""" header_lower = header.lower() return next( ( val.decode("ascii") for key, val in response.headers if key.decode("ascii").lower() == header_lower ), None, ) class Transport(ABC): """Baseclass for all transports. A transport is used to send an event to sentry. """ parsed_dsn: "Optional[Dsn]" = None def __init__(self: "Self", options: "Optional[Dict[str, Any]]" = None) -> None: self.options = options if options and options["dsn"] is not None and options["dsn"]: self.parsed_dsn = Dsn(options["dsn"], options.get("org_id")) else: self.parsed_dsn = None def capture_event(self: "Self", event: "Event") -> None: """ DEPRECATED: Please use capture_envelope instead. This gets invoked with the event dictionary when an event should be sent to sentry. """ warnings.warn( "capture_event is deprecated, please use capture_envelope instead!", DeprecationWarning, stacklevel=2, ) envelope = Envelope() envelope.add_event(event) self.capture_envelope(envelope)