Skip to content

Commit 21e7450

Browse files
OhJuhun and axw authored
impl log partition by trace id (#43029)
### Description Added log partitioning capability to the Kafka Exporter. When the `partition_logs_by_trace_id` option is enabled, log records are distributed to Kafka partitions based on their TraceID. Records without a TraceID fall back to the default partitioning strategy. If `partition_logs_by_resource_attributes` is also enabled, resource attribute partitioning takes precedence over TraceID partitioning. ### Link to tracking issue Fixes #39146 ### Testing - Added unit tests to validate TraceID-based log partitioning and option precedence logic. - Verified correct partitioning for various combinations of TraceIDs and resource attributes. ### Documentation - Updated `exporter/kafkaexporter/README.md` with usage details and examples for `partition_logs_by_trace_id`. - Included related changelog updates. --------- Co-authored-by: Andrew Wilkins <[email protected]>
1 parent 4c43203 commit 21e7450

File tree

8 files changed

+227
-15
lines changed

8 files changed

+227
-15
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: exporter/kafka
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for partitioning log records by trace ID
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39146]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ The following settings can be optionally configured:
5454
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
5555
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
5656
- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
57+
- `partition_logs_by_trace_id` (default = false): configures the exporter to partition log messages by trace ID, if the log record has one associated. Note: `partition_logs_by_resource_attributes` and `partition_logs_by_trace_id` are mutually exclusive, and enabling both will lead to an error.
5758
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.
5859
- `auth`
5960
- `plain_text` (Deprecated in v0.123.0: use sasl with mechanism set to PLAIN instead.)

exporter/kafkaexporter/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
55

66
import (
7+
"errors"
8+
79
"go.opentelemetry.io/collector/component"
810
"go.opentelemetry.io/collector/config/configretry"
911
"go.opentelemetry.io/collector/confmap"
@@ -14,6 +16,8 @@ import (
1416

1517
var _ component.Config = (*Config)(nil)
1618

19+
var errLogsPartitionExclusive = errors.New("partition_logs_by_resource_attributes and partition_logs_by_trace_id cannot both be enabled")
20+
1721
// Config defines configuration for Kafka exporter.
1822
type Config struct {
1923
TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
@@ -71,6 +75,21 @@ type Config struct {
7175
// If this is true, then the message key will be set to a hash of the resource's identifying
7276
// attributes.
7377
PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
78+
79+
// PartitionLogsByTraceID controls partitioning of log messages by trace ID only.
80+
// When enabled, the exporter splits incoming logs per TraceID (using SplitLogs)
81+
// and sets the Kafka message key to the 16-byte hex string of that TraceID.
82+
// If a LogRecord has an empty TraceID, the key may be empty and partition
83+
// selection falls back to the Kafka client’s default strategy. Resource
84+
// attributes are not used for the key when this option is enabled.
85+
PartitionLogsByTraceID bool `mapstructure:"partition_logs_by_trace_id"`
86+
}
87+
88+
func (c *Config) Validate() (err error) {
89+
if c.PartitionLogsByResourceAttributes && c.PartitionLogsByTraceID {
90+
return errLogsPartitionExclusive
91+
}
92+
return err
7493
}
7594

7695
func (c *Config) Unmarshal(conf *confmap.Conf) error {

exporter/kafkaexporter/config_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func TestLoadConfig(t *testing.T) {
8080
PartitionTracesByID: true,
8181
PartitionMetricsByResourceAttributes: true,
8282
PartitionLogsByResourceAttributes: true,
83+
PartitionLogsByTraceID: false,
8384
},
8485
},
8586
{
@@ -152,3 +153,34 @@ func TestLoadConfig(t *testing.T) {
152153
})
153154
}
154155
}
156+
157+
func TestLoadConfigFailed(t *testing.T) {
158+
t.Parallel()
159+
160+
tests := []struct {
161+
id component.ID
162+
expectedError error
163+
configFile string
164+
}{
165+
{
166+
id: component.NewIDWithName(metadata.Type, ""),
167+
expectedError: errLogsPartitionExclusive,
168+
configFile: "config-partitioning-failed.yaml",
169+
},
170+
}
171+
172+
for _, tt := range tests {
173+
t.Run(tt.id.String(), func(t *testing.T) {
174+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.configFile))
175+
require.NoError(t, err)
176+
177+
cfg := createDefaultConfig().(*Config)
178+
179+
sub, err := cm.Sub(tt.id.String())
180+
require.NoError(t, err)
181+
require.NoError(t, sub.Unmarshal(cfg))
182+
183+
assert.ErrorIs(t, xconfmap.Validate(cfg), tt.expectedError)
184+
})
185+
}
186+
}

