Commit c28c9b5
feat(develop): Telemetry Buffers (#14640)

---
title: Telemetry Buffer
sidebar_order: 35
---

## Telemetry Buffer Layer: Prioritized, Bounded, Rate-Aware Envelope Delivery

### Current State

The transport implementation in most of our SDKs currently uses a simple FIFO queue that processes all envelope types with equal priority. As our SDKs collect more telemetry of different types, critical telemetry (such as errors and crashes) can be delayed and de-prioritized because other telemetry (traces, replays, logs) occupies the queue. This is especially relevant for replays, logs, and continuous profiling, which we periodically flush to the queue.

### Proposal

Introduce a per-telemetry buffer layer between `Client` and `Transport` to:

- Batch telemetry when protocol-safe (all types, not only logs)
- Apply early rate limiting (don't enqueue while a rate limit is active)
- Enforce bounded memory via fixed-capacity ring buffers
- Prioritize delivery by telemetry criticality via weighted round-robin
- Keep existing transport/offline cache semantics unchanged (subject to change)
- (Stretch) Use a dedicated HTTP connection per telemetry type (backend SDKs only)

### Architecture Overview

```
┌───────────────────────────────────────────────────────────────────────────┐
│                                  Client                                   │
│   captureEvent / captureTransaction / captureReplay / captureLogs / ...   │
└───────────────────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────────────────┐
│                              TelemetryBuffer                              │
│  - Holds per-category buffers                                             │
│  - Early rate-limit check (shared RateLimiter)                            │
│  - Method-based submit to per-category buffers                            │
└───────────────────────────────────────────────────────────────────────────┘

            ┌─────────────────────────┼─────────────────────────┐
            ▼                         ▼                         ▼
┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐
│    Errors/Feedback    │ │   Sessions/CheckIns   │ │          Log          │
│      (CRITICAL)       │ │        (HIGH)         │ │       (MEDIUM)        │
│ RingBuffer + Batcher  │ │ RingBuffer + Batcher  │ │ RingBuffer + Batcher  │
└───────────────────────┘ └───────────────────────┘ └───────────────────────┘
            │                         │                         │
            ▼                         ▼                         ▼
┌───────────────────────────────────────────────────────────────────────────┐
│                      EnvelopeScheduler (Weighted RR)                      │
│  - Cross-buffer selection by priority (5..1)                              │
│  - Re-checks RateLimiter before send                                      │
│  - Submits envelopes to transport                                         │
└───────────────────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────────────────┐
│                           Transport (unchanged)                           │
│  - Single worker, disk cache, offline retry, client reports               │
└───────────────────────────────────────────────────────────────────────────┘
```

### Priorities (TBD)

- CRITICAL: Error, Feedback
- HIGH: Session, CheckIn
- MEDIUM: Log, ClientReport, Span
- LOW: Transaction, Profile, ProfileChunk
- LOWEST: Replay

Configurable via weights.
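
To make the weighted round-robin concrete, the weights can be expanded into a fixed service order that the scheduler iterates. A minimal Kotlin sketch, assuming the `EnvelopePriority` enum from the pseudocode below; `generatePriorityCycle` is referenced there but not defined, so this expansion is one possible interpretation:

```kotlin
enum class EnvelopePriority(val weight: Int) {
    CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1)
}

// Expands the weights into a service cycle, e.g. [CRITICAL x5, HIGH x4, MEDIUM x3, LOW x2, LOWEST x1].
// The scheduler loops over this cycle; higher-priority buffers simply get more turns per pass.
fun generatePriorityCycle(weights: Map<EnvelopePriority, Int>): List<EnvelopePriority> =
    EnvelopePriority.entries.flatMap { p -> List(weights[p] ?: p.weight) { p } }
```

An interleaved expansion (CRITICAL, HIGH, MEDIUM, LOW, LOWEST, CRITICAL, HIGH, ...) would serve the same ratios while reducing burstiness; which variant to use is an open design choice.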

### Components

- **Client**
  - Owns per-category buffers and is the single entry for all capture paths.
  - Consults `RateLimiter` early; on an active rate limit, do not enqueue and record `DiscardReason.RATELIMIT_BACKOFF`.
  - Submits items from capture methods to the matching per-category buffer.

- **TelemetryBuffer\<T\>** (per DataCategory)
  - Fixed-capacity ring buffer (bounded memory).
  - Stores raw items (pre-envelope).
  - Type-aware batching policy (size and/or time). Examples:
    - Errors/Feedback/Sessions/CheckIns: typically single-item; allow small batches if protocol-safe.
    - Logs: size/time-based (reuse semantics of `LoggerBatchProcessor`).
    - Spans: trace-based.
    - Transactions/Profiles/Replay: default single-item.
  - Overflow policy: drop-oldest (default). Record `DiscardReason.QUEUE_OVERFLOW`.

- **EnvelopeScheduler**
  - Single worker; weighted round-robin across priorities: weights 5, 4, 3, 2, 1.
  - Pulls ready batches from buffers; builds envelopes from batches; re-checks `RateLimiter` at send time.
  - Submits envelopes to `ITransport`. If the transport is unhealthy or recently rejected, backs off briefly.

- **Rate Limiting**
  - Shared `RateLimiter` is the source of truth.
  - Checked both at buffer ingress (to avoid queueing) and at egress (to avoid sending).

- **Transport and Offline Cache**
  - Unchanged. Disk caching, retry semantics, and client report recording remain in the transport.
  - Buffers are in-memory only (for now).

### Configuration (new options)

- `bufferCapacityByCategory`: map\<DataCategory, int\> (defaults tuned per volume)
- `priorityWeights`: CRITICAL..LOWEST (default 5,4,3,2,1)
- `overflowPolicy`: `drop_oldest` | `drop_newest` (default `drop_oldest`)
- `preemptLowerPriorityForCritical`: boolean (default false)
- `scheduler`:
  - `backoffMsOnTransportUnhealthy` (e.g., 250–1000 ms for backpressure)
  - `maxQueueSize` (soft cap; default derived from the transport queue)
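
For illustration only, these options could be grouped into a single holder. A sketch assuming the `DataCategory` and `EnvelopePriority` types used elsewhere in this document; the `TelemetryBufferOptions`/`SchedulerOptions`/`OverflowPolicy` names and all default values below are placeholders, not decided defaults:

```kotlin
enum class OverflowPolicy { DROP_OLDEST, DROP_NEWEST }

data class SchedulerOptions(
    val backoffMsOnTransportUnhealthy: Long = 500, // within the 250–1000 ms range above
    val maxQueueSize: Int? = null                  // null = derive from the transport queue
)

data class TelemetryBufferOptions(
    // Defaults tuned per expected volume; the numbers here are purely illustrative.
    val bufferCapacityByCategory: Map<DataCategory, Int> = mapOf(
        DataCategory.Error to 100,
        DataCategory.Log to 1_000,
        DataCategory.Replay to 20
        // ... remaining categories
    ),
    val priorityWeights: Map<EnvelopePriority, Int> =
        EnvelopePriority.entries.associateWith { it.weight }, // 5, 4, 3, 2, 1
    val overflowPolicy: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    val preemptLowerPriorityForCritical: Boolean = false,
    val scheduler: SchedulerOptions = SchedulerOptions()
)
```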

### Pseudocode (Kotlin-ish)

```kotlin
enum class EnvelopePriority(val weight: Int) {
    CRITICAL(5), HIGH(4), MEDIUM(3), LOW(2), LOWEST(1)
}

interface TelemetryBuffer<T> {
    val category: DataCategory
    val priority: EnvelopePriority
    fun offer(item: T): OfferResult // may drop; record client report
    fun nextBatchReady(nowMs: Long): Boolean
    fun drainBatch(maxItems: Int, nowMs: Long): List<T>
    fun size(): Int
}

class Client(
    private val buffers: Map<DataCategory, TelemetryBuffer<Any>>,
    private val rateLimiter: RateLimiter,
    private val clientReports: ClientReportRecorder
) {
    fun captureEvent(event: Any) = submit(DataCategory.Error, event)
    fun captureTransaction(tx: Any) = submit(DataCategory.Transaction, tx)
    fun captureReplay(replay: Any) = submit(DataCategory.Replay, replay)
    fun captureLog(log: Any) = submit(DataCategory.Log, log)
    // ... other capture methods ...

    private fun submit(category: DataCategory, item: Any) {
        if (rateLimiter.isRateLimitActive(category)) {
            clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, category)
            return
        }
        val res = buffers[category]?.offer(item)
        if (res is OfferResult.Dropped) {
            clientReports.recordLostEvent(DiscardReason.QUEUE_OVERFLOW, category, res.count)
        }
    }
}

class EnvelopeScheduler(
    private val buffersByPriority: Map<EnvelopePriority, List<TelemetryBuffer<*>>>,
    private val transport: ITransport,
    private val rateLimiter: RateLimiter,
    private val clientReports: ClientReportRecorder,
    private val weights: Map<EnvelopePriority, Int>,
    private val backoffMs: Long
) : Thread("TelemetryEnvelopeScheduler") {
    override fun run() {
        val order = generatePriorityCycle(weights) // e.g., [CRITICAL×5, HIGH×4, ...]
        while (true) {
            var sentSomething = false
            for (p in order) {
                val buf = selectReadyBuffer(buffersByPriority[p])
                if (buf != null) {
                    val cat = buf.category
                    val batch = buf.drainBatch(maxItemsFor(cat), nowMs())
                    if (batch.isNotEmpty()) {
                        if (!rateLimiter.isRateLimitActive(cat)) {
                            val envelopes = buildEnvelopes(cat, batch)
                            for (env in envelopes) {
                                transport.send(env)
                            }
                            sentSomething = true
                        } else {
                            clientReports.recordLostEvent(DiscardReason.RATELIMIT_BACKOFF, cat, batch.size)
                        }
                    }
                }
            }
            if (!sentSomething) Thread.sleep(backoffMs)
        }
    }

    private fun buildEnvelopes(category: DataCategory, batch: List<Any>): List<Envelope> {
        // Applies SDK version, trace context if provided, and size constraints
        // Returns one or more envelopes constructed from the batch
        TODO()
    }
}
```
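
`buildEnvelopes` is left as `TODO()` above. One possible shape, shown here only as a sketch: it assumes an `Envelope`/`EnvelopeItem` pair roughly like the existing SDK types, and the `toEnvelopeItem`, `headerFor`, and `sizeInBytes` helpers as well as the size budget are hypothetical:

```kotlin
// Illustrative: turn a drained batch into one or more envelopes, splitting whenever the
// next item would push the current envelope past an assumed size budget.
private fun buildEnvelopes(category: DataCategory, batch: List<Any>): List<Envelope> {
    val maxEnvelopeBytes = 1_000_000 // hypothetical per-envelope size budget
    val envelopes = mutableListOf<Envelope>()
    var items = mutableListOf<EnvelopeItem>()
    var bytes = 0L

    for (raw in batch) {
        val item = toEnvelopeItem(category, raw) // hypothetical: serialize one item
        val itemBytes = item.sizeInBytes()       // hypothetical: serialized size
        if (items.isNotEmpty() && bytes + itemBytes > maxEnvelopeBytes) {
            envelopes.add(Envelope(headerFor(category), items)) // hypothetical header factory
            items = mutableListOf()
            bytes = 0L
        }
        items.add(item)
        bytes += itemBytes
    }
    if (items.isNotEmpty()) {
        envelopes.add(Envelope(headerFor(category), items))
    }
    return envelopes
}
```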

Ring buffer semantics (per buffer):

- Fixed capacity N; on offer when full:
  - `drop_oldest`: evict 1 from head, enqueue new; record queue overflow client report
  - `drop_newest`: reject incoming; record queue overflow client report
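
A minimal sketch of a bounded buffer with both overflow policies; the `OfferResult` type mirrors the one in the pseudocode above, and the deque-based implementation is illustrative (a fixed-array ring buffer behaves the same way):

```kotlin
sealed class OfferResult {
    object Enqueued : OfferResult()
    data class Dropped(val count: Int) : OfferResult()
}

// Illustrative bounded buffer; drop_oldest evicts from the head, drop_newest rejects the
// incoming item. Either way the caller records a queue-overflow client report.
class BoundedBuffer<T>(
    private val capacity: Int,
    private val dropOldest: Boolean = true
) {
    private val items = ArrayDeque<T>(capacity)

    @Synchronized
    fun offer(item: T): OfferResult {
        if (items.size < capacity) {
            items.addLast(item)
            return OfferResult.Enqueued
        }
        return if (dropOldest) {
            items.removeFirst()    // evict 1 from head
            items.addLast(item)    // enqueue the new item
            OfferResult.Dropped(1) // report the evicted item as lost
        } else {
            OfferResult.Dropped(1) // reject the incoming item
        }
    }

    @Synchronized
    fun drain(maxItems: Int): List<T> {
        val out = mutableListOf<T>()
        while (out.size < maxItems && items.isNotEmpty()) out.add(items.removeFirst())
        return out
    }
}
```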

Batching policy examples:

- Logs: batch up to 100 items or 5s; split if the envelope size limit is reached
- Spans: batch per trace
- Errors/Feedback: batch size 1 (default)
- Sessions/CheckIns: small batches if safe (e.g., 10 items or 1s)
- Transactions/Profiles/Replay: default 1
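
For logs, the size/time trigger could look roughly like this; the 100-item and 5-second thresholds come from the example above, while the class and method names are placeholders:

```kotlin
// Illustrative size/time batching trigger for the Log buffer: a batch is ready once
// 100 items have accumulated or 5 seconds have passed since the first pending item.
class LogBatchingPolicy(
    private val maxItems: Int = 100,
    private val maxAgeMs: Long = 5_000
) {
    private var firstItemAtMs: Long? = null

    fun onItemBuffered(nowMs: Long) {
        if (firstItemAtMs == null) firstItemAtMs = nowMs
    }

    fun nextBatchReady(bufferedCount: Int, nowMs: Long): Boolean {
        val first = firstItemAtMs ?: return false
        return bufferedCount >= maxItems || nowMs - first >= maxAgeMs
    }

    fun onBatchDrained() {
        firstItemAtMs = null
    }
}
```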

### Observability

- Counters per category: enqueued, sent, queue_overflow, rate_limited
- Create a dashboard per category with reasons for dropped events
- Health: include buffer health in `Client.isHealthy()` alongside transport
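
A small sketch of what the per-category counters could look like; `BufferMetrics` and `BufferCounter` are hypothetical names, and how the values are exported (SDK diagnostics, a debug dashboard) is left open:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

enum class BufferCounter { ENQUEUED, SENT, QUEUE_OVERFLOW, RATE_LIMITED }

// Hypothetical per-category counters, incremented by the buffers and the scheduler.
class BufferMetrics {
    private val counters = ConcurrentHashMap<Pair<DataCategory, BufferCounter>, AtomicLong>()

    fun increment(category: DataCategory, counter: BufferCounter, by: Long = 1) {
        counters.computeIfAbsent(category to counter) { AtomicLong() }.addAndGet(by)
    }

    fun snapshot(): Map<Pair<DataCategory, BufferCounter>, Long> =
        counters.mapValues { it.value.get() }
}
```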

### Defaults and Safety

- Enabled by default with conservative capacities and weights
- No change to envelope format, transport, or disk caching

### Open Questions

- Default capacities per category (especially logs/replay vs. critical)
- Category weights (logs and spans will be high volume, so we might want to send them more often)
- Safe batching across categories beyond logs/client reports
- Shall we adapt Relay to accept multiple "top-level" items (like errors or transactions) in a single envelope?
- Multiple HTTP connections per telemetry type
