Skip to content

Commit 6a7502e

Browse files
committed
otlp: add integration test moving data from otelgen to official Open Telemetry collector (with schema registry)
Test Infrastructure: - Spins up Redpanda container with Schema Registry - Deploys OTel Collector with debug exporter for validation - Uses telemetrygen to generate synthetic OTLP data Test Matrix: - Signal types: traces, logs, metrics - Transports: HTTP (port 4318) and gRPC (port 4317) - Encodings: JSON and Protobuf - Total: 12 test scenarios Test Flow: 1. Generate OTLP data → OTLP input → encode with Schema Registry → write to Redpanda topic 2. Read from Redpanda → decode from Schema Registry → send to OTel Collector 3. Validate expected data count (with 20% tolerance) === RUN TestIntegrationOTLPWithSchemaRegistry/trace_http_json integration_test.go:152: Given: Redpanda with Schema Registry integration_test.go:154: Redpanda broker: localhost:33320 integration_test.go:155: Schema Registry: http://localhost:33318 integration_test.go:158: And: topic otlp-trace-json-http is created integration_test.go:216: When: Creating topic with single partition integration_test.go:161: And: OTel Collector integration_test.go:163: OTel Collector endpoints - HTTP: localhost:33323, gRPC: localhost:33322 integration_test.go:165: When: generating telemetry data and sending to Redpanda via Benthos pipeline time="2026-01-20T16:16:08+01:00" level=info msg="Output type redpanda is now active" @service=benthos label="" path=root.output time="2026-01-20T16:16:08+01:00" level=info msg="Using Schema Registry schema ID 1 for signal type trace" @service=benthos label="" path=root.input time="2026-01-20T16:16:08+01:00" level=info msg="Using Schema Registry schema ID 2 for signal type log" @service=benthos label="" path=root.input time="2026-01-20T16:16:08+01:00" level=info msg="Using Schema Registry schema ID 3 for signal type metric" @service=benthos label="" path=root.input time="2026-01-20T16:16:08+01:00" level=info msg="Starting OTLP HTTP server on 0.0.0.0:4318" @service=benthos label="" path=root.input time="2026-01-20T16:16:08+01:00" level=info msg="Input type otlp_http is now active" @service=benthos label="" path=root.input time="2026-01-20T16:16:09+01:00" level=debug msg="immediate metadata update triggered" @service=benthos label="" path=root.output why="forced load because we are producing to a topic for the first time" time="2026-01-20T16:16:09+01:00" level=debug msg="producing to a new topic for the first time, fetching metadata to learn its partitions" @service=benthos label="" path=root.output topic=otlp-trace-json-http time="2026-01-20T16:16:09+01:00" level=debug msg="done waiting for metadata for new topic" @service=benthos label="" path=root.output topic=otlp-trace-json-http time="2026-01-20T16:16:09+01:00" level=debug msg="initializing producer id" @service=benthos label="" path=root.output time="2026-01-20T16:16:09+01:00" level=debug msg="producer id initialization success" @service=benthos epoch=0 id=1 label="" path=root.output integration_test.go:251: otelgen logs: 2026-01-20T15:16:08.611Z INFO traces/traces.go:41 starting HTTP exporter 2026-01-20T15:16:08.611Z INFO traces/traces.go:125 generation of traces is limited {"per-second": 100} 2026-01-20T15:16:23.622Z INFO traces/worker.go:180 traces generated {"worker": 0, "traces": 751} 2026-01-20T15:16:23.622Z INFO traces/traces.go:75 stop the batch span processor 2026-01-20T15:16:23.639Z INFO traces/traces.go:65 stopping the exporter time="2026-01-20T16:16:23+01:00" level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input time="2026-01-20T16:16:23+01:00" level=debug msg="Pending acks resolved." @service=benthos label="" path=root.input integration_test.go:170: And: reading from Redpanda and sending to OTel Collector via pipeline integration_test.go:357: Pipeline shutdown integration_test.go:177: Then: OTel Collector should eventually contain expected data time="2026-01-20T16:16:23+01:00" level=info msg="Connected to OTLP HTTP endpoint: http://localhost:33323" @service=benthos label="" path=root.output time="2026-01-20T16:16:23+01:00" level=info msg="Output type otlp_http is now active" @service=benthos label="" path=root.output time="2026-01-20T16:16:23+01:00" level=debug msg="immediate metadata update triggered" @service=benthos label="" path=root.input why="querying metadata for consumer initialization" integration_test.go:183: Current count: 0, expected: 1500 (±20%) time="2026-01-20T16:16:23+01:00" level=info msg="Input type redpanda is now active" @service=benthos label="" path=root.input time="2026-01-20T16:16:23+01:00" level=debug msg="beginning to manage the group lifecycle" @service=benthos group=otlp-integration-test label="" path=root.input time="2026-01-20T16:16:23+01:00" level=debug msg="beginning autocommit loop" @service=benthos group=otlp-integration-test label="" path=root.input time="2026-01-20T16:16:23+01:00" level=debug msg="joining group" @service=benthos group=otlp-integration-test label="" path=root.input time="2026-01-20T16:16:23+01:00" level=debug msg="join returned MemberIDRequired, rejoining with response's MemberID" @service=benthos group=otlp-integration-test label="" member_id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b path=root.input integration_test.go:183: Current count: 0, expected: 1500 (±20%) integration_test.go:183: Current count: 0, expected: 1500 (±20%) integration_test.go:183: Current count: 0, expected: 1500 (±20%) time="2026-01-20T16:16:26+01:00" level=debug msg="joined, balancing group" @service=benthos balance_protocol=cooperative-sticky generation=1 group=otlp-integration-test instance_id="<nil>" label="" leader=true member_id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b path=root.input time="2026-01-20T16:16:26+01:00" level=debug msg="balancing group as leader" @service=benthos label="" path=root.input time="2026-01-20T16:16:26+01:00" level=debug msg="balance group member" @service=benthos id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b interests="interested topics: [otlp-trace-json-http], previously owned: " label="" path=root.input time="2026-01-20T16:16:26+01:00" level=debug msg=balanced @service=benthos label="" path=root.input plan="redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b{otlp-trace-json-http[0]}" time="2026-01-20T16:16:26+01:00" level=debug msg=syncing @service=benthos group=otlp-integration-test label="" path=root.input protocol=cooperative-sticky protocol_type=consumer time="2026-01-20T16:16:26+01:00" level=debug msg=synced @service=benthos assigned="otlp-trace-json-http[0]" group=otlp-integration-test label="" path=root.input time="2026-01-20T16:16:26+01:00" level=debug msg="new group session begun" @service=benthos added="otlp-trace-json-http[0]" group=otlp-integration-test label="" lost="" path=root.input time="2026-01-20T16:16:26+01:00" level=debug msg="beginning heartbeat loop" @service=benthos group=otlp-integration-test label="" path=root.input time="2026-01-20T16:16:26+01:00" level=debug msg="assigning partitions" @service=benthos how="assigning everything new, keeping current assignment" input="otlp-trace-json-http[0{-2 e-1 ce0}]" label="" path=root.input why="newly fetched offsets for group otlp-integration-test" integration_test.go:183: Current count: 1502, expected: 1500 (±20%) time="2026-01-20T16:16:27+01:00" level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input time="2026-01-20T16:16:27+01:00" level=debug msg="Pending acks resolved." @service=benthos label="" path=root.input time="2026-01-20T16:16:27+01:00" level=debug msg="assigning partitions" @service=benthos how="unassigning everything" input="" label="" path=root.input why="invalidating all assignments in LeaveGroup" time="2026-01-20T16:16:27+01:00" level=debug msg="heartbeat errored" @service=benthos err="context canceled" group=otlp-integration-test label="" path=root.input time="2026-01-20T16:16:27+01:00" level=debug msg="assigning partitions" @service=benthos how="unassigning everything" input="" label="" path=root.input why="clearing assignment at end of group management session" time="2026-01-20T16:16:27+01:00" level=debug msg="leaving group" @service=benthos group=otlp-integration-test label="" member_id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b path=root.input integration_test.go:357: Pipeline shutdown --- PASS: TestIntegrationOTLPWithSchemaRegistry/trace_http_json (21.37s)
1 parent fe2005f commit 6a7502e

