Skip to content

Commit 27cd330

Browse files
davidgoliclaude
andcommitted
feat(opentelemetry): add protobuf protocol support for OTLP exporters
Add support for Protocol Buffers binary encoding as an alternative to JSON for OTLP trace, metrics, and log exports. This enables more efficient wire format when communicating with OpenTelemetry collectors. - Add `protocol` option ("json" | "protobuf") to Otlp.layer and individual OtlpTracer, OtlpMetrics, OtlpLogger layers - Implement protobuf wire format encoding following opentelemetry-proto specs - Set appropriate Content-Type header (application/x-protobuf vs application/json) - No new dependencies - protobuf encoding implemented from scratch 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 488d6e8 commit 27cd330

File tree

9 files changed

+1473
-7
lines changed

9 files changed

+1473
-7
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
---
2+
"@effect/opentelemetry": minor
3+
---
4+
5+
Add protobuf protocol support for OTLP exporters.
6+
7+
This adds a `protocol` option to `Otlp.layer`, `OtlpTracer.layer`, `OtlpMetrics.layer`, and `OtlpLogger.layer` that allows choosing between JSON (default) and Protocol Buffers binary encoding when exporting telemetry data to OpenTelemetry collectors.
8+
9+
```typescript
10+
import { Otlp } from "@effect/opentelemetry"
11+
12+
// Use protobuf encoding for more efficient wire format
13+
Otlp.layer({
14+
baseUrl: "http://localhost:4318",
15+
protocol: "protobuf",
16+
resource: { serviceName: "my-service" }
17+
})
18+
```
19+
20+
- No new dependencies - protobuf encoding implemented from scratch
21+
- Sets appropriate Content-Type header (`application/x-protobuf` vs `application/json`)
22+
- Follows opentelemetry-proto specifications

packages/opentelemetry/src/Otlp.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,22 @@ import type * as Duration from "effect/Duration"
88
import * as Layer from "effect/Layer"
99
import type * as Logger from "effect/Logger"
1010
import type * as Tracer from "effect/Tracer"
11+
import type { OtlpProtocol as _OtlpProtocol } from "./internal/otlpExporter.js"
1112
import * as OtlpLogger from "./OtlpLogger.js"
1213
import * as OtlpMetrics from "./OtlpMetrics.js"
1314
import * as OtlpTracer from "./OtlpTracer.js"
1415

