diff --git a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx b/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx index ef745a05f0973..904723de7cf4f 100644 --- a/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx +++ b/develop-docs/sdk/miscellaneous/telemetry-buffer.mdx @@ -5,215 +5,332 @@ sidebar_order: 35 ## Telemetry Buffer Layer: Prioritized, Bounded, Rate-Aware Envelope Delivery -### Current State +### Overview -The current transport implementation of most of the SDKs uses a simple FIFO queue that processes all envelope types with equal priority. As our SDKs collect more and more telemetry with different types, it may become a problem that critical telemetry (such as errors/crashes) get delayed and de-prioritized due to other telemetry (traces, replays, logs) occupying the queue. This is especially relevant with replays, logs and continuous profiling which we periodically flush to the queue. +The buffer system sits between the SDK client and the HTTP transport layer, ensuring that critical telemetry like errors take priority over high-volume data like logs and traces. This prevents important events from getting lost when your application is under heavy load or sending large amounts of telemetry. -### Proposal +### Motivation -Introduce a per-telemetry buffer layer between `Client` and `Transport` to: -- Batch telemetry when protocol-safe (all types, not only logs) -- Apply early rate limiting (don’t enqueue when limited) -- Enforce bounded memory via fixed-capacity ring buffers -- Prioritize delivery by telemetry criticality via weighted round-robin -- Keep existing transport/offline cache semantics unchanged (might change) -- (Stretch) Have a http connection per telemetry type (only backend SDKs) +- Aggregation lives in a unified buffer layer (this way we avoid creating multiple batch processors for different telemetry types). +- All telemetry types use capture APIs (CaptureX) routed through the Client. +- Rate-limit awareness is built-in across categories. +- Buffers support two modes: normal ring buffer and bucket-by-trace (for spans). +- For spans, dropping an entire trace under pressure is preferable. ### Architecture Overview +Introduce a `Buffer` layer between the `Client` and the `Transport`. This `Buffer` wraps prioritization and scheduling and exposes a minimal API to the SDK: + +- Add(item). +- Flush(timeout). +- Close(timeout). + ``` -┌───────────────────────────────────────────────────────────────────────────┐ -│ Client │ -│ captureEvent / captureTransaction / captureReplay / captureLogs / ... 
│ -└───────────────────────────────────────────────────────────────────────────┘ - │ - ▼ -┌───────────────────────────────────────────────────────────────────────────┐ -│ TelemetryBuffer │ -│ - Holds per-category buffers │ -│ - Early rate-limit check (shared RateLimiter) │ -│ - Method-based submit to per-category buffers │ -└───────────────────────────────────────────────────────────────────────────┘ - │ - ┌───────────────┼────────────────────────────────┐ - ▼ ▼ ▼ -┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ -│ Errors/Feedback │ │ Sessions/CheckIns │ │ Log │ -│ (CRITICAL) │ │ (HIGH) │ │ (MEDIUM) │ -│ RingBuffer + Batcher │ │ RingBuffer + Batcher │ │ RingBuffer + Batcher │ -└───────────────────────┘ └───────────────────────┘ └───────────────────────┘ - │ │ │ - ▼ ▼ ▼ -┌───────────────────────────────────────────────────────────────────────────┐ -│ EnvelopeScheduler (Weighted RR) │ -│ - Cross-buffer selection by priority (5..1) │ -│ - Re-checks RateLimiter before send │ -│ - Submits envelopes to transport │ -└───────────────────────────────────────────────────────────────────────────┘ - │ - ▼ -┌───────────────────────────────────────────────────────────────────────────┐ -│ Transport (unchanged). │ -│ - Single worker, disk cache, offline retry, client reports │ -└───────────────────────────────────────────────────────────────────────────┘ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Client │ +│ captureEvent / captureTransaction / captureCheckIn / captureLog │ +└────────────────────────────────────────────────────────────────────────────┘ + + ▼ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Buffer │ +│ Add(item) · Flush(timeout) · Close(timeout) │ +│ │ +│ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ │ +│ │ Error Store │ │ Check-in Store │ │ Log Store │ │ +│ │ (CRITICAL) │ │ (HIGH) │ │ (MEDIUM) │ │ +│ │ Timeout: N/A │ │ Timeout: N/A │ │ Timeout: 5s │ │ +│ │ BatchSize: 1 │ │ BatchSize: 1 │ │ BatchSize: 100 │ │ +│ └──────────────────────┘ └──────────────────────┘ └──────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────┐ │ +│ │ Scheduler (Weighted Round-Robin) │ │ +│ │ - Priority weights: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1 │ │ +│ │ - Processes a batch of items based on BatchSize and/or Timeout │ │ +│ │ - Builds envelopes from batch │ │ +│ │ - Submits envelopes to transport │ │ +│ └─────────────────────────────────────────────────────────────────────┘ │ +└────────────────────────────────────────────────────────────────────────────┘ + + ▼ +┌────────────────────────────────────────────────────────────────────────────┐ +│ Transport │ +│ - Single worker, disk cache, offline retry, client reports │ +└────────────────────────────────────────────────────────────────────────────┘ ``` -### Priorities (TBD) -- CRITICAL: Error, Feedback -- HIGH: Session, CheckIn -- MEDIUM: Log, ClientReport, Span -- LOW: Transaction, Profile, ProfileChunk -- LOWEST: Replay +#### How the Buffer works + +- **Smart batching**: Logs are batched into single requests; errors, transactions, and monitors are sent immediately. +- **Pre-send rate limiting**: The scheduler checks rate limits before dispatching, avoiding unnecessary requests while keeping items buffered. +- **Category isolation**: Separate ring buffers for each telemetry type prevent head-of-line blocking. 
+- **Weighted scheduling**: High-priority telemetry gets sent more frequently via round-robin selection. +- **Transport compatibility**: Works with existing HTTP transport implementations without modification. + +### Priorities +- CRITICAL: Error, Feedback. +- HIGH: Session, CheckIn. +- MEDIUM: Log, ClientReport, Span. +- LOW: Transaction, Profile, ProfileChunk. +- LOWEST: Replay. Configurable via weights. ### Components -- **Client** - - Owns per-category buffers and is the single entry for all capture paths. - - Consults `RateLimiter` early; on active rate limit do not enqueue and record `DiscardReason.RATELIMIT_BACKOFF`. - - Submits items from capture methods to the matching per-category buffer. - -- **TelemetryBuffer\** (per DataCategory) - - Fixed-capacity ring buffer (bounded memory). - - Stores raw items (pre-envelope). - - Type-aware batching policy (size and/or time). Examples: - - Errors/Feedback/Sessions/CheckIns: typically single-item; allow small batch if protocol-safe. - - Logs: size/time-based (reuse semantics of `LoggerBatchProcessor`). - - Spans: trace-based. - - Transactions/Profiles/Replay: default single-item; - - Overflow policy: drop-oldest (default). Record `DiscardReason.QUEUE_OVERFLOW`. - -- **EnvelopeScheduler** - - Single worker; weighted round-robin across priorities: weights 5,4,3,2,1. - - Pulls ready batches from buffers; builds envelopes from batches; re-checks `RateLimiter` at send-time. - - Submits envelopes to `ITransport`. If transport unhealthy or recently rejected, backs off briefly. - -- **Rate Limiting** - - Shared `RateLimiter` is the source of truth. - - Checked both at buffer ingress (to avoid queueing) and at egress (to avoid sending). - -- **Transport and Offline Cache** - - Unchanged. Disk caching, retry semantics, client report recording remain in transport. - - Buffers are in-memory only (for now). - -### Configuration (new options) -- `bufferCapacityByCategory`: map\ (defaults tuned per volume) -- `priorityWeights`: CRITICAL..LOWEST (default 5,4,3,2,1) -- `overflowPolicy`: `drop_oldest` | `drop_newest` (default `drop_oldest`) -- `preemptLowerPriorityForCritical`: boolean (default false) -- `scheduler`: - - `backoffMsOnTransportUnhealthy` (e.g., 250–1000 ms for backpressure) - - `maxQueueSize` (soft cap; default derived from transport queue) - -### Pseudocode (Kotlin-ish) - -```kotlin -enum class EnvelopePriority(val weight: Int) { - CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1) +#### Storage + +Each telemetry category maintains a store interface; a fixed-size circular array/ring buffer (not to be confused with the `Buffer` wrapper) that stores items before transmission: + +- **Bounded capacity**: Default to 100 items for errors, logs, and monitors; 1000 for transactions. This prevents unbounded memory growth regardless of telemetry volume and backpressure handling. +- **Overflow policies**: + - `drop_oldest` (default): Evicts the oldest item when the buffer is full, making room for new data. + - `drop_newest`: Rejects incoming items when full, preserving what's already queued. +- **Batching configuration**: + - `batchSize`: Number of items to combine into a single batch (1 for errors, transactions, and monitors; 100 for logs). + - `timeout`: Maximum time to wait before sending a partial batch (5 seconds for logs). +- **Bucketed Storage Support**: The storage interface should satisfy both bucketed and single-item implementations, allowing sending spans per trace id. 
- **Observability**: Each store tracks offered, accepted, and dropped item counts for client reports.

##### Single-item ring buffer (default)

- Data structure: fixed-size circular array with head/tail indices; O(1) `Offer`/`Poll`.
- Offer semantics: if not full, append; when full, apply `overflowPolicy`:
  - `drop_oldest`: evict the oldest item, insert the new one, and invoke the dropped callback with reason `buffer_full_drop_oldest`.
  - `drop_newest`: reject the new item and invoke the dropped callback with reason `buffer_full_drop_newest`.
- Readiness: a store is ready when `size >= batchSize` or when `timeout` has elapsed since `lastFlushTime` (and it is non-empty).
- Polling: `PollIfReady()` returns up to `batchSize` items and updates `lastFlushTime`; `Drain()` empties the store.

##### Bucketed-by-trace storage (spans)

- Purpose: keep spans from the same trace together and flush them as a unit to avoid partial-trace delivery under pressure.
- Grouping: a new bucket is created per trace ID; a map (`traceIndex`) provides O(1) lookup. Items without a trace ID are accepted but grouped without an index.
- Capacity model: two limits are enforced: the overall `itemCapacity` and a derived `bucketCapacity ~= capacity/10` (minimum 10). Additionally, a `perBucketItemLimit` (100) prevents a single trace from monopolizing storage.
- Readiness: when the total number of buffered items reaches `batchSize` or `timeout` elapses, the entire oldest bucket is flushed to preserve trace coherence.
- Overflow behavior:
  - `drop_oldest`: evict the oldest bucket (dropping all of its items) and invoke the dropped callback for each (`buffer_full_drop_oldest_bucket`). Preferred for spans, since dropping an entire trace is better than leaving partial traces behind.
  - `drop_newest`: reject the incoming item (`buffer_full_drop_newest`).
- Lifecycle: empty buckets are removed and their trace IDs are purged from the index; `MarkFlushed()` updates `lastFlushTime`.

Stores are mapped to [DataCategories](https://github.com/getsentry/relay/blob/master/relay-base-schema/src/data_category.rs), which determine their scheduling priority and rate limits.

#### Scheduler

The scheduler runs as a background worker, coordinating the flow of telemetry from storage to the transport:

- **Initialization**: Constructs a weighted priority cycle (e.g., `[CRITICAL×5, HIGH×4, MEDIUM×3, ...]`) based on the configured weights.
- **Event loop**: Wakes when the client's `captureX` methods signal that new data is available; if the language does not support such signaling, a periodic ticker can be used instead.
- **Buffer selection**: Iterates through the priority cycle, selecting buffers that are ready to flush and not rate limited.
- **Rate limit coordination**: Queries the transport's rate limit state before attempting to send any category.
- **Envelope construction**: Converts buffered items into Sentry protocol envelopes.
  - Log items are batched together into a single envelope with multiple log entries.
  - Other categories typically send one item per envelope.
- **Graceful shutdown**: During client shutdown, force-drains all buffers to prevent data loss.

#### Transport

The transport layer handles HTTP communication with Sentry's ingestion endpoints and is unchanged by this design: it keeps its single worker, disk cache, offline retry, and client report recording, so existing transport implementations can be reused as-is.

### Configuration

#### Buffer Options
- **Capacity**: 100 items for errors, logs, and monitors; 1000 for transactions.
- **Overflow policy**: `drop_oldest`.
- **Batch size**: 1 for errors and monitors (immediate send), 100 for logs.
- **Batch timeout**: 5 seconds for logs.
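
To make these defaults concrete, here is a minimal sketch of how they could be expressed as per-category store configuration. The `StoreConfig` struct, its field names, and the category keys are illustrative assumptions rather than actual `sentry-go` option names; only the values mirror the defaults listed above.

```go
import "time"

// Illustrative only: type, field, and key names are assumptions.
// The values mirror the documented defaults.
type StoreConfig struct {
	Capacity       int           // max items held in the ring buffer
	BatchSize      int           // items combined into a single batch
	Timeout        time.Duration // max wait before flushing a partial batch
	OverflowPolicy string        // "drop_oldest" or "drop_newest"
}

var defaultStoreConfigs = map[string]StoreConfig{
	"error":       {Capacity: 100, BatchSize: 1, OverflowPolicy: "drop_oldest"},
	"monitor":     {Capacity: 100, BatchSize: 1, OverflowPolicy: "drop_oldest"},
	"log":         {Capacity: 100, BatchSize: 100, Timeout: 5 * time.Second, OverflowPolicy: "drop_oldest"},
	"transaction": {Capacity: 1000, BatchSize: 1, OverflowPolicy: "drop_oldest"},
}
```

Keeping the defaults in a single table like this makes it easy to tune one category (for example, raising log capacity) without affecting the others.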
+ +#### Scheduler Options +- **Priority weights**: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1. + +#### Transport Options +- **Queue size**: 1000 envelopes for AsyncTransport. +- **HTTP timeout**: 30 seconds. + +### Implementation Example (Go) + +The `sentry-go` SDK provides a reference implementation of this architecture: + +#### Storage Interface + +```go +type Storage[T any] interface { + // Core operations + Offer(item T) bool + Poll() (T, bool) + PollBatch(maxItems int) []T + PollIfReady() []T + Drain() []T + Peek() (T, bool) + + // State queries + Size() int + Capacity() int + IsEmpty() bool + IsFull() bool + Utilization() float64 + + // Flush management + IsReadyToFlush() bool + MarkFlushed() + + // Category/Priority + Category() ratelimit.Category + Priority() ratelimit.Priority + + // Metrics + OfferedCount() int64 + DroppedCount() int64 + AcceptedCount() int64 + DropRate() float64 + GetMetrics() BufferMetrics + + // Configuration + SetDroppedCallback(callback func(item T, reason string)) + Clear() } -interface TelemetryBuffer { - val category: DataCategory - val priority: EnvelopePriority - fun offer(item: T): OfferResult // may drop; record client report - fun nextBatchReady(nowMs: Long): Boolean - fun drainBatch(maxItems: Int, nowMs: Long): List - fun size(): Int + +// Single item store +func (b *RingBuffer[T]) PollIfReady() []T { + b.mu.Lock() + defer b.mu.Unlock() + + if b.size == 0 { + return nil + } + + ready := b.size >= b.batchSize || + (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) + + if !ready { + return nil + } + + itemCount := b.batchSize + if itemCount > b.size { + itemCount = b.size + } + + result := make([]T, itemCount) + var zero T + + for i := 0; i < itemCount; i++ { + result[i] = b.items[b.head] + b.items[b.head] = zero + b.head = (b.head + 1) % b.capacity + b.size-- + } + + b.lastFlushTime = time.Now() + return result } -class Client( - private val buffers: Map>, - private val rateLimiter: RateLimiter, - private val clientReports: ClientReportRecorder -) { - fun captureEvent(event: Any) = submit(DataCategory.Error, event) - fun captureTransaction(tx: Any) = submit(DataCategory.Transaction, tx) - fun captureReplay(replay: Any) = submit(DataCategory.Replay, replay) - fun captureLog(log: Any) = submit(DataCategory.Log, log) - // ... other capture methods ... 
- - private fun submit(category: DataCategory, item: Any) { - if (rateLimiter.isRateLimitActive(category)) { - clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, category) - return - } - val res = buffers[category]?.offer(item) - if (res is OfferResult.Dropped) { - clientReports.recordLostEvent(DiscardReason.QUEUE_OVERFLOW, category, res.count) - } - } +// Bucketed store +func (b *BucketedBuffer[T]) PollIfReady() []T { + b.mu.Lock() + defer b.mu.Unlock() + if b.bucketCount == 0 { + return nil + } + // the batchSize is satisfied based on total items + ready := b.totalItems >= b.batchSize || (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) + if !ready { + return nil + } + // keep track of oldest bucket + oldest := b.buckets[b.head] + if oldest == nil { + return nil + } + items := oldest.items + if oldest.traceID != "" { + delete(b.traceIndex, oldest.traceID) + } + b.buckets[b.head] = nil + b.head = (b.head + 1) % b.bucketCapacity + b.totalItems -= len(items) + b.bucketCount-- + b.lastFlushTime = time.Now() + return items } -class EnvelopeScheduler( - private val buffersByPriority: Map>, - private val transport: ITransport, - private val rateLimiter: RateLimiter, - private val clientReports: ClientReportRecorder, - private val weights: Map, - private val backoffMs: Long -) : Thread("TelemetryEnvelopeScheduler") { - override fun run() { - val order = generatePriorityCycle(weights) // e.g., [CRITICAL×5, HIGH×4, ...] - while (true) { - var sentSomething = false - for (p in order) { - val buf = selectReadyBuffer(buffersByPriority[p]) - if (buf != null) { - val cat = buf.category - val batch = buf.drainBatch(maxItemsFor(cat), nowMs()) - if (batch.isNotEmpty()) { - if (!rateLimiter.isRateLimitActive(cat)) { - val envelopes = buildEnvelopes(cat, batch) - for (env in envelopes) { - transport.send(env) - } - sentSomething = true - } else { - clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, cat, batch.size) - } - } +``` + +#### Scheduler Processing + +```go +func (s *Scheduler) processNextBatch() { + // Select priority from weighted cycle + priority := s.currentCycle[s.cyclePos] + s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle) + + // Find ready buffer for this priority + for category, buffer := range s.buffers { + if buffer.Priority() == priority && + !s.transport.IsRateLimited(category) && + buffer.IsReadyToFlush() { + items := buffer.PollIfReady() + s.sendItems(category, items) + // only process one batch per tick + break } - } - if (!sentSomething) sleep(backoffMs) } - } +} +``` + + +#### Flushing + +```go +func (s *Scheduler) flush() { + // should process all store buffers and send to transport + for category, buffer := range s.buffers { + if !buffer.IsEmpty() { + s.processItems(buffer, category, true) + } + } +} - private fun buildEnvelopes(category: DataCategory, batch: List): List { - // Applies SDK version, trace context if provided, and size constraints - // Returns one or more envelopes constructed from the batch - TODO() - } +// The Buffer exposes the flush method that calls both +func (b *Buffer) Flush(timeout time.Time) { + scheduler.flush() + transport.flush(timeout) } ``` -Ring buffer semantics (per buffer): -- Fixed capacity N; on offer when full: - - `drop_oldest`: evict 1 from head, enqueue new; record queue overflow client report - - `drop_newest`: reject incoming; record queue overflow client report +### Batching Policies -Batching policy examples: -- Logs: batch up to 100 items or 5s; split if envelope size limit reached -- Spans: batch per 
trace -- Errors/Feedback: batch size 1 (default) -- Sessions/CheckIns: small batches if safe (e.g., 10 or 1s) -- Transactions/Profiles/Replay: default 1 +Different telemetry types use batching strategies optimized for their characteristics: -### Observability +- **Errors**: Single-item envelopes for immediate delivery (latency-sensitive). +- **Monitors**: Single-item envelopes to maintain check-in timing accuracy. +- **Logs**: Batches of up to 100 items or 5-second timeout, whichever comes first (volume-optimized). +- **Transactions**: Single-item envelopes (trace-aware batching is a future enhancement). + +#### Batch Processing Details + +For high-volume telemetry like logs, the buffer uses time and count-based batching: -- Counters per category: enqueued, sent, queue_overflow, rate_limited - - Create a dashboard per category with reasons for dropped events -- Health: include buffer health in `Client.isHealthy()` alongside transport +**Timeout-based flushing**: +- When the first item enters an empty log buffer, a timeout starts (5 seconds). +- When the timeout expires, all buffered log items are sent regardless of batch size. +- The timeout resets after each flush. -### Defaults and Safety +**Count-based flushing**: +- When the number of buffered log items reaches the batch size (100), they are sent immediately. + +**Ordering and lifecycle**: +- Filtering and sampling happen before buffering to avoid wasting buffer space. +- Rate limiting is checked before dispatch; if limited, items remain buffered. +- Items are batched into a single envelope with multiple entries of the same type (logs). + +### Observability -- Enabled by default with conservative capacities and weights -- No change to envelope format, transport, or disk caching +The buffer system exposes metrics to help you understand telemetry flow and identify issues: -### Open Questions +- **Per-category counters**: Items offered, sent successfully, and dropped. +- **Drop reasons**: Distinguish between buffer overflow and rate limit drops. +- **Buffer utilization**: Current size vs. capacity for each category. -- Default capacities per category (especially logs/replay vs. critical) -- Category weights (logs and spans will be high volume, so we might want to send them more often) -- Safe batching across categories beyond logs/client reports - - Shall we adapt Relay to accept multiple "top-level" items (like errors or transactions) in a single envelope? -- Multiple http connections per telemetry type +These metrics enable dashboards that visualize why events are being dropped, helping you tune buffer sizes or identify rate limiting issues.
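
As a rough illustration of how these counters could be surfaced, the sketch below reads the metrics exposed by the `Storage` interface shown earlier and prints one line per store. The `logStoreMetrics` helper and the way a store is obtained are assumptions for the example; only the interface methods (`Category`, `Size`, `Capacity`, `Utilization`, `OfferedCount`, `AcceptedCount`, `DroppedCount`, `DropRate`) come from that interface.

```go
import "fmt"

// logStoreMetrics prints a snapshot of one store's counters. The helper itself
// is illustrative; the methods it calls are defined on the Storage interface above.
func logStoreMetrics[T any](store Storage[T]) {
	fmt.Printf(
		"category=%v size=%d/%d utilization=%.2f offered=%d accepted=%d dropped=%d drop_rate=%.2f\n",
		store.Category(),
		store.Size(), store.Capacity(),
		store.Utilization(),
		store.OfferedCount(), store.AcceptedCount(), store.DroppedCount(),
		store.DropRate(),
	)
}
```

Emitting a line like this per category on a fixed interval is usually enough to build the dashboards described above and to spot whether drops come from overflow or from rate limiting.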