File tree

4 files changed

+375
-2
lines changed

4 files changed

+375
-2
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ require (
156156
github.com/stretchr/testify v1.11.1
157157
github.com/testcontainers/testcontainers-go/modules/ollama v0.39.0
158158
github.com/testcontainers/testcontainers-go/modules/qdrant v0.39.0
159+
github.com/testcontainers/testcontainers-go/modules/redpanda v0.40.0
159160
github.com/tetratelabs/wazero v1.9.0
160161
github.com/tigerbeetle/tigerbeetle-go v0.16.61
161162
github.com/timeplus-io/proton-go-driver/v2 v2.1.2
@@ -494,7 +495,7 @@ require (
494495
github.com/sirupsen/logrus v1.9.3 // indirect
495496
github.com/spaolacci/murmur3 v1.1.0 // indirect
496497
github.com/stretchr/objx v0.5.2 // indirect
497-
github.com/testcontainers/testcontainers-go v0.40.0 // indirect
498+
github.com/testcontainers/testcontainers-go v0.40.0
498499
github.com/testcontainers/testcontainers-go/modules/mongodb v0.39.0
499500
github.com/tilinna/z85 v1.0.0 // indirect
500501
github.com/tklauser/go-sysconf v0.3.15 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1791,6 +1791,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
17911791
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
17921792
github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
17931793
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
1794+
github.com/mdelapenya/tlscert v0.2.0 h1:7H81W6Z/4weDvZBNOfQte5GpIMo0lGYEeWbkGp5LJHI=
1795+
github.com/mdelapenya/tlscert v0.2.0/go.mod h1:O4njj3ELLnJjGdkN7M/vIVCpZ+Cf0L6muqOG4tLSl8o=
17941796
github.com/microcosm-cc/bluemonday v1.0.27 h1:MpEUotklkwCSLeH+Qdx1VJgNqLlpY2KXwXFM08ygZfk=
17951797
github.com/microcosm-cc/bluemonday v1.0.27/go.mod h1:jFi9vgW+H7c3V0lb6nR74Ib/DIB5OBs92Dimizgw2cA=
17961798
github.com/microsoft/go-mssqldb v1.9.3 h1:hy4p+LDC8LIGvI3JATnLVmBOLMJbmn5X400mr5j0lPs=

internal/impl/otlp/input.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,11 @@ func (o *otlpInput) newMessageWithSignalType(msg proto.Message, s SignalType) (*
287287
case EncodingProtobuf:
288288
msgBytes, err = proto.Marshal(msg)
289289
case EncodingJSON:
290-
msgBytes, err = protojson.Marshal(msg)
290+
marshaler := protojson.MarshalOptions{
291+
UseProtoNames: true, // Align with our snake case preferences
292+
UseEnumNumbers: true, // Closer to the official OTEL JSON format
293+
}
294+
msgBytes, err = marshaler.Marshal(msg)
291295
default:
292296
return nil, fmt.Errorf("unsupported encoding: %s", o.encoding)
293297
}

0 commit comments

Comments
 (0)