16+
/**
17+
* OTLP protocol type for encoding telemetry data.
18+
*
19+
* - `"json"`: JSON encoding (default) - uses `application/json` content type
20+
* - `"protobuf"`: Protocol Buffers binary encoding - uses `application/x-protobuf` content type
21+
*
22+
* @since 1.0.0
23+
* @category Models
24+
*/
25+
export type OtlpProtocol = _OtlpProtocol
26+
1527
/**
1628
* @since 1.0.0
1729
* @category Layers
@@ -32,6 +44,7 @@ export const layer = (options: {
3244
readonly metricsExportInterval?: Duration.DurationInput | undefined
3345
readonly tracerExportInterval?: Duration.DurationInput | undefined
3446
readonly shutdownTimeout?: Duration.DurationInput | undefined
47+
readonly protocol?: OtlpProtocol | undefined
3548
}): Layer.Layer<never, never, HttpClient.HttpClient> => {
3649
const baseReq = HttpClientRequest.get(options.baseUrl)
3750
const url = (path: string) => HttpClientRequest.appendUrl(baseReq, path).url
@@ -44,14 +57,16 @@ export const layer = (options: {
4457
exportInterval: options.loggerExportInterval,
4558
maxBatchSize: options.maxBatchSize,
4659
shutdownTimeout: options.shutdownTimeout,
47-
excludeLogSpans: options.loggerExcludeLogSpans
60+
excludeLogSpans: options.loggerExcludeLogSpans,
61+
protocol: options.protocol
4862
}),
4963
OtlpMetrics.layer({
5064
url: url("/v1/metrics"),
5165
resource: options.resource,
5266
headers: options.headers,
5367
exportInterval: options.metricsExportInterval,
54-
shutdownTimeout: options.shutdownTimeout
68+
shutdownTimeout: options.shutdownTimeout,
69+
protocol: options.protocol
5570
}),
5671
OtlpTracer.layer({
5772
url: url("/v1/traces"),
@@ -60,7 +75,8 @@ export const layer = (options: {
6075
exportInterval: options.tracerExportInterval,
6176
maxBatchSize: options.maxBatchSize,
6277
context: options.tracerContext,
63-
shutdownTimeout: options.shutdownTimeout
78+
shutdownTimeout: options.shutdownTimeout,
79+
protocol: options.protocol
6480
})
6581
)
6682
}

packages/opentelemetry/src/OtlpLogger.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import * as Option from "effect/Option"
1818
import type * as Scope from "effect/Scope"
1919
import * as Tracer from "effect/Tracer"
2020
import * as Exporter from "./internal/otlpExporter.js"
21+
import type { OtlpProtocol } from "./internal/otlpExporter.js"
22+
import * as OtlpProtobuf from "./internal/otlpProtobuf.js"
2123
import type { AnyValue, Fixed64, KeyValue, Resource } from "./OtlpResource.js"
2224
import * as OtlpResource from "./OtlpResource.js"
2325

@@ -38,6 +40,7 @@ export const make: (
3840
readonly maxBatchSize?: number | undefined
3941
readonly shutdownTimeout?: Duration.DurationInput | undefined
4042
readonly excludeLogSpans?: boolean | undefined
43+
readonly protocol?: OtlpProtocol | undefined
4144
}
4245
) => Effect.Effect<
4346
Logger.Logger<unknown, void>,
@@ -55,6 +58,7 @@ export const make: (
5558
headers: options.headers,
5659
maxBatchSize: options.maxBatchSize ?? 1000,
5760
exportInterval: options.exportInterval ?? Duration.seconds(1),
61+
protocol: options.protocol,
5862
body: (data): IExportLogsServiceRequest => ({
5963
resourceLogs: [{
6064
resource: otelResource,
@@ -64,6 +68,29 @@ export const make: (
6468
}]
6569
}]
6670
}),
71+
bodyProtobuf: (data: Array<ILogRecord>): Uint8Array =>
72+
OtlpProtobuf.encodeLogsData({
73+
resourceLogs: [{
74+
resource: otelResource,
75+
scopeLogs: [{
76+
scope,
77+
logRecords: data.map((record) => ({
78+
timeUnixNano: String(record.timeUnixNano),
79+
observedTimeUnixNano: record.observedTimeUnixNano !== undefined
80+
? String(record.observedTimeUnixNano)
81+
: undefined,
82+
severityNumber: record.severityNumber,
83+
severityText: record.severityText,
84+
body: record.body,
85+
attributes: record.attributes,
86+
droppedAttributesCount: record.droppedAttributesCount,
87+
flags: record.flags,
88+
traceId: typeof record.traceId === "string" ? record.traceId : undefined,
89+
spanId: typeof record.spanId === "string" ? record.spanId : undefined
90+
}))
91+
}]
92+
}]
93+
}),
6794
shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3)
6895
})
6996

@@ -92,6 +119,7 @@ export const layer = (options: {
92119
readonly maxBatchSize?: number | undefined
93120
readonly shutdownTimeout?: Duration.DurationInput | undefined
94121
readonly excludeLogSpans?: boolean | undefined
122+
readonly protocol?: OtlpProtocol | undefined
95123
}): Layer.Layer<never, never, HttpClient.HttpClient> =>
96124
options.replaceLogger ? Logger.replaceScoped(options.replaceLogger, make(options)) : Logger.addScoped(make(options))
97125

packages/opentelemetry/src/OtlpMetrics.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import * as MetricState from "effect/MetricState"
1313
import * as Option from "effect/Option"
1414
import type * as Scope from "effect/Scope"
1515
import * as Exporter from "./internal/otlpExporter.js"
16+
import type { OtlpProtocol } from "./internal/otlpExporter.js"
17+
import * as OtlpProtobuf from "./internal/otlpProtobuf.js"
1618
import type { Fixed64, KeyValue } from "./OtlpResource.js"
1719
import * as OtlpResource from "./OtlpResource.js"
1820

@@ -30,6 +32,7 @@ export const make: (options: {
3032
readonly headers?: Headers.Input | undefined
3133
readonly exportInterval?: Duration.DurationInput | undefined
3234
readonly shutdownTimeout?: Duration.DurationInput | undefined
35+
readonly protocol?: OtlpProtocol | undefined
3336
}) => Effect.Effect<
3437
void,
3538
never,
@@ -265,13 +268,72 @@ export const make: (options: {
265268
}
266269
}
267270

271+
const snapshotProtobuf = (): Uint8Array => {
272+
const data = snapshot()
273+
return OtlpProtobuf.encodeMetricsData({
274+
resourceMetrics: data.resourceMetrics.map((rm) => ({
275+
resource: rm.resource!,
276+
scopeMetrics: rm.scopeMetrics.map((sm) => ({
277+
scope: sm.scope!,
278+
metrics: sm.metrics.map((m) => ({
279+
name: m.name,
280+
description: m.description,
281+
unit: m.unit,
282+
gauge: m.gauge
283+
? {
284+
dataPoints: m.gauge.dataPoints.map((dp) => ({
285+
attributes: dp.attributes,
286+
startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"),
287+
timeUnixNano: String(dp.timeUnixNano ?? "0"),
288+
asDouble: dp.asDouble !== null ? dp.asDouble : undefined,
289+
asInt: dp.asInt !== undefined ? String(dp.asInt) : undefined
290+
}))
291+
}
292+
: undefined,
293+
sum: m.sum
294+
? {
295+
dataPoints: m.sum.dataPoints.map((dp) => ({
296+
attributes: dp.attributes,
297+
startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"),
298+
timeUnixNano: String(dp.timeUnixNano ?? "0"),
299+
asDouble: dp.asDouble !== null ? dp.asDouble : undefined,
300+
asInt: dp.asInt !== undefined ? String(dp.asInt) : undefined
301+
})),
302+
aggregationTemporality: m.sum.aggregationTemporality,
303+
isMonotonic: m.sum.isMonotonic
304+
}
305+
: undefined,
306+
histogram: m.histogram
307+
? {
308+
dataPoints: m.histogram.dataPoints.map((dp) => ({
309+
attributes: dp.attributes ?? [],
310+
startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"),
311+
timeUnixNano: String(dp.timeUnixNano ?? "0"),
312+
count: dp.count ?? 0,
313+
sum: dp.sum,
314+
bucketCounts: dp.bucketCounts ?? [],
315+
explicitBounds: dp.explicitBounds ?? [],
316+
min: dp.min,
317+
max: dp.max
318+
})),
319+
aggregationTemporality: m.histogram.aggregationTemporality ?? OtlpProtobuf.AggregationTemporality.Cumulative
320+
}
321+
: undefined
322+
}))
323+
}))
324+
}))
325+
})
326+
}
327+
268328
yield* Exporter.make({
269329
label: "OtlpMetrics",
270330
url: options.url,
271331
headers: options.headers,
272332
maxBatchSize: "disabled",
273333
exportInterval: options.exportInterval ?? Duration.seconds(10),
334+
protocol: options.protocol,
274335
body: snapshot,
336+
bodyProtobuf: snapshotProtobuf,
275337
shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3)
276338
})
277339
})
@@ -290,6 +352,7 @@ export const layer = (options: {
290352
readonly headers?: Headers.Input | undefined
291353
readonly exportInterval?: Duration.DurationInput | undefined
292354
readonly shutdownTimeout?: Duration.DurationInput | undefined
355+
readonly protocol?: OtlpProtocol | undefined
293356
}): Layer.Layer<never, never, HttpClient.HttpClient> => Layer.scopedDiscard(make(options))
294357

295358
// internal

packages/opentelemetry/src/OtlpTracer.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import type * as Scope from "effect/Scope"
1414
import * as Tracer from "effect/Tracer"
1515
import type { ExtractTag } from "effect/Types"
1616
import * as Exporter from "./internal/otlpExporter.js"
17+
import type { OtlpProtocol } from "./internal/otlpExporter.js"
18+
import * as OtlpProtobuf from "./internal/otlpProtobuf.js"
1719
import type { KeyValue, Resource } from "./OtlpResource.js"
1820
import { entriesToAttributes } from "./OtlpResource.js"
1921
import * as OtlpResource from "./OtlpResource.js"
@@ -39,6 +41,7 @@ export const make: (
3941
readonly maxBatchSize?: number | undefined
4042
readonly context?: (<X>(f: () => X, span: Tracer.AnySpan) => X) | undefined
4143
readonly shutdownTimeout?: Duration.DurationInput | undefined
44+
readonly protocol?: OtlpProtocol | undefined
4245
}
4346
) => Effect.Effect<
4447
Tracer.Tracer,
@@ -56,6 +59,7 @@ export const make: (
5659
headers: options.headers,
5760
exportInterval: options.exportInterval ?? Duration.seconds(5),
5861
maxBatchSize: options.maxBatchSize ?? 1000,
62+
protocol: options.protocol,
5963
body(spans) {
6064
const data: TraceData = {
6165
resourceSpans: [{
@@ -68,6 +72,17 @@ export const make: (
6872
}
6973
return data
7074
},
75+
bodyProtobuf(spans) {
76+
return OtlpProtobuf.encodeTracesData({
77+
resourceSpans: [{
78+
resource: otelResource,
79+
scopeSpans: [{
80+
scope,
81+
spans
82+
}]
83+
}]
84+
})
85+
},
7186
shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3)
7287
})
7388

@@ -119,6 +134,7 @@ export const layer = (options: {
119134
readonly maxBatchSize?: number | undefined
120135
readonly context?: (<X>(f: () => X, span: Tracer.AnySpan) => X) | undefined
121136
readonly shutdownTimeout?: Duration.DurationInput | undefined
137+
readonly protocol?: OtlpProtocol | undefined
122138
}): Layer.Layer<never, never, HttpClient.HttpClient> => Layer.unwrapScoped(Effect.map(make(options), Layer.setTracer))
123139

124140
// internal

packages/opentelemetry/src/internal/otlpExporter.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import * as Headers from "@effect/platform/Headers"
2+
import * as HttpBody from "@effect/platform/HttpBody"
23
import * as HttpClient from "@effect/platform/HttpClient"
34
import * as HttpClientError from "@effect/platform/HttpClientError"
45
import * as HttpClientRequest from "@effect/platform/HttpClientRequest"
@@ -10,6 +11,12 @@ import * as Option from "effect/Option"
1011
import * as Schedule from "effect/Schedule"
1112
import * as Scope from "effect/Scope"
1213

14+
/**
15+
* OTLP protocol type for encoding
16+
* @internal
17+
*/
18+
export type OtlpProtocol = "json" | "protobuf"
19+
1320
const policy = Schedule.forever.pipe(
1421
Schedule.passthrough,
1522
Schedule.addDelay((error) => {
@@ -37,6 +44,8 @@ export const make: (
3744
readonly exportInterval: Duration.DurationInput
3845
readonly maxBatchSize: number | "disabled"
3946
readonly body: (data: Array<any>) => unknown
47+
readonly bodyProtobuf?: ((data: Array<any>) => Uint8Array) | undefined
48+
readonly protocol?: OtlpProtocol | undefined
4049
readonly shutdownTimeout: Duration.DurationInput
4150
}
4251
) => Effect.Effect<
@@ -53,8 +62,14 @@ export const make: (
5362
HttpClient.retryTransient({ schedule: policy, times: 3 })
5463
)
5564

65+
const protocol = options.protocol ?? "json"
66+
const contentType = protocol === "protobuf"
67+
? "application/x-protobuf"
68+
: "application/json"
69+
5670
let headers = Headers.unsafeFromRecord({
57-
"user-agent": `effect-opentelemetry-${options.label}/0.0.0`
71+
"user-agent": `effect-opentelemetry-${options.label}/0.0.0`,
72+
"content-type": contentType
5873
})
5974
if (options.headers) {
6075
headers = Headers.merge(Headers.fromInput(options.headers), headers)
@@ -75,9 +90,14 @@ export const make: (
7590
}
7691
buffer = []
7792
}
78-
return client.execute(
79-
HttpClientRequest.bodyUnsafeJson(request, options.body(items))
80-
).pipe(
93+
const requestWithBody = protocol === "protobuf" && options.bodyProtobuf
94+
? HttpClientRequest.setBody(
95+
request,
96+
HttpBody.uint8Array(options.bodyProtobuf(items), contentType)
97+
)
98+
: HttpClientRequest.bodyUnsafeJson(request, options.body(items))
99+
100+
return client.execute(requestWithBody).pipe(
81101
Effect.asVoid,
82102
Effect.withTracerEnabled(false)
83103
)

0 commit comments

Comments
 (0)