exporter/kafkaexporter/factory.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
defaultPartitionMetricsByResourceAttributesEnabled = false
3333
// partitioning logs by resource attributes is disabled by default
3434
defaultPartitionLogsByResourceAttributesEnabled = false
35+
// partitioning logs by trace id is disabled by default
36+
defaultPartitionLogsByTraceIDEnabled = false
3537
)
3638

3739
// NewFactory creates Kafka exporter factory.
@@ -71,6 +73,7 @@ func createDefaultConfig() component.Config {
7173
},
7274
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
7375
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
76+
PartitionLogsByTraceID: defaultPartitionLogsByTraceIDEnabled,
7477
}
7578
}
7679

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func (e *kafkaTracesMessenger) marshalData(td ptrace.Traces) ([]marshaler.Messag
220220
}
221221

222222
func (e *kafkaTracesMessenger) getTopic(ctx context.Context, td ptrace.Traces) string {
223-
return getTopic(ctx, e.config.Traces, e.config.TopicFromAttribute, td.ResourceSpans())
223+
return getTopic[ptrace.ResourceSpans](ctx, e.config.Traces, e.config.TopicFromAttribute, td.ResourceSpans())
224224
}
225225

226226
func (e *kafkaTracesMessenger) partitionData(td ptrace.Traces) iter.Seq2[[]byte, ptrace.Traces] {
@@ -265,23 +265,36 @@ func (e *kafkaLogsMessenger) marshalData(ld plog.Logs) ([]marshaler.Message, err
265265
}
266266

267267
func (e *kafkaLogsMessenger) getTopic(ctx context.Context, ld plog.Logs) string {
268-
return getTopic(ctx, e.config.Logs, e.config.TopicFromAttribute, ld.ResourceLogs())
268+
return getTopic[plog.ResourceLogs](ctx, e.config.Logs, e.config.TopicFromAttribute, ld.ResourceLogs())
269269
}
270270

271271
func (e *kafkaLogsMessenger) partitionData(ld plog.Logs) iter.Seq2[[]byte, plog.Logs] {
272272
return func(yield func([]byte, plog.Logs) bool) {
273-
if !e.config.PartitionLogsByResourceAttributes {
274-
yield(nil, ld)
273+
if e.config.PartitionLogsByResourceAttributes {
274+
for _, resourceLogs := range ld.ResourceLogs().All() {
275+
hash := pdatautil.MapHash(resourceLogs.Resource().Attributes())
276+
newLogs := plog.NewLogs()
277+
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
278+
if !yield(hash[:], newLogs) {
279+
return
280+
}
281+
}
275282
return
276283
}
277-
for _, resourceLogs := range ld.ResourceLogs().All() {
278-
hash := pdatautil.MapHash(resourceLogs.Resource().Attributes())
279-
newLogs := plog.NewLogs()
280-
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
281-
if !yield(hash[:], newLogs) {
282-
return
284+
if e.config.PartitionLogsByTraceID {
285+
for _, l := range batchpersignal.SplitLogs(ld) {
286+
var key []byte
287+
traceID := l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).TraceID()
288+
if !traceID.IsEmpty() {
289+
key = []byte(traceutil.TraceIDToHexOrEmptyString(traceID))
290+
}
291+
if !yield(key, l) {
292+
return
293+
}
283294
}
295+
return
284296
}
297+
yield(nil, ld)
285298
}
286299
}
287300

@@ -308,7 +321,7 @@ func (e *kafkaMetricsMessenger) marshalData(md pmetric.Metrics) ([]marshaler.Mes
308321
}
309322

310323
func (e *kafkaMetricsMessenger) getTopic(ctx context.Context, md pmetric.Metrics) string {
311-
return getTopic(ctx, e.config.Metrics, e.config.TopicFromAttribute, md.ResourceMetrics())
324+
return getTopic[pmetric.ResourceMetrics](ctx, e.config.Metrics, e.config.TopicFromAttribute, md.ResourceMetrics())
312325
}
313326

314327
func (e *kafkaMetricsMessenger) partitionData(md pmetric.Metrics) iter.Seq2[[]byte, pmetric.Metrics] {
@@ -351,7 +364,7 @@ func (e *kafkaProfilesMessenger) marshalData(ld pprofile.Profiles) ([]marshaler.
351364
}
352365

353366
func (e *kafkaProfilesMessenger) getTopic(ctx context.Context, ld pprofile.Profiles) string {
354-
return getTopic(ctx, e.config.Profiles, e.config.TopicFromAttribute, ld.ResourceProfiles())
367+
return getTopic[pprofile.ResourceProfiles](ctx, e.config.Profiles, e.config.TopicFromAttribute, ld.ResourceProfiles())
355368
}
356369

357370
func (*kafkaProfilesMessenger) partitionData(ld pprofile.Profiles) iter.Seq2[[]byte, pprofile.Profiles] {

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/kafkaclient"
3232
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata"
33+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
3334
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/kafkatest"
3435
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
3536
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
@@ -982,6 +983,90 @@ func TestLogsPusher_partitioning(t *testing.T) {
982983
assert.Equal(t, keys[0], keys[1])
983984
assert.NotEqual(t, keys[0], keys[2])
984985
})
986+
t.Run("trace_id_partitioning", func(t *testing.T) {
987+
config := createDefaultConfig().(*Config)
988+
config.PartitionLogsByTraceID = true
989+
exp, producer := newMockLogsExporter(t, *config, componenttest.NewNopHost())
990+
991+
// Build input with three ResourceLogs: two share the same TraceID, one has a different TraceID.
992+
in := plog.NewLogs()
993+
var rls []plog.ResourceLogs
994+
tid1 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1})
995+
tid2 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2})
996+
997+
makeResourceLogs := func(tid pcommon.TraceID) plog.ResourceLogs {
998+
rl := in.ResourceLogs().AppendEmpty()
999+
rl.Resource().Attributes().PutStr("service.name", "svc")
1000+
sl := rl.ScopeLogs().AppendEmpty()
1001+
lr := sl.LogRecords().AppendEmpty()
1002+
lr.SetTraceID(tid)
1003+
return rl
1004+
}
1005+
rl1 := makeResourceLogs(tid1)
1006+
rl2 := makeResourceLogs(tid1)
1007+
rl3 := makeResourceLogs(tid2)
1008+
rls = append(rls, rl1, rl2, rl3)
1009+
1010+
var keys [][]byte
1011+
for i := 0; i < len(rls); i++ {
1012+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(
1013+
func(msg *sarama.ProducerMessage) error {
1014+
value, err := msg.Value.Encode()
1015+
require.NoError(t, err)
1016+
output, err := (&plog.ProtoUnmarshaler{}).UnmarshalLogs(value)
1017+
require.NoError(t, err)
1018+
1019+
require.Equal(t, 1, output.ResourceLogs().Len())
1020+
assert.NoError(t, plogtest.CompareResourceLogs(
1021+
rls[i],
1022+
output.ResourceLogs().At(0),
1023+
))
1024+
1025+
key, err := msg.Key.Encode()
1026+
require.NoError(t, err)
1027+
keys = append(keys, key)
1028+
return nil
1029+
},
1030+
)
1031+
}
1032+
1033+
err := exp.exportData(t.Context(), in)
1034+
require.NoError(t, err)
1035+
1036+
require.Len(t, keys, 3)
1037+
// Keys should be the hex TraceID only, identical for same TraceID, different otherwise.
1038+
expected1 := []byte(traceutil.TraceIDToHexOrEmptyString(tid1))
1039+
expected2 := []byte(traceutil.TraceIDToHexOrEmptyString(tid2))
1040+
assert.Equal(t, expected1, keys[0])
1041+
assert.Equal(t, expected1, keys[1])
1042+
assert.Equal(t, expected2, keys[2])
1043+
assert.NotEqual(t, keys[0], keys[2])
1044+
})
1045+
1046+
// ensure that when TraceID partitioning is enabled but a log record has no TraceID,
1047+
// the exporter falls back to default partitioning (nil key).
1048+
t.Run("trace_id_partitioning_missing_traceid_defaults_to_nil_key", func(t *testing.T) {
1049+
config := createDefaultConfig().(*Config)
1050+
config.PartitionLogsByTraceID = true
1051+
exp, producer := newMockLogsExporter(t, *config, componenttest.NewNopHost())
1052+
1053+
in := plog.NewLogs()
1054+
rl := in.ResourceLogs().AppendEmpty()
1055+
rl.Resource().Attributes().PutStr("service.name", "svc")
1056+
sl := rl.ScopeLogs().AppendEmpty()
1057+
_ = sl.LogRecords().AppendEmpty()
1058+
1059+
producer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(
1060+
func(msg *sarama.ProducerMessage) error {
1061+
if msg.Key != nil {
1062+
return fmt.Errorf("expected nil key for missing TraceID, got %v", msg.Key)
1063+
}
1064+
return nil
1065+
},
1066+
)
1067+
1068+
require.NoError(t, exp.exportData(t.Context(), in))
1069+
})
9851070
}
9861071

9871072
func TestProfilesPusher(t *testing.T) {
@@ -1379,11 +1464,11 @@ func Test_GetTopic(t *testing.T) {
13791464
topic := ""
13801465
switch r := tests[i].resource.(type) {
13811466
case pmetric.ResourceMetricsSlice:
1382-
topic = getTopic(tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
1467+
topic = getTopic[pmetric.ResourceMetrics](tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
13831468
case ptrace.ResourceSpansSlice:
1384-
topic = getTopic(tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
1469+
topic = getTopic[ptrace.ResourceSpans](tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
13851470
case plog.ResourceLogsSlice:
1386-
topic = getTopic(tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
1471+
topic = getTopic[plog.ResourceLogs](tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
13871472
}
13881473
assert.Equal(t, tests[i].wantTopic, topic)
13891474
})
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
kafka:
2+
topic: spans
3+
brokers:
4+
- "foo:123"
5+
- "bar:456"
6+
producer:
7+
max_message_bytes: 10000000
8+
required_acks: -1 # WaitForAll
9+
timeout: 10s
10+
partition_traces_by_id: true
11+
partition_metrics_by_resource_attributes: true
12+
partition_logs_by_resource_attributes: true
13+
partition_logs_by_trace_id: true
14+
sending_queue:
15+
enabled: true
16+
num_consumers: 2
17+
queue_size: 10
18+
retry_on_failure:
19+
enabled: true
20+
initial_interval: 10s
21+
max_interval: 60s
22+
max_elapsed_time: 10m
23+
kafka/legacy_topic:
24+
topic: legacy_topic
25+
metrics:
26+
topic: metrics_topic
27+
logs:
28+
topic_from_metadata_key: metadata_key
29+
kafka/legacy_encoding:
30+
encoding: legacy_encoding
31+
metrics:
32+
encoding: metrics_encoding

0 commit comments

Comments
 (0)