---
title: Telemetry Buffer
sidebar_order: 35
---

## Telemetry Buffer Layer: Prioritized, Bounded, Rate-Aware Envelope Delivery

### Current State

The transport implementation in most of our SDKs currently uses a simple FIFO queue that processes all envelope types with equal priority. As our SDKs collect more and more telemetry of different types, critical telemetry (such as errors and crashes) can get delayed and de-prioritized because other telemetry (traces, replays, logs) occupies the queue. This is especially relevant for replays, logs, and continuous profiling, which we periodically flush to the queue.

### Proposal

Introduce a per-telemetry buffer layer between `Client` and `Transport` to:
- Batch telemetry when protocol-safe (all types, not only logs)
- Apply early rate limiting (don’t enqueue when limited)
- Enforce bounded memory via fixed-capacity ring buffers
- Prioritize delivery by telemetry criticality via weighted round-robin
- Keep existing transport/offline cache semantics unchanged (subject to change)
- (Stretch) Use an HTTP connection per telemetry type (backend SDKs only)

### Architecture Overview

```
┌───────────────────────────────────────────────────────────────────────────┐
│                                  Client                                   │
│   captureEvent / captureTransaction / captureReplay / captureLogs / ...   │
└───────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                              TelemetryBuffer                              │
│   - Holds per-category buffers                                            │
│   - Early rate-limit check (shared RateLimiter)                           │
│   - Method-based submit to per-category buffers                           │
└───────────────────────────────────────────────────────────────────────────┘
                                      │
            ┌─────────────────────────┼─────────────────────────┐
            ▼                         ▼                         ▼
┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐
│    Errors/Feedback    │ │   Sessions/CheckIns   │ │          Log          │
│      (CRITICAL)       │ │        (HIGH)         │ │       (MEDIUM)        │
│ RingBuffer + Batcher  │ │ RingBuffer + Batcher  │ │ RingBuffer + Batcher  │
└───────────────────────┘ └───────────────────────┘ └───────────────────────┘
            │                         │                         │
            ▼                         ▼                         ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                      EnvelopeScheduler (Weighted RR)                      │
│   - Cross-buffer selection by priority (5..1)                             │
│   - Re-checks RateLimiter before send                                     │
│   - Submits envelopes to transport                                        │
└───────────────────────────────────────────────────────────────────────────┘
                                      │
                                      ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                           Transport (unchanged)                           │
│   - Single worker, disk cache, offline retry, client reports              │
└───────────────────────────────────────────────────────────────────────────┘
```

### Priorities (TBD)
- CRITICAL: Error, Feedback
- HIGH: Session, CheckIn
- MEDIUM: Log, ClientReport, Span
- LOW: Transaction, Profile, ProfileChunk
- LOWEST: Replay

Configurable via weights (see the sketch below).
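
For illustration, a minimal Kotlin sketch of one possible default mapping and weight set. The enum values and names below are stand-ins derived from this proposal, not an existing SDK API.

```kotlin
// Hypothetical sketch: category-to-priority mapping plus overridable weights.
enum class EnvelopePriority { CRITICAL, HIGH, MEDIUM, LOW, LOWEST }

// Stand-in for the SDK's data categories, limited to the ones listed above.
enum class DataCategory {
    Error, Feedback, Session, CheckIn, Log, ClientReport, Span,
    Transaction, Profile, ProfileChunk, Replay
}

val defaultPriorityByCategory: Map<DataCategory, EnvelopePriority> = mapOf(
    DataCategory.Error to EnvelopePriority.CRITICAL,
    DataCategory.Feedback to EnvelopePriority.CRITICAL,
    DataCategory.Session to EnvelopePriority.HIGH,
    DataCategory.CheckIn to EnvelopePriority.HIGH,
    DataCategory.Log to EnvelopePriority.MEDIUM,
    DataCategory.ClientReport to EnvelopePriority.MEDIUM,
    DataCategory.Span to EnvelopePriority.MEDIUM,
    DataCategory.Transaction to EnvelopePriority.LOW,
    DataCategory.Profile to EnvelopePriority.LOW,
    DataCategory.ProfileChunk to EnvelopePriority.LOW,
    DataCategory.Replay to EnvelopePriority.LOWEST
)

// Default weights 5..1; an SDK could let users override these,
// e.g. to boost logs on a log-heavy backend.
val defaultPriorityWeights: Map<EnvelopePriority, Int> = mapOf(
    EnvelopePriority.CRITICAL to 5,
    EnvelopePriority.HIGH to 4,
    EnvelopePriority.MEDIUM to 3,
    EnvelopePriority.LOW to 2,
    EnvelopePriority.LOWEST to 1
)
```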

### Components

- **Client**
  - Owns the per-category buffers and is the single entry point for all capture paths.
  - Consults the `RateLimiter` early; while a rate limit is active, skips enqueueing and records `DiscardReason.RATELIMIT_BACKOFF`.
  - Submits items from the capture methods to the matching per-category buffer.

- **TelemetryBuffer\<T\>** (per DataCategory)
  - Fixed-capacity ring buffer (bounded memory).
  - Stores raw items (pre-envelope).
  - Type-aware batching policy (size and/or time). Examples:
    - Errors/Feedback/Sessions/CheckIns: typically single-item; allow small batches if protocol-safe.
    - Logs: size/time-based (reuse the semantics of `LoggerBatchProcessor`).
    - Spans: trace-based.
    - Transactions/Profiles/Replay: default single-item.
  - Overflow policy: drop-oldest (default). Record `DiscardReason.QUEUE_OVERFLOW`.

- **EnvelopeScheduler**
  - Single worker; weighted round-robin across priorities with weights 5, 4, 3, 2, 1.
  - Pulls ready batches from the buffers, builds envelopes from them, and re-checks the `RateLimiter` at send time.
  - Submits envelopes to `ITransport`. If the transport is unhealthy or recently rejected envelopes, backs off briefly.

- **Rate Limiting**
  - The shared `RateLimiter` is the source of truth.
  - Checked both at buffer ingress (to avoid queueing) and at egress (to avoid sending).

- **Transport and Offline Cache**
  - Unchanged. Disk caching, retry semantics, and client report recording remain in the transport.
  - Buffers are in-memory only (for now).

### Configuration (new options)
- `bufferCapacityByCategory`: map\<DataCategory, int\> (defaults tuned per volume)
- `priorityWeights`: CRITICAL..LOWEST (default 5,4,3,2,1)
- `overflowPolicy`: `drop_oldest` | `drop_newest` (default `drop_oldest`)
- `preemptLowerPriorityForCritical`: boolean (default false)
- `scheduler`:
  - `backoffMsOnTransportUnhealthy` (e.g., 250–1000 ms for backpressure)
  - `maxQueueSize` (soft cap; default derived from transport queue)
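
A rough sketch of how these options could be grouped on the options object. The field names mirror the list above; the defaults are placeholders and none of this exists in any SDK yet.

```kotlin
// Hypothetical options holder; names follow the proposal, defaults are illustrative only.
enum class OverflowPolicy { DROP_OLDEST, DROP_NEWEST }

data class SchedulerOptions(
    val backoffMsOnTransportUnhealthy: Long = 500, // backpressure pause, e.g. 250-1000 ms
    val maxQueueSize: Int = 30                     // soft cap; default derived from the transport queue
)

data class TelemetryBufferOptions(
    // Keyed by category name in this sketch to stay self-contained; defaults are TBD (see Open Questions).
    val bufferCapacityByCategory: Map<String, Int> = emptyMap(),
    val priorityWeights: List<Int> = listOf(5, 4, 3, 2, 1), // CRITICAL..LOWEST
    val overflowPolicy: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    val preemptLowerPriorityForCritical: Boolean = false,
    val scheduler: SchedulerOptions = SchedulerOptions()
)
```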

### Pseudocode (Kotlin-ish)

```kotlin
enum class EnvelopePriority(val weight: Int) {
    CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1)
}

interface TelemetryBuffer<T> {
    val category: DataCategory
    val priority: EnvelopePriority
    fun offer(item: T): OfferResult // may drop; record client report
    fun nextBatchReady(nowMs: Long): Boolean
    fun drainBatch(maxItems: Int, nowMs: Long): List<T>
    fun size(): Int
}

class Client(
    private val buffers: Map<DataCategory, TelemetryBuffer<Any>>,
    private val rateLimiter: RateLimiter,
    private val clientReports: ClientReportRecorder
) {
    fun captureEvent(event: Any) = submit(DataCategory.Error, event)
    fun captureTransaction(tx: Any) = submit(DataCategory.Transaction, tx)
    fun captureReplay(replay: Any) = submit(DataCategory.Replay, replay)
    fun captureLog(log: Any) = submit(DataCategory.Log, log)
    // ... other capture methods ...

    private fun submit(category: DataCategory, item: Any) {
        if (rateLimiter.isRateLimitActive(category)) {
            clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, category)
            return
        }
        val res = buffers[category]?.offer(item)
        if (res is OfferResult.Dropped) {
            clientReports.recordLostEvent(DiscardReason.QUEUE_OVERFLOW, category, res.count)
        }
    }
}

class EnvelopeScheduler(
    private val buffersByPriority: Map<EnvelopePriority, List<TelemetryBuffer<*>>>,
    private val transport: ITransport,
    private val rateLimiter: RateLimiter,
    private val clientReports: ClientReportRecorder,
    private val weights: Map<EnvelopePriority, Int>,
    private val backoffMs: Long
) : Thread("TelemetryEnvelopeScheduler") {
    override fun run() {
        val order = generatePriorityCycle(weights) // e.g., [CRITICAL×5, HIGH×4, ...]
        while (true) {
            var sentSomething = false
            for (p in order) {
                val buf = selectReadyBuffer(buffersByPriority[p].orEmpty())
                if (buf != null) {
                    val cat = buf.category
                    val batch = buf.drainBatch(maxItemsFor(cat), nowMs())
                    if (batch.isNotEmpty()) {
                        if (!rateLimiter.isRateLimitActive(cat)) {
                            val envelopes = buildEnvelopes(cat, batch)
                            for (env in envelopes) {
                                transport.send(env)
                            }
                            sentSomething = true
                        } else {
                            clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, cat, batch.size)
                        }
                    }
                }
            }
            if (!sentSomething) Thread.sleep(backoffMs)
        }
    }

    // Helpers like selectReadyBuffer, maxItemsFor, nowMs, generatePriorityCycle are omitted in this pseudocode.

    private fun buildEnvelopes(category: DataCategory, batch: List<Any>): List<Envelope> {
        // Applies SDK version, trace context if provided, and size constraints
        // Returns one or more envelopes constructed from the batch
        TODO()
    }
}
```
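
The pseudocode leaves `generatePriorityCycle` as a comment. One possible (assumed, not prescribed) implementation interleaves priorities instead of emitting all slots of one priority back to back, so lower priorities are still visited within every cycle; the enum is repeated from the pseudocode so the snippet stands on its own.

```kotlin
enum class EnvelopePriority(val weight: Int) {
    CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1)
}

// Expands weights into a polling order. Weights 3/2/1 over three priorities yield
// [CRITICAL, HIGH, MEDIUM, CRITICAL, HIGH, CRITICAL]: interleaved rounds rather than
// "all CRITICAL slots first", so one busy category cannot monopolize a whole cycle.
fun generatePriorityCycle(weights: Map<EnvelopePriority, Int>): List<EnvelopePriority> {
    val remaining = weights.toMutableMap()
    val cycle = mutableListOf<EnvelopePriority>()
    while (remaining.values.any { it > 0 }) {
        for (priority in EnvelopePriority.values()) { // fixed priority order per round
            val slotsLeft = remaining[priority] ?: 0
            if (slotsLeft > 0) {
                cycle += priority
                remaining[priority] = slotsLeft - 1
            }
        }
    }
    return cycle
}

fun main() {
    val weights = EnvelopePriority.values().associate { it to it.weight }
    println(generatePriorityCycle(weights))
    // [CRITICAL, HIGH, MEDIUM, LOW, LOWEST, CRITICAL, HIGH, MEDIUM, LOW,
    //  CRITICAL, HIGH, MEDIUM, CRITICAL, HIGH, CRITICAL]
}
```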

Ring buffer semantics (per buffer):
- Fixed capacity N; on offer when full:
  - `drop_oldest`: evict 1 from head, enqueue new; record queue overflow client report
  - `drop_newest`: reject incoming; record queue overflow client report
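
A self-contained sketch of these semantics; the `OfferResult` type is implied by the pseudocode above, while the class and method names here are illustrative only.

```kotlin
enum class OverflowPolicy { DROP_OLDEST, DROP_NEWEST }

sealed class OfferResult {
    object Accepted : OfferResult()
    data class Dropped(val count: Int) : OfferResult() // caller records a queue_overflow client report
}

// Bounded FIFO with the two overflow policies described above.
class BoundedRingBuffer<T>(private val capacity: Int, private val policy: OverflowPolicy) {
    private val items = ArrayDeque<T>(capacity)

    @Synchronized
    fun offer(item: T): OfferResult {
        if (items.size < capacity) {
            items.addLast(item)
            return OfferResult.Accepted
        }
        return when (policy) {
            OverflowPolicy.DROP_OLDEST -> {
                items.removeFirst()    // evict the oldest item to make room
                items.addLast(item)
                OfferResult.Dropped(1) // one old item was lost
            }
            OverflowPolicy.DROP_NEWEST -> OfferResult.Dropped(1) // the incoming item is rejected
        }
    }

    @Synchronized
    fun drain(maxItems: Int): List<T> {
        val out = ArrayList<T>(minOf(maxItems, items.size))
        while (out.size < maxItems && items.isNotEmpty()) out += items.removeFirst()
        return out
    }

    @Synchronized
    fun size(): Int = items.size
}
```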

Batching policy examples:
- Logs: batch up to 100 items or 5s; split if envelope size limit reached
- Spans: batch per trace
- Errors/Feedback: batch size 1 (default)
- Sessions/CheckIns: small batches if safe (e.g., 10 or 1s)
- Transactions/Profiles/Replay: default 1
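
As an example of the size-or-time case for logs (100 items or 5s, matching the bullet above), a hypothetical policy object that a buffer could consult from `nextBatchReady`:

```kotlin
// Hypothetical size-or-time trigger; names and structure are illustrative only.
class SizeTimeBatchPolicy(
    private val maxItems: Int = 100,
    private val maxAgeMs: Long = 5_000
) {
    private var oldestItemAtMs: Long? = null
    private var itemCount: Int = 0

    fun onItemBuffered(nowMs: Long) {
        if (itemCount == 0) oldestItemAtMs = nowMs // the first buffered item starts the clock
        itemCount++
    }

    fun nextBatchReady(nowMs: Long): Boolean {
        val oldest = oldestItemAtMs ?: return false // nothing buffered yet
        return itemCount >= maxItems || nowMs - oldest >= maxAgeMs
    }

    fun onBatchDrained() {
        itemCount = 0
        oldestItemAtMs = null
    }
}

fun main() {
    val policy = SizeTimeBatchPolicy(maxItems = 3, maxAgeMs = 1_000)
    policy.onItemBuffered(nowMs = 0)
    println(policy.nextBatchReady(nowMs = 500)) // false: one item, 0.5 s old
    policy.onItemBuffered(nowMs = 600)
    policy.onItemBuffered(nowMs = 700)
    println(policy.nextBatchReady(nowMs = 800)) // true: size threshold reached
}
```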

### Observability

- Counters per category: enqueued, sent, queue_overflow, rate_limited
  - Create a dashboard per category with reasons for dropped events
- Health: include buffer health in `Client.isHealthy()` alongside transport health
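
A minimal sketch of per-category counters and a buffer health check; the names and threshold are assumptions, and a real SDK would likely reuse its existing client report and health plumbing.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical per-category counters, one instance per buffer.
class BufferMetrics {
    val enqueued = AtomicLong()
    val sent = AtomicLong()
    val queueOverflow = AtomicLong()
    val rateLimited = AtomicLong()
}

class TelemetryBufferHealth(private val maxOverflowRatio: Double = 0.1) {
    private val metricsByCategory = ConcurrentHashMap<String, BufferMetrics>()

    fun metricsFor(category: String): BufferMetrics =
        metricsByCategory.getOrPut(category) { BufferMetrics() }

    // Could feed into Client.isHealthy(): unhealthy if too large a share of items overflows.
    fun isHealthy(): Boolean = metricsByCategory.values.all { m ->
        val total = m.enqueued.get()
        total == 0L || m.queueOverflow.get().toDouble() / total <= maxOverflowRatio
    }
}
```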

### Defaults and Safety

- Enabled by default with conservative capacities and weights
- No change to envelope format, transport, or disk caching

### Open Questions

- Default capacities per category (especially logs/replay vs. critical)
- Category weights (logs and spans will be high volume, so we might want to send them more often)
- Safe batching across categories beyond logs/client reports
  - Shall we adapt Relay to accept multiple "top-level" items (like errors or transactions) in a single envelope?
- Multiple HTTP connections per telemetry type