Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.transport import BaseHttpTransport, make_transport
from sentry_sdk.transport import HttpTransportCore, make_transport
from sentry_sdk.consts import (
SPANDATA,
DEFAULT_MAX_VALUE_LENGTH,
Expand Down Expand Up @@ -406,7 +406,7 @@ def _capture_envelope(envelope: Envelope) -> None:
self.monitor
or self.log_batcher
or has_profiling_enabled(self.options)
or isinstance(self.transport, BaseHttpTransport)
or isinstance(self.transport, HttpTransportCore)
):
# If we have anything on that could spawn a background thread, we
# need to check if it's safe to use them.
Expand Down
150 changes: 92 additions & 58 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ def _parse_rate_limits(
continue


class BaseHttpTransport(Transport):
"""The base HTTP transport."""
class HttpTransportCore(Transport):
"""Shared base class for sync and async transports."""

TIMEOUT = 30 # seconds

Expand Down Expand Up @@ -286,12 +286,8 @@ def _update_rate_limits(
seconds=retry_after
)

def _send_request(
self: Self,
body: bytes,
headers: Dict[str, str],
endpoint_type: EndpointType = EndpointType.ENVELOPE,
envelope: Optional[Envelope] = None,
def _handle_request_error(
self: Self, envelope: Optional[Envelope], loss_reason: str = "network"
) -> None:
def record_loss(reason: str) -> None:
if envelope is None:
Expand All @@ -300,45 +296,45 @@ def record_loss(reason: str) -> None:
for item in envelope.items:
self.record_lost_event(reason, item=item)

self.on_dropped_event(loss_reason)
record_loss("network_error")

def _handle_response(
self: Self,
response: Union[urllib3.BaseHTTPResponse, httpcore.Response],
envelope: Optional[Envelope],
) -> None:
self._update_rate_limits(response)

if response.status == 429:
# if we hit a 429. Something was rate limited but we already
# acted on this in `self._update_rate_limits`. Note that we
# do not want to record event loss here as we will have recorded
# an outcome in relay already.
self.on_dropped_event("status_429")
pass

elif response.status >= 300 or response.status < 200:
logger.error(
"Unexpected status code: %s (body: %s)",
response.status,
getattr(response, "data", getattr(response, "content", None)),
)
self._handle_request_error(
envelope=envelope, loss_reason="status_{}".format(response.status)
)

def _update_headers(
self: Self,
headers: Dict[str, str],
) -> None:

headers.update(
{
"User-Agent": str(self._auth.client),
"X-Sentry-Auth": str(self._auth.to_header()),
}
)
try:
response = self._request(
"POST",
endpoint_type,
body,
headers,
)
except Exception:
self.on_dropped_event("network")
record_loss("network_error")
raise

try:
self._update_rate_limits(response)

if response.status == 429:
# if we hit a 429. Something was rate limited but we already
# acted on this in `self._update_rate_limits`. Note that we
# do not want to record event loss here as we will have recorded
# an outcome in relay already.
self.on_dropped_event("status_429")
pass

elif response.status >= 300 or response.status < 200:
logger.error(
"Unexpected status code: %s (body: %s)",
response.status,
getattr(response, "data", getattr(response, "content", None)),
)
self.on_dropped_event("status_{}".format(response.status))
record_loss("network_error")
finally:
response.close()

def on_dropped_event(self: Self, _reason: str) -> None:
return None
Expand Down Expand Up @@ -375,11 +371,6 @@ def _fetch_pending_client_report(
type="client_report",
)

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def _check_disabled(self: Self, category: EventDataCategory) -> bool:
def _disabled(bucket: Optional[EventDataCategory]) -> bool:
ts = self._disabled_until.get(bucket)
Expand All @@ -398,7 +389,9 @@ def _is_worker_full(self: Self) -> bool:
def is_healthy(self: Self) -> bool:
return not (self._is_worker_full() or self._is_rate_limited())

def _send_envelope(self: Self, envelope: Envelope) -> None:
def _prepare_envelope(
self: Self, envelope: Envelope
) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]:

# remove all items from the envelope which are over quota
new_items = []
Expand Down Expand Up @@ -442,13 +435,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
if content_encoding:
headers["Content-Encoding"] = content_encoding

self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None
return envelope, body, headers

def _serialize_envelope(
self: Self, envelope: Envelope
Expand Down Expand Up @@ -494,6 +481,9 @@ def _make_pool(
httpcore.SOCKSProxy,
httpcore.HTTPProxy,
httpcore.ConnectionPool,
httpcore.AsyncSOCKSProxy,
httpcore.AsyncHTTPProxy,
httpcore.AsyncConnectionPool,
]:
raise NotImplementedError()

Expand All @@ -506,6 +496,54 @@ def _request(
) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]:
raise NotImplementedError()

def kill(self: Self) -> None:
logger.debug("Killing HTTP transport")
self._worker.kill()


class BaseHttpTransport(HttpTransportCore):
"""The base HTTP transport."""

def _send_envelope(self: Self, envelope: Envelope) -> None:
_prepared_envelope = self._prepare_envelope(envelope)
if _prepared_envelope is not None:
envelope, body, headers = _prepared_envelope
self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None

def _send_request(
self: Self,
body: bytes,
headers: Dict[str, str],
endpoint_type: EndpointType,
envelope: Optional[Envelope],
) -> None:
self._update_headers(headers)
try:
response = self._request(
"POST",
endpoint_type,
body,
headers,
)
except Exception:
self._handle_request_error(envelope=envelope, loss_reason="network")
raise
try:
self._handle_response(response=response, envelope=envelope)
finally:
response.close()

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def capture_envelope(self: Self, envelope: Envelope) -> None:
def send_envelope_wrapper() -> None:
with capture_internal_exceptions():
Expand All @@ -528,10 +566,6 @@ def flush(
self._worker.submit(lambda: self._flush_client_reports(force=True))
self._worker.flush(timeout, callback)

def kill(self: Self) -> None:
logger.debug("Killing HTTP transport")
self._worker.kill()


class HttpTransport(BaseHttpTransport):
if TYPE_CHECKING:
Expand Down
76 changes: 76 additions & 0 deletions tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,79 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_
"reason": "test",
"quantity": span_count + 1,
} in discarded_events


def test_handle_unexpected_status_invokes_handle_request_error(
make_client, monkeypatch
):
client = make_client()
transport = client.transport

monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True)

def stub_request(method, endpoint, body=None, headers=None):
class MockResponse:
def __init__(self):
self.status = 500 # Integer
self.data = b"server error"
self.headers = {}

def close(self):
pass

return MockResponse()

monkeypatch.setattr(transport, "_request", stub_request)

seen = []
monkeypatch.setattr(
transport,
"_handle_request_error",
lambda envelope, loss_reason: seen.append(loss_reason),
)

client.capture_event({"message": "test"})
client.flush()

assert seen == ["status_500"]


def test_handle_request_error_basic_coverage(make_client, monkeypatch):
client = make_client()
transport = client.transport

monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True)

# Track method calls
calls = []

def mock_on_dropped_event(reason):
calls.append(("on_dropped_event", reason))

def mock_record_lost_event(reason, data_category=None, item=None):
calls.append(("record_lost_event", reason, data_category, item))

monkeypatch.setattr(transport, "on_dropped_event", mock_on_dropped_event)
monkeypatch.setattr(transport, "record_lost_event", mock_record_lost_event)

# Test case 1: envelope is None
transport._handle_request_error(envelope=None, loss_reason="test_reason")

assert len(calls) == 2
assert calls[0] == ("on_dropped_event", "test_reason")
assert calls[1] == ("record_lost_event", "network_error", "error", None)

# Reset
calls.clear()

# Test case 2: envelope with items
envelope = Envelope()
envelope.add_item(mock.MagicMock()) # Simple mock item
envelope.add_item(mock.MagicMock()) # Another mock item

transport._handle_request_error(envelope=envelope, loss_reason="connection_error")

assert len(calls) == 3
assert calls[0] == ("on_dropped_event", "connection_error")
assert calls[1][0:2] == ("record_lost_event", "network_error")
assert calls[2][0:2] == ("record_lost_event", "network_error")
Loading