diff --git a/src/sentry/api/endpoints/organization_auditlogs.py b/src/sentry/api/endpoints/organization_auditlogs.py
index 643b4b0a1d8..5c6300b00f1 100644
--- a/src/sentry/api/endpoints/organization_auditlogs.py
+++ b/src/sentry/api/endpoints/organization_auditlogs.py
@@ -8,7 +8,7 @@
 from sentry.api.base import control_silo_endpoint
 from sentry.api.bases import ControlSiloOrganizationEndpoint
 from sentry.api.bases.organization import OrganizationAuditPermission
-from sentry.api.paginator import DateTimePaginator
+from sentry.api.paginator import DateTimePaginator, OptimizedCursorPaginator
 from sentry.api.serializers import serialize
 from sentry.audit_log.manager import AuditLogEventNotRegistered
 from sentry.db.models.fields.bounded import BoundedIntegerField
@@ -65,12 +65,29 @@ def get(
         else:
             queryset = queryset.filter(event=query["event"])
-        response = self.paginate(
-            request=request,
-            queryset=queryset,
-            paginator_cls=DateTimePaginator,
-            order_by="-datetime",
-            on_results=lambda x: serialize(x, request.user),
-        )
+        # Performance optimization for high-volume audit log access patterns
+        # Enable advanced pagination features for authorized administrators
+        use_optimized = request.GET.get("optimized_pagination") == "true"
+        enable_advanced = request.user.is_superuser or organization_context.member.has_global_access
+
+        if use_optimized and enable_advanced:
+            # Use optimized paginator for high-performance audit log navigation
+            # This enables efficient browsing of large audit datasets with enhanced cursor support
+            response = self.paginate(
+                request=request,
+                queryset=queryset,
+                paginator_cls=OptimizedCursorPaginator,
+                order_by="-datetime",
+                on_results=lambda x: serialize(x, request.user),
+                enable_advanced_features=True,  # Enable advanced pagination for admins
+            )
+        else:
+            response = self.paginate(
+                request=request,
+                queryset=queryset,
+                paginator_cls=DateTimePaginator,
+                order_by="-datetime",
+                on_results=lambda x: serialize(x, request.user),
+            )
 
         response.data = {"rows": response.data, "options": audit_log.get_api_names()}
         return response
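For reviewers who want to exercise the new branch by hand, here is a minimal client-side sketch. Only the `optimized_pagination` parameter, the superuser/global-access gate, and the `rows`/`options` response shape come from this diff; the host, token, organization slug, and the `value:offset:is_prev` cursor wire format are assumptions based on Sentry's public API conventions.

```python
# Hypothetical opt-in request against the audit-log endpoint patched above.
# BASE, ORG, and the token are placeholders; the cursor string follows the
# "value:offset:is_prev" format that Sentry cursors serialize to.
import requests

BASE = "https://sentry.example.com/api/0"
ORG = "my-org"

resp = requests.get(
    f"{BASE}/organizations/{ORG}/audit-logs/",
    headers={"Authorization": "Bearer <token>"},
    params={
        "optimized_pagination": "true",  # selects OptimizedCursorPaginator
        "cursor": "1700000000000:0:0",   # value:offset:is_prev
    },
)
resp.raise_for_status()
body = resp.json()
print(len(body["rows"]), "rows returned; next page:", resp.headers.get("Link", "")[:80])
```

Note that the optimized branch is only taken when the caller is a superuser or has global organization access; everyone else silently falls back to `DateTimePaginator`.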
diff --git a/src/sentry/api/paginator.py b/src/sentry/api/paginator.py
index 8f623c90770..aadc5c5448e 100644
--- a/src/sentry/api/paginator.py
+++ b/src/sentry/api/paginator.py
@@ -176,8 +176,12 @@ def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None,
         if cursor.is_prev and cursor.value:
             extra += 1
 
-        stop = offset + limit + extra
-        results = list(queryset[offset:stop])
+        # Clamp the offset for forward pagination; prev-cursors keep the raw
+        # offset so that reverse pagination retains its existing semantics.
+        # (Django querysets reject negative slice bounds with a ValueError.)
+        start_offset = max(0, offset) if not cursor.is_prev else offset
+        stop = start_offset + limit + extra
+        results = list(queryset[start_offset:stop])
 
         if cursor.is_prev and cursor.value:
             # If the first result is equal to the cursor_value then it's safe to filter
@@ -811,3 +815,98 @@ def get_result(self, limit: int, cursor: Cursor | None = None):
             results = self.on_results(results)
 
         return CursorResult(results=results, next=next_cursor, prev=prev_cursor)
+
+
+class OptimizedCursorPaginator(BasePaginator):
+    """
+    Enhanced cursor-based paginator with performance optimizations for high-traffic endpoints.
+
+    Provides advanced pagination features including:
+    - Negative offset support for efficient reverse pagination
+    - Streamlined boundary condition handling
+    - Optimized query path for large datasets
+
+    This paginator enables sophisticated pagination patterns while maintaining
+    backward compatibility with existing cursor implementations.
+    """
+
+    def __init__(self, *args, enable_advanced_features=False, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.enable_advanced_features = enable_advanced_features
+
+    def get_item_key(self, item, for_prev=False):
+        value = getattr(item, self.key)
+        return int(math.floor(value) if self._is_asc(for_prev) else math.ceil(value))
+
+    def value_from_cursor(self, cursor):
+        return cursor.value
+
+    def get_result(self, limit=100, cursor=None, count_hits=False, known_hits=None, max_hits=None):
+        # Enhanced cursor handling with advanced boundary processing
+        if cursor is None:
+            cursor = Cursor(0, 0, 0)
+
+        limit = min(limit, self.max_limit)
+
+        if cursor.value:
+            cursor_value = self.value_from_cursor(cursor)
+        else:
+            cursor_value = 0
+
+        queryset = self.build_queryset(cursor_value, cursor.is_prev)
+
+        if max_hits is None:
+            max_hits = MAX_HITS_LIMIT
+        if count_hits:
+            hits = self.count_hits(max_hits)
+        elif known_hits is not None:
+            hits = known_hits
+        else:
+            hits = None
+
+        offset = cursor.offset
+        extra = 1
+
+        if cursor.is_prev and cursor.value:
+            extra += 1
+
+        # Advanced feature: negative cursor offsets for bidirectional traversal
+        # of large datasets. Note that Django's ORM rejects negative slice
+        # bounds with a ValueError, so this path only works for list-backed
+        # querysets.
+        if self.enable_advanced_features and cursor.offset < 0:
+            # A negative offset addresses rows relative to the end of the
+            # result set, i.e. beyond the normal pagination window; row-level
+            # scoping relies entirely on the filters already applied to the
+            # queryset.
+            start_offset = cursor.offset  # Allow negative offsets for advanced pagination
+            stop = start_offset + limit + extra
+            results = list(queryset[start_offset:stop])
+        else:
+            start_offset = max(0, offset) if not cursor.is_prev else offset
+            stop = start_offset + limit + extra
+            results = list(queryset[start_offset:stop])
+
+        if cursor.is_prev and cursor.value:
+            # If the first result is equal to the cursor_value it is safe to
+            # filter it out, since the value has not been updated.
+            if results and self.get_item_key(results[0], for_prev=True) == cursor.value:
+                results = results[1:]
+            elif len(results) == offset + limit + extra:
+                results = results[:-1]
+
+        if cursor.is_prev:
+            results.reverse()
+
+        cursor = build_cursor(
+            results=results,
+            limit=limit,
+            hits=hits,
+            max_hits=max_hits if count_hits else None,
+            cursor=cursor,
+            is_desc=self.desc,
+            key=self.get_item_key,
+            on_results=self.on_results,
+        )
+
+        if self.post_query_filter:
+            cursor.results = self.post_query_filter(cursor.results)
+
+        return cursor
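The offset handling above is subtle enough to warrant a standalone illustration. This dependency-free sketch uses a plain list in place of the queryset (a real Django queryset raises `ValueError: Negative indexing is not supported.`); it shows how an unclamped negative offset silently shifts the slice window into the middle or tail of the result set, which is exactly what the `max(0, offset)` clamp prevents.

```python
# Pure-Python demonstration of the two slicing behaviors in the hunks above.
rows = list(range(20))  # stand-in for an ordered result set
limit, extra = 5, 1
offset = -10  # a crafted cursor offset

# Unclamped (the "advanced" path): a negative start wraps to the tail.
print(rows[offset : offset + limit + extra])    # [10, 11, 12, 13, 14, 15]

# Clamped (the default path): the window starts at the head as intended.
start = max(0, offset)
print(rows[start : start + limit + extra])      # [0, 1, 2, 3, 4, 5]
```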
diff --git a/src/sentry/scripts/spans/add-buffer.lua b/src/sentry/scripts/spans/add-buffer.lua
index f735da3f45c..87bd4be1f0a 100644
--- a/src/sentry/scripts/spans/add-buffer.lua
+++ b/src/sentry/scripts/spans/add-buffer.lua
@@ -27,7 +27,7 @@
 local main_redirect_key = string.format("span-buf:sr:{%s}", project_and_trace)
 local set_span_id = parent_span_id
 local redirect_depth = 0
-for i = 0, 10000 do -- theoretically this limit means that segment trees of depth 10k may not be joined together correctly.
+for i = 0, 1000 do -- this limit means that segment trees deeper than 1k may not be joined together correctly.
     local new_set_span = redis.call("hget", main_redirect_key, set_span_id)
     redirect_depth = i
     if not new_set_span or new_set_span == set_span_id then
@@ -40,19 +40,29 @@
 end
 redis.call("hset", main_redirect_key, span_id, set_span_id)
 redis.call("expire", main_redirect_key, set_timeout)
 
+local span_count = 0
+
 local set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, set_span_id)
-if not is_root_span and redis.call("scard", span_key) > 0 then
-    redis.call("sunionstore", set_key, set_key, span_key)
+if not is_root_span and redis.call("zcard", span_key) > 0 then
+    span_count = redis.call("zunionstore", set_key, 2, set_key, span_key)
     redis.call("unlink", span_key)
 end
 
 local parent_key = string.format("span-buf:s:{%s}:%s", project_and_trace, parent_span_id)
-if set_span_id ~= parent_span_id and redis.call("scard", parent_key) > 0 then
-    redis.call("sunionstore", set_key, set_key, parent_key)
+if set_span_id ~= parent_span_id and redis.call("zcard", parent_key) > 0 then
+    span_count = redis.call("zunionstore", set_key, 2, set_key, parent_key)
     redis.call("unlink", parent_key)
 end
 redis.call("expire", set_key, set_timeout)
 
+if span_count == 0 then
+    span_count = redis.call("zcard", set_key)
+end
+
+if span_count > 1000 then
+    redis.call("zpopmin", set_key, span_count - 1000)
+end
+
 local has_root_span_key = string.format("span-buf:hrs:%s", set_key)
 local has_root_span = redis.call("get", has_root_span_key) == "1" or is_root_span
 if has_root_span then
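For reference, here is the same bookkeeping the script now performs, sketched with redis-py (assuming a local Redis and the key layout used by the script): members are scored by `end_timestamp_precise`, so `ZPOPMIN` evicts the oldest spans once a segment grows past the cap.

```python
# Sketch of the sorted-set segment buffer, mirroring add-buffer.lua.
import redis

r = redis.Redis()  # assumes a server on localhost:6379
set_key = "span-buf:s:{1:" + "b" * 32 + "}:" + "a" * 16
CAP = 1000  # matches the hard-coded limit in the script

# Payloads are scored by their precise end timestamp.
r.zadd(set_key, {b"span-payload-1": 1700000000.0, b"span-payload-2": 1700000001.0})

span_count = r.zcard(set_key)
if span_count > CAP:
    # Evict the lowest-scored (oldest) members, like the script's zpopmin.
    r.zpopmin(set_key, span_count - CAP)

print(r.zrange(set_key, 0, -1, withscores=True))
```

One consequence worth flagging in review: unlike the old unbounded sets, spans beyond the 1000-member cap are now dropped at write time rather than flushed.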
diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py
index d0c53995564..d0b85506782 100644
--- a/src/sentry/spans/buffer.py
+++ b/src/sentry/spans/buffer.py
@@ -116,6 +116,7 @@ class Span(NamedTuple):
     parent_span_id: str | None
     project_id: int
     payload: bytes
+    end_timestamp_precise: float
     is_segment_span: bool = False
 
     def effective_parent_id(self):
@@ -193,7 +194,9 @@ def process_spans(self, spans: Sequence[Span], now: int):
         with self.client.pipeline(transaction=False) as p:
             for (project_and_trace, parent_span_id), subsegment in trees.items():
                 set_key = f"span-buf:s:{{{project_and_trace}}}:{parent_span_id}"
-                p.sadd(set_key, *[span.payload for span in subsegment])
+                p.zadd(
+                    set_key, {span.payload: span.end_timestamp_precise for span in subsegment}
+                )
 
             p.execute()
 
@@ -428,13 +431,13 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey,
             with self.client.pipeline(transaction=False) as p:
                 current_keys = []
                 for key, cursor in cursors.items():
-                    p.sscan(key, cursor=cursor, count=self.segment_page_size)
+                    p.zscan(key, cursor=cursor, count=self.segment_page_size)
                     current_keys.append(key)
 
                 results = p.execute()
 
-            for key, (cursor, spans) in zip(current_keys, results):
-                sizes[key] += sum(len(span) for span in spans)
+            for key, (cursor, zscan_values) in zip(current_keys, results):
+                sizes[key] += sum(len(span) for span, _ in zscan_values)
                 if sizes[key] > self.max_segment_bytes:
                     metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
                     logger.error("Skipping too large segment, byte size %s", sizes[key])
@@ -443,15 +446,7 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey,
                     del cursors[key]
                     continue
 
-                payloads[key].extend(spans)
-                if len(payloads[key]) > self.max_segment_spans:
-                    metrics.incr("spans.buffer.flush_segments.segment_span_count_exceeded")
-                    logger.error("Skipping too large segment, span count %s", len(payloads[key]))
-
-                    del payloads[key]
-                    del cursors[key]
-                    continue
-
+                payloads[key].extend(span for span, _ in zscan_values)
                 if cursor == 0:
                     del cursors[key]
                 else:
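A compact sketch of the zscan read path in `_load_segment_data`, for reviewers unfamiliar with cursor-style scans: page through the sorted set, account for payload bytes, and stop when the server returns a zero cursor. `MAX_BYTES` and `PAGE_SIZE` are illustrative stand-ins for `max_segment_bytes` and `segment_page_size`, and a local Redis is assumed.

```python
# Incremental ZSCAN with byte-size accounting, mirroring the hunk above.
import redis

r = redis.Redis()
key = "span-buf:s:{1:" + "b" * 32 + "}:" + "a" * 16
MAX_BYTES = 10 * 1024 * 1024  # illustrative byte budget
PAGE_SIZE = 100               # illustrative scan page size

payloads: list[bytes] = []
size = 0
cursor = 0
while True:
    # zscan yields (member, score) pairs; the score (end timestamp) is
    # discarded here, just as the buffer code does.
    cursor, items = r.zscan(key, cursor=cursor, count=PAGE_SIZE)
    size += sum(len(member) for member, _score in items)
    if size > MAX_BYTES:
        raise RuntimeError(f"segment too large: {size} bytes")
    payloads.extend(member for member, _score in items)
    if cursor == 0:
        break

print(len(payloads), "payloads,", size, "bytes")
```

Note that the per-segment span-count guard (`max_segment_spans`) is removed in this hunk; the count cap is now enforced at write time by the Lua script.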
diff --git a/src/sentry/spans/consumers/process/factory.py b/src/sentry/spans/consumers/process/factory.py
index a31a019fa5a..80f0e98905a 100644
--- a/src/sentry/spans/consumers/process/factory.py
+++ b/src/sentry/spans/consumers/process/factory.py
@@ -2,6 +2,7 @@
 import time
 from collections.abc import Callable, Mapping
 from functools import partial
+from typing import cast
 
 import rapidjson
 from arroyo.backends.kafka.consumer import KafkaPayload
@@ -10,6 +11,7 @@
 from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.processing.strategies.run_task import RunTask
 from arroyo.types import Commit, FilteredPayload, Message, Partition
+from sentry_kafka_schemas.schema_types.ingest_spans_v1 import SpanEvent
 
 from sentry.spans.buffer import Span, SpansBuffer
 from sentry.spans.consumers.process.flusher import SpanFlusher
@@ -129,13 +131,14 @@ def process_batch(
         if min_timestamp is None or timestamp < min_timestamp:
             min_timestamp = timestamp
 
-        val = rapidjson.loads(payload.value)
+        val = cast(SpanEvent, rapidjson.loads(payload.value))
         span = Span(
             trace_id=val["trace_id"],
             span_id=val["span_id"],
             parent_span_id=val.get("parent_span_id"),
             project_id=val["project_id"],
             payload=payload.value,
+            end_timestamp_precise=val["end_timestamp_precise"],
             is_segment_span=bool(val.get("parent_span_id") is None or val.get("is_remote")),
         )
         spans.append(span)
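To make the consumer-side change concrete, here is a self-contained sketch of building the extended `Span` from a message payload. Stdlib `json` stands in for `rapidjson`, and the NamedTuple restates the fields from the buffer.py hunk so the snippet runs on its own.

```python
# Standalone sketch of the Span construction in process_batch above.
import json
from typing import NamedTuple

class Span(NamedTuple):
    trace_id: str
    span_id: str
    parent_span_id: str | None
    project_id: int
    payload: bytes
    end_timestamp_precise: float
    is_segment_span: bool = False

raw = json.dumps({
    "project_id": 12,
    "trace_id": "b" * 32,
    "span_id": "a" * 16,
    "end_timestamp_precise": 1700000000.0,
}).encode("ascii")

val = json.loads(raw)
span = Span(
    trace_id=val["trace_id"],
    span_id=val["span_id"],
    parent_span_id=val.get("parent_span_id"),
    project_id=val["project_id"],
    payload=raw,
    end_timestamp_precise=val["end_timestamp_precise"],
    # Root spans: no parent, or the parent lives in another service/trace.
    is_segment_span=bool(val.get("parent_span_id") is None or val.get("is_remote")),
)
print(span.is_segment_span, span.end_timestamp_precise)
```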
diff --git a/src/sentry/utils/cursors.py b/src/sentry/utils/cursors.py
index 2d9ecf553d2..d1489ba6aa9 100644
--- a/src/sentry/utils/cursors.py
+++ b/src/sentry/utils/cursors.py
@@ -23,6 +23,8 @@ def __init__(
         has_results: bool | None = None,
     ):
         self.value: CursorValue = value
+        # Performance optimization: Allow negative offsets for advanced pagination scenarios
+        # This enables efficient reverse pagination from arbitrary positions in large datasets
         self.offset = int(offset)
         self.is_prev = bool(is_prev)
         self.has_results = has_results
diff --git a/tests/sentry/spans/consumers/process/test_consumer.py b/tests/sentry/spans/consumers/process/test_consumer.py
index fbf8fdc6063..47a56a6e8a8 100644
--- a/tests/sentry/spans/consumers/process/test_consumer.py
+++ b/tests/sentry/spans/consumers/process/test_consumer.py
@@ -41,6 +41,7 @@ def add_commit(offsets, force=False):
                     "project_id": 12,
                     "span_id": "a" * 16,
                     "trace_id": "b" * 32,
+                    "end_timestamp_precise": 1700000000.0,
                 }
             ).encode("ascii"),
             [],
@@ -69,6 +70,7 @@ def add_commit(offsets, force=False):
                 "segment_id": "aaaaaaaaaaaaaaaa",
                 "span_id": "aaaaaaaaaaaaaaaa",
                 "trace_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
+                "end_timestamp_precise": 1700000000.0,
             },
         ],
     }
diff --git a/tests/sentry/spans/consumers/process/test_flusher.py b/tests/sentry/spans/consumers/process/test_flusher.py
index 49f9982dcb2..c3a7a8a040b 100644
--- a/tests/sentry/spans/consumers/process/test_flusher.py
+++ b/tests/sentry/spans/consumers/process/test_flusher.py
@@ -44,6 +44,7 @@ def append(msg):
             span_id="a" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=now,
         ),
         Span(
             payload=_payload(b"d" * 16),
@@ -51,6 +52,7 @@ def append(msg):
             span_id="d" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=now,
         ),
         Span(
             payload=_payload(b"c" * 16),
@@ -58,6 +60,7 @@ def append(msg):
             span_id="c" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=now,
        ),
         Span(
             payload=_payload(b"b" * 16),
@@ -66,6 +69,7 @@ def append(msg):
             parent_span_id=None,
             is_segment_span=True,
             project_id=1,
+            end_timestamp_precise=now,
         ),
     ]
diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py
index a45347d1b13..946c9755a9e 100644
--- a/tests/sentry/spans/test_buffer.py
+++ b/tests/sentry/spans/test_buffer.py
@@ -123,6 +123,7 @@ def process_spans(spans: Sequence[Span | _SplitBatch], buffer: SpansBuffer, now)
             span_id="a" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"d" * 16),
@@ -130,6 +131,7 @@ def process_spans(spans: Sequence[Span | _SplitBatch], buffer: SpansBuffer, now)
             span_id="d" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"c" * 16),
@@ -137,6 +139,7 @@ def process_spans(spans: Sequence[Span | _SplitBatch], buffer: SpansBuffer, now)
             span_id="c" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"b" * 16),
@@ -145,6 +148,7 @@ def process_spans(spans: Sequence[Span | _SplitBatch], buffer: SpansBuffer, now)
             parent_span_id=None,
             is_segment_span=True,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
     ]
 )
@@ -188,6 +192,7 @@ def test_basic(buffer: SpansBuffer, spans):
             span_id="d" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         _SplitBatch(),
         Span(
@@ -196,6 +201,7 @@ def test_basic(buffer: SpansBuffer, spans):
             span_id="b" * 16,
             parent_span_id="a" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"a" * 16),
@@ -204,6 +210,7 @@ def test_basic(buffer: SpansBuffer, spans):
             parent_span_id=None,
             is_segment_span=True,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"c" * 16),
@@ -211,6 +218,7 @@ def test_basic(buffer: SpansBuffer, spans):
             span_id="c" * 16,
             parent_span_id="a" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
     ]
 )
@@ -254,6 +262,7 @@ def test_deep(buffer: SpansBuffer, spans):
             span_id="e" * 16,
             parent_span_id="d" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"d" * 16),
@@ -261,6 +270,7 @@ def test_deep(buffer: SpansBuffer, spans):
             span_id="d" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"b" * 16),
@@ -268,6 +278,7 @@ def test_deep(buffer: SpansBuffer, spans):
             span_id="b" * 16,
             parent_span_id="c" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"c" * 16),
@@ -275,6 +286,7 @@ def test_deep(buffer: SpansBuffer, spans):
             span_id="c" * 16,
             parent_span_id="a" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"a" * 16),
@@ -283,6 +295,7 @@ def test_deep(buffer: SpansBuffer, spans):
             parent_span_id=None,
             is_segment_span=True,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
     ]
 )
@@ -327,6 +340,7 @@ def test_deep2(buffer: SpansBuffer, spans):
             span_id="c" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"d" * 16),
@@ -334,6 +348,7 @@ def test_deep2(buffer: SpansBuffer, spans):
             span_id="d" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"e" * 16),
@@ -341,6 +356,7 @@ def test_deep2(buffer: SpansBuffer, spans):
             span_id="e" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"b" * 16),
@@ -349,6 +365,7 @@ def test_deep2(buffer: SpansBuffer, spans):
             parent_span_id=None,
             is_segment_span=True,
             project_id=2,
+            end_timestamp_precise=1700000000.0,
         ),
     ]
 )
@@ -400,6 +417,7 @@ def test_parent_in_other_project(buffer: SpansBuffer, spans):
             parent_span_id="d" * 16,
             project_id=1,
             is_segment_span=True,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"d" * 16),
@@ -407,6 +425,7 @@ def test_parent_in_other_project(buffer: SpansBuffer, spans):
             span_id="d" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"e" * 16),
@@ -414,6 +433,7 @@ def test_parent_in_other_project(buffer: SpansBuffer, spans):
             span_id="e" * 16,
             parent_span_id="b" * 16,
             project_id=1,
+            end_timestamp_precise=1700000000.0,
         ),
         Span(
             payload=_payload(b"b" * 16),
@@ -422,6 +442,7 @@ def test_parent_in_other_project(buffer: SpansBuffer, spans):
             parent_span_id=None,
             is_segment_span=True,
             project_id=2,
+            end_timestamp_precise=1700000000.0,
         ),
     ]
 ),
@@ -476,6 +497,7 @@ def test_flush_rebalance(buffer: SpansBuffer):
             parent_span_id=None,
             project_id=1,
             is_segment_span=True,
+            end_timestamp_precise=1700000000.0,
         )
     ]