-
Notifications
You must be signed in to change notification settings - Fork 913
otel: support for JSON, schema registry and integration test #3924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
go.mod
Outdated
| github.com/redpanda-data/common-go/authz v0.2.0 | ||
| github.com/redpanda-data/common-go/license v0.0.0-20260109170727-1dd9f5d22ee1 | ||
| github.com/redpanda-data/common-go/redpanda-otel-exporter v0.3.0 | ||
| github.com/redpanda-data/common-go/redpanda-otel-exporter v0.3.1-0.20260120073450-935d3dd3d6c1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW feel free to push a tag for 0.4.0 for this, you have permissions for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uhh this is awesome
82324f5 to
bfc7327
Compare
This is for compatibility with redpanda-otel-exporter and JSON is the default now. Inputs expose configuration option, outputs transparently handle both.
Extend OTLP HTTP and gRPC inputs to optionally register and add Schema Registry wire format headers to outgoing messages.
…Telemetry collector (with schema registry)
Test Infrastructure:
- Spins up Redpanda container with Schema Registry
- Deploys OTel Collector with debug exporter for validation
- Uses telemetrygen to generate synthetic OTLP data
Test Matrix:
- Signal types: traces, logs, metrics
- Transports: HTTP (port 4318) and gRPC (port 4317)
- Encodings: JSON and Protobuf
- Total: 12 test scenarios
Test Flow:
1. Generate OTLP data → OTLP input → encode with Schema Registry → write to Redpanda topic
2. Read from Redpanda → decode from Schema Registry → send to OTel Collector
3. Validate expected data count (with 20% tolerance)
=== RUN TestIntegrationOTLPWithSchemaRegistry/trace_http_json
integration_test.go:152: Given: Redpanda with Schema Registry
integration_test.go:154: Redpanda broker: localhost:33320
integration_test.go:155: Schema Registry: http://localhost:33318
integration_test.go:158: And: topic otlp-trace-json-http is created
integration_test.go:216: When: Creating topic with single partition
integration_test.go:161: And: OTel Collector
integration_test.go:163: OTel Collector endpoints - HTTP: localhost:33323, gRPC: localhost:33322
integration_test.go:165: When: generating telemetry data and sending to Redpanda via Benthos pipeline
time="2026-01-20T16:16:08+01:00" level=info msg="Output type redpanda is now active" @service=benthos label="" path=root.output
time="2026-01-20T16:16:08+01:00" level=info msg="Using Schema Registry schema ID 1 for signal type trace" @service=benthos label="" path=root.input
time="2026-01-20T16:16:08+01:00" level=info msg="Using Schema Registry schema ID 2 for signal type log" @service=benthos label="" path=root.input
time="2026-01-20T16:16:08+01:00" level=info msg="Using Schema Registry schema ID 3 for signal type metric" @service=benthos label="" path=root.input
time="2026-01-20T16:16:08+01:00" level=info msg="Starting OTLP HTTP server on 0.0.0.0:4318" @service=benthos label="" path=root.input
time="2026-01-20T16:16:08+01:00" level=info msg="Input type otlp_http is now active" @service=benthos label="" path=root.input
time="2026-01-20T16:16:09+01:00" level=debug msg="immediate metadata update triggered" @service=benthos label="" path=root.output why="forced load because we are producing to a topic for the first time"
time="2026-01-20T16:16:09+01:00" level=debug msg="producing to a new topic for the first time, fetching metadata to learn its partitions" @service=benthos label="" path=root.output topic=otlp-trace-json-http
time="2026-01-20T16:16:09+01:00" level=debug msg="done waiting for metadata for new topic" @service=benthos label="" path=root.output topic=otlp-trace-json-http
time="2026-01-20T16:16:09+01:00" level=debug msg="initializing producer id" @service=benthos label="" path=root.output
time="2026-01-20T16:16:09+01:00" level=debug msg="producer id initialization success" @service=benthos epoch=0 id=1 label="" path=root.output
integration_test.go:251: otelgen logs:
2026-01-20T15:16:08.611Z INFO traces/traces.go:41 starting HTTP exporter
2026-01-20T15:16:08.611Z INFO traces/traces.go:125 generation of traces is limited {"per-second": 100}
2026-01-20T15:16:23.622Z INFO traces/worker.go:180 traces generated {"worker": 0, "traces": 751}
2026-01-20T15:16:23.622Z INFO traces/traces.go:75 stop the batch span processor
2026-01-20T15:16:23.639Z INFO traces/traces.go:65 stopping the exporter
time="2026-01-20T16:16:23+01:00" level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input
time="2026-01-20T16:16:23+01:00" level=debug msg="Pending acks resolved." @service=benthos label="" path=root.input
integration_test.go:170: And: reading from Redpanda and sending to OTel Collector via pipeline
integration_test.go:357: Pipeline shutdown
integration_test.go:177: Then: OTel Collector should eventually contain expected data
time="2026-01-20T16:16:23+01:00" level=info msg="Connected to OTLP HTTP endpoint: http://localhost:33323" @service=benthos label="" path=root.output
time="2026-01-20T16:16:23+01:00" level=info msg="Output type otlp_http is now active" @service=benthos label="" path=root.output
time="2026-01-20T16:16:23+01:00" level=debug msg="immediate metadata update triggered" @service=benthos label="" path=root.input why="querying metadata for consumer initialization"
integration_test.go:183: Current count: 0, expected: 1500 (±20%)
time="2026-01-20T16:16:23+01:00" level=info msg="Input type redpanda is now active" @service=benthos label="" path=root.input
time="2026-01-20T16:16:23+01:00" level=debug msg="beginning to manage the group lifecycle" @service=benthos group=otlp-integration-test label="" path=root.input
time="2026-01-20T16:16:23+01:00" level=debug msg="beginning autocommit loop" @service=benthos group=otlp-integration-test label="" path=root.input
time="2026-01-20T16:16:23+01:00" level=debug msg="joining group" @service=benthos group=otlp-integration-test label="" path=root.input
time="2026-01-20T16:16:23+01:00" level=debug msg="join returned MemberIDRequired, rejoining with response's MemberID" @service=benthos group=otlp-integration-test label="" member_id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b path=root.input
integration_test.go:183: Current count: 0, expected: 1500 (±20%)
integration_test.go:183: Current count: 0, expected: 1500 (±20%)
integration_test.go:183: Current count: 0, expected: 1500 (±20%)
time="2026-01-20T16:16:26+01:00" level=debug msg="joined, balancing group" @service=benthos balance_protocol=cooperative-sticky generation=1 group=otlp-integration-test instance_id="<nil>" label="" leader=true member_id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b path=root.input
time="2026-01-20T16:16:26+01:00" level=debug msg="balancing group as leader" @service=benthos label="" path=root.input
time="2026-01-20T16:16:26+01:00" level=debug msg="balance group member" @service=benthos id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b interests="interested topics: [otlp-trace-json-http], previously owned: " label="" path=root.input
time="2026-01-20T16:16:26+01:00" level=debug msg=balanced @service=benthos label="" path=root.input plan="redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b{otlp-trace-json-http[0]}"
time="2026-01-20T16:16:26+01:00" level=debug msg=syncing @service=benthos group=otlp-integration-test label="" path=root.input protocol=cooperative-sticky protocol_type=consumer
time="2026-01-20T16:16:26+01:00" level=debug msg=synced @service=benthos assigned="otlp-trace-json-http[0]" group=otlp-integration-test label="" path=root.input
time="2026-01-20T16:16:26+01:00" level=debug msg="new group session begun" @service=benthos added="otlp-trace-json-http[0]" group=otlp-integration-test label="" lost="" path=root.input
time="2026-01-20T16:16:26+01:00" level=debug msg="beginning heartbeat loop" @service=benthos group=otlp-integration-test label="" path=root.input
time="2026-01-20T16:16:26+01:00" level=debug msg="assigning partitions" @service=benthos how="assigning everything new, keeping current assignment" input="otlp-trace-json-http[0{-2 e-1 ce0}]" label="" path=root.input why="newly fetched offsets for group otlp-integration-test"
integration_test.go:183: Current count: 1502, expected: 1500 (±20%)
time="2026-01-20T16:16:27+01:00" level=debug msg="Waiting for pending acks to resolve before shutting down." @service=benthos label="" path=root.input
time="2026-01-20T16:16:27+01:00" level=debug msg="Pending acks resolved." @service=benthos label="" path=root.input
time="2026-01-20T16:16:27+01:00" level=debug msg="assigning partitions" @service=benthos how="unassigning everything" input="" label="" path=root.input why="invalidating all assignments in LeaveGroup"
time="2026-01-20T16:16:27+01:00" level=debug msg="heartbeat errored" @service=benthos err="context canceled" group=otlp-integration-test label="" path=root.input
time="2026-01-20T16:16:27+01:00" level=debug msg="assigning partitions" @service=benthos how="unassigning everything" input="" label="" path=root.input why="clearing assignment at end of group management session"
time="2026-01-20T16:16:27+01:00" level=debug msg="leaving group" @service=benthos group=otlp-integration-test label="" member_id=redpanda-connect-88faab0e-5935-4472-91e8-4725adff967b path=root.input
integration_test.go:357: Pipeline shutdown
--- PASS: TestIntegrationOTLPWithSchemaRegistry/trace_http_json (21.37s)
…o/redpanda/otel/v1
bfc7327 to
94bc64a
Compare
No description provided.