Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.transport import BaseHttpTransport, make_transport
from sentry_sdk.transport import HttpTransportCore, make_transport
from sentry_sdk.consts import (
SPANDATA,
DEFAULT_MAX_VALUE_LENGTH,
Expand Down Expand Up @@ -406,7 +406,7 @@ def _capture_envelope(envelope: Envelope) -> None:
self.monitor
or self.log_batcher
or has_profiling_enabled(self.options)
or isinstance(self.transport, BaseHttpTransport)
or isinstance(self.transport, HttpTransportCore)
):
# If we have anything on that could spawn a background thread, we
# need to check if it's safe to use them.
Expand Down
158 changes: 98 additions & 60 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from sentry_sdk.consts import EndpointType
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
from sentry_sdk.worker import BackgroundWorker
from sentry_sdk.worker import BackgroundWorker, Worker
from sentry_sdk.envelope import Envelope, Item, PayloadRef

from typing import TYPE_CHECKING
Expand Down Expand Up @@ -162,8 +162,8 @@ def _parse_rate_limits(
continue


class BaseHttpTransport(Transport):
"""The base HTTP transport."""
class HttpTransportCore(Transport):
"""Shared base class for sync and async transports."""

TIMEOUT = 30 # seconds

Expand All @@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
Transport.__init__(self, options)
assert self.parsed_dsn is not None
self.options: Dict[str, Any] = options
self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
self._worker = self._create_worker(options)
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
self._disabled_until: Dict[Optional[str], datetime] = {}
# We only use this Retry() class for the `get_retry_after` method it exposes
Expand Down Expand Up @@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
elif self._compression_algo == "br":
self._compression_level = 4

def _create_worker(self: Self, options: Dict[str, Any]) -> Worker:
    """Build the worker used to ship envelopes in the background.

    Only the threaded synchronous ``BackgroundWorker`` is supported for
    now; subclasses may override this to plug in a different worker.
    """
    queue_size = options["transport_queue_size"]
    return BackgroundWorker(queue_size=queue_size)

def record_lost_event(
self: Self,
reason: str,
Expand Down Expand Up @@ -286,12 +290,8 @@ def _update_rate_limits(
seconds=retry_after
)

def _send_request(
self: Self,
body: bytes,
headers: Dict[str, str],
endpoint_type: EndpointType = EndpointType.ENVELOPE,
envelope: Optional[Envelope] = None,
def _handle_request_error(
self: Self, envelope: Optional[Envelope], loss_reason: str = "network"
) -> None:
def record_loss(reason: str) -> None:
if envelope is None:
Expand All @@ -300,45 +300,45 @@ def record_loss(reason: str) -> None:
for item in envelope.items:
self.record_lost_event(reason, item=item)

self.on_dropped_event(loss_reason)
record_loss("network_error")

def _handle_response(
    self: Self,
    response: Union[urllib3.BaseHTTPResponse, httpcore.Response],
    envelope: Optional[Envelope],
) -> None:
    """Process an HTTP response received from Sentry.

    Updates rate-limit bookkeeping from the response headers, then
    records the drop for a 429 and records a loss for any other
    non-2xx status.
    """
    self._update_rate_limits(response)

    if response.status == 429:
        # if we hit a 429. Something was rate limited but we already
        # acted on this in `self._update_rate_limits`. Note that we
        # do not want to record event loss here as we will have recorded
        # an outcome in relay already.
        self.on_dropped_event("status_429")

    elif response.status >= 300 or response.status < 200:
        # Any other non-success status is unexpected: log it (urllib3
        # exposes the payload as `.data`, httpcore as `.content`) and
        # record the loss so client reports stay accurate.
        logger.error(
            "Unexpected status code: %s (body: %s)",
            response.status,
            getattr(response, "data", getattr(response, "content", None)),
        )
        self._handle_request_error(
            envelope=envelope, loss_reason="status_{}".format(response.status)
        )

def _update_headers(
self: Self,
headers: Dict[str, str],
) -> None:

headers.update(
{
"User-Agent": str(self._auth.client),
"X-Sentry-Auth": str(self._auth.to_header()),
}
)
try:
response = self._request(
"POST",
endpoint_type,
body,
headers,
)
except Exception:
self.on_dropped_event("network")
record_loss("network_error")
raise

try:
self._update_rate_limits(response)

if response.status == 429:
# if we hit a 429. Something was rate limited but we already
# acted on this in `self._update_rate_limits`. Note that we
# do not want to record event loss here as we will have recorded
# an outcome in relay already.
self.on_dropped_event("status_429")
pass

elif response.status >= 300 or response.status < 200:
logger.error(
"Unexpected status code: %s (body: %s)",
response.status,
getattr(response, "data", getattr(response, "content", None)),
)
self.on_dropped_event("status_{}".format(response.status))
record_loss("network_error")
finally:
response.close()

def on_dropped_event(self: Self, _reason: str) -> None:
    """Hook invoked whenever an event is dropped.

    The default implementation ignores the reason and does nothing;
    subclasses may override it to react to drops.
    """
Expand Down Expand Up @@ -375,11 +375,6 @@ def _fetch_pending_client_report(
type="client_report",
)

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def _check_disabled(self: Self, category: EventDataCategory) -> bool:
def _disabled(bucket: Optional[EventDataCategory]) -> bool:
ts = self._disabled_until.get(bucket)
Expand All @@ -398,7 +393,9 @@ def _is_worker_full(self: Self) -> bool:
def is_healthy(self: Self) -> bool:
    """Report whether the transport can currently accept events.

    Healthy means the worker queue has capacity and we are not under an
    active rate limit. Short-circuits: the rate-limit check is skipped
    when the worker is already full.
    """
    if self._is_worker_full():
        return False
    return not self._is_rate_limited()

def _send_envelope(self: Self, envelope: Envelope) -> None:
def _prepare_envelope(
self: Self, envelope: Envelope
) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]:

# remove all items from the envelope which are over quota
new_items = []
Expand Down Expand Up @@ -442,13 +439,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
if content_encoding:
headers["Content-Encoding"] = content_encoding

self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None
return envelope, body, headers

def _serialize_envelope(
self: Self, envelope: Envelope
Expand Down Expand Up @@ -494,6 +485,9 @@ def _make_pool(
httpcore.SOCKSProxy,
httpcore.HTTPProxy,
httpcore.ConnectionPool,
httpcore.AsyncSOCKSProxy,
httpcore.AsyncHTTPProxy,
httpcore.AsyncConnectionPool,
]:
raise NotImplementedError()

Expand All @@ -506,6 +500,54 @@ def _request(
) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]:
raise NotImplementedError()

def kill(self: Self) -> None:
    """Tear down the transport by killing its background worker."""
    logger.debug("Killing HTTP transport")
    worker = self._worker
    worker.kill()


class BaseHttpTransport(HttpTransportCore):
"""The base HTTP transport."""

def _send_envelope(self: Self, envelope: Envelope) -> None:
_prepared_envelope = self._prepare_envelope(envelope)
if _prepared_envelope is not None:
envelope, body, headers = _prepared_envelope
self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None

def _send_request(
    self: Self,
    body: bytes,
    headers: Dict[str, str],
    endpoint_type: EndpointType,
    envelope: Optional[Envelope],
) -> None:
    """POST *body* to the given Sentry endpoint.

    Auth headers are merged into *headers* first. On a network-level
    failure the loss is recorded and the exception re-raised; otherwise
    the response is handed to ``_handle_response`` and always closed.
    """
    # Inject User-Agent / X-Sentry-Auth into the caller-supplied headers.
    self._update_headers(headers)
    try:
        response = self._request(
            "POST",
            endpoint_type,
            body,
            headers,
        )
    except Exception:
        # Request never produced a response: record the loss for client
        # reports, then propagate so the caller sees the failure.
        self._handle_request_error(envelope=envelope, loss_reason="network")
        raise
    try:
        # Rate-limit bookkeeping and non-2xx loss recording.
        self._handle_response(response=response, envelope=envelope)
    finally:
        # Always release the connection back to the pool.
        response.close()

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def capture_envelope(self: Self, envelope: Envelope) -> None:
def send_envelope_wrapper() -> None:
with capture_internal_exceptions():
Expand All @@ -528,10 +570,6 @@ def flush(
self._worker.submit(lambda: self._flush_client_reports(force=True))
self._worker.flush(timeout, callback)

def kill(self: Self) -> None:
logger.debug("Killing HTTP transport")
self._worker.kill()


class HttpTransport(BaseHttpTransport):
if TYPE_CHECKING:
Expand Down
Loading
Loading