diff --git a/.chloggen/exporterhelper_merge_metadata.yaml b/.chloggen/exporterhelper_merge_metadata.yaml new file mode 100644 index 00000000000..49d06300249 --- /dev/null +++ b/.chloggen/exporterhelper_merge_metadata.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pkg/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add `metadata_keys` configuration to `sending_queue` to partition batches by client metadata" + +# One or more tracking issues or pull requests related to the change +issues: [14139] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The `metadata_keys` configuration option is now available in the `sending_queue` section for all exporters. + When specified, batches are partitioned based on the values of the listed metadata keys, allowing separate batching per metadata partition. This feature + is automatically configured when using `exporterhelper.WithQueue()`. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 147ff280a7e..bf846236a3e 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -202,6 +202,8 @@ func WithQueue(cfg queuebatch.Config) Option { // WithQueueBatch enables queueing for an exporter. // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// If cfg.MetadataKeys is set, it will automatically configure the partitioner and merge function +// to partition batches based on the specified metadata keys. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Request]) Option { @@ -213,6 +215,17 @@ func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Reque if cfg.StorageID != nil && set.Encoding == nil { return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled") } + // Automatically configure partitioner if MetadataKeys is set + if len(cfg.MetadataKeys) > 0 { + if set.Partitioner != nil { + return errors.New("cannot use metadata_keys when a custom partitioner is already configured") + } + if set.MergeCtx != nil { + return errors.New("cannot use metadata_keys when a custom merge function is already configured") + } + set.Partitioner = queuebatch.NewMetadataKeysPartitioner(cfg.MetadataKeys) + set.MergeCtx = queuebatch.NewMetadataKeysMergeCtx(cfg.MetadataKeys) + } o.queueBatchSettings = set o.queueCfg = cfg return nil diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 5a5da91a9f8..3d6613d7ae4 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -88,6 +88,107 @@ func TestBaseExporterLogging(t *testing.T) { require.NoError(t, bs.Shutdown(context.Background())) } +func TestWithQueue_MetadataKeys(t *testing.T) { + t.Run("with MetadataKeys - configures partitioner and merge function", func(t *testing.T) { + qCfg := NewDefaultQueueConfig() + qCfg.MetadataKeys = []string{"key1", "key2"} + qCfg.Enabled = true + + be, err := NewBaseExporter( + exportertest.NewNopSettings(exportertest.NopType), + pipeline.SignalMetrics, + noopExport, + WithQueueBatchSettings(newFakeQueueBatch()), + WithQueue(qCfg), + ) + require.NoError(t, err) + assert.NotNil(t, be) + + // Verify partitioner and merge function are configured + assert.NotNil(t, be.queueBatchSettings.Partitioner, "Partitioner should be set when MetadataKeys is provided") + assert.NotNil(t, be.queueBatchSettings.MergeCtx, "MergeCtx should be set when MetadataKeys is provided") + }) + + t.Run("without MetadataKeys - does not configure partitioner", func(t *testing.T) { + tests := []struct { + name string + metadataKeys []string + }{ + {"empty slice", []string{}}, + {"nil", nil}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + qCfg := NewDefaultQueueConfig() + qCfg.MetadataKeys = tt.metadataKeys + qCfg.Enabled = true + + be, err := NewBaseExporter( + exportertest.NewNopSettings(exportertest.NopType), + pipeline.SignalMetrics, + noopExport, + WithQueueBatchSettings(newFakeQueueBatch()), + WithQueue(qCfg), + ) + require.NoError(t, err) + assert.NotNil(t, be) + + // Verify partitioner and merge function are NOT configured + assert.Nil(t, be.queueBatchSettings.Partitioner, "Partitioner should not be set when MetadataKeys is %s", tt.name) + assert.Nil(t, be.queueBatchSettings.MergeCtx, "MergeCtx should not be set when MetadataKeys is %s", tt.name) + }) + } + }) + + t.Run("error when custom partitioner already set and metadata_keys used", func(t *testing.T) { + qCfg := NewDefaultQueueConfig() + qCfg.MetadataKeys = []string{"key1", "key2"} + qCfg.Enabled = true + + // Set up queue batch settings with a custom partitioner already configured + customSettings := newFakeQueueBatch() + customPartitioner := queuebatch.NewPartitioner( + func(context.Context, request.Request) string { + return "custom" + }, + ) + customSettings.Partitioner = customPartitioner + + _, err := NewBaseExporter( + exportertest.NewNopSettings(exportertest.NopType), + pipeline.SignalMetrics, + noopExport, + WithQueueBatchSettings(customSettings), + WithQueue(qCfg), + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot use metadata_keys when a custom partitioner is already configured") + }) + + t.Run("error when custom merge function already set and metadata_keys used", func(t *testing.T) { + qCfg := NewDefaultQueueConfig() + qCfg.MetadataKeys = []string{"key1", "key2"} + qCfg.Enabled = true + + // Set up queue batch settings with a custom merge function already configured + customSettings := newFakeQueueBatch() + customSettings.MergeCtx = func(context.Context, context.Context) context.Context { + return context.Background() + } + + _, err := NewBaseExporter( + exportertest.NewNopSettings(exportertest.NopType), + pipeline.SignalMetrics, + noopExport, + WithQueueBatchSettings(customSettings), + WithQueue(qCfg), + ) + require.Error(t, err) + assert.Contains(t, err.Error(), "cannot use metadata_keys when a custom merge function is already configured") + }) +} + func TestQueueRetryWithDisabledQueue(t *testing.T) { tests := []struct { name string diff --git a/exporter/exporterhelper/internal/queuebatch/config.go b/exporter/exporterhelper/internal/queuebatch/config.go index ecef3ce3040..36c5d5707e8 100644 --- a/exporter/exporterhelper/internal/queuebatch/config.go +++ b/exporter/exporterhelper/internal/queuebatch/config.go @@ -6,6 +6,7 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel import ( "errors" "fmt" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -46,6 +47,16 @@ type Config struct { // BatchConfig it configures how the requests are consumed from the queue and batch together during consumption. Batch configoptional.Optional[BatchConfig] `mapstructure:"batch"` + + // MetadataKeys is a list of client.Metadata keys that will be used to partition + // the data into batches. If this setting is empty, a single batcher instance + // will be used. When this setting is not empty, one batcher will be used per + // distinct combination of values for the listed metadata keys. + // + // Empty value and unset metadata are treated as distinct cases. + // + // Entries are case-insensitive. Duplicated entries will trigger a validation error. + MetadataKeys []string `mapstructure:"metadata_keys"` } func (cfg *Config) Unmarshal(conf *confmap.Conf) error { @@ -90,6 +101,16 @@ func (cfg *Config) Validate() error { } } + // Validate metadata_keys for duplicates (case-insensitive) + uniq := map[string]bool{} + for _, k := range cfg.MetadataKeys { + l := strings.ToLower(k) + if _, has := uniq[l]; has { + return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l) + } + uniq[l] = true + } + return nil } diff --git a/exporter/exporterhelper/internal/queuebatch/config_test.go b/exporter/exporterhelper/internal/queuebatch/config_test.go index f55f7bcb8ff..2d64127b651 100644 --- a/exporter/exporterhelper/internal/queuebatch/config_test.go +++ b/exporter/exporterhelper/internal/queuebatch/config_test.go @@ -56,6 +56,45 @@ func TestConfig_Validate(t *testing.T) { assert.NoError(t, xconfmap.Validate(cfg)) } +func TestConfig_Validate_MetadataKeys(t *testing.T) { + t.Run("no duplicates - valid", func(t *testing.T) { + cfg := newTestConfig() + cfg.MetadataKeys = []string{"key1", "key2", "key3"} + require.NoError(t, xconfmap.Validate(cfg)) + }) + + t.Run("duplicate keys mixed case - invalid", func(t *testing.T) { + cfg := newTestConfig() + cfg.MetadataKeys = []string{"Key1", "kEy1", "key2"} + err := xconfmap.Validate(cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "duplicate entry in metadata_keys") + assert.Contains(t, err.Error(), "key1") + assert.Contains(t, err.Error(), "case-insensitive") + }) + + t.Run("empty metadata_keys - valid", func(t *testing.T) { + cfg := newTestConfig() + cfg.MetadataKeys = []string{} + require.NoError(t, xconfmap.Validate(cfg)) + }) + + t.Run("nil metadata_keys - valid", func(t *testing.T) { + cfg := newTestConfig() + cfg.MetadataKeys = nil + require.NoError(t, xconfmap.Validate(cfg)) + }) + + t.Run("multiple duplicates - reports first duplicate", func(t *testing.T) { + cfg := newTestConfig() + cfg.MetadataKeys = []string{"key1", "key2", "key1", "key2"} + err := xconfmap.Validate(cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "duplicate entry in metadata_keys") + assert.Contains(t, err.Error(), "key1") + }) +} + func TestBatchConfig_Validate(t *testing.T) { cfg := newTestBatchConfig() require.NoError(t, xconfmap.Validate(cfg)) diff --git a/exporter/exporterhelper/internal/queuebatch/metadata_partitioner.go b/exporter/exporterhelper/internal/queuebatch/metadata_partitioner.go new file mode 100644 index 00000000000..47b39840768 --- /dev/null +++ b/exporter/exporterhelper/internal/queuebatch/metadata_partitioner.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" + +import ( + "bytes" + "context" + "fmt" + "slices" + + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" +) + +// metadataKeysPartitioner partitions requests based on client metadata keys. +type metadataKeysPartitioner struct { + keys []string +} + +// NewMetadataKeysPartitioner creates a new partitioner that partitions requests +// based on the specified metadata keys. If keys is empty, returns nil. +func NewMetadataKeysPartitioner(keys []string) Partitioner[request.Request] { + if len(keys) == 0 { + return nil + } + return &metadataKeysPartitioner{keys: keys} +} + +// GetKey returns a partition key based on the metadata keys configured. +func (p *metadataKeysPartitioner) GetKey( + ctx context.Context, + _ request.Request, +) string { + var kb bytes.Buffer + meta := client.FromContext(ctx).Metadata + + var afterFirst bool + for _, k := range p.keys { + if values := meta.Get(k); len(values) != 0 { + if afterFirst { + kb.WriteByte(0) + } + kb.WriteString(k) + afterFirst = true + for _, val := range values { + kb.WriteByte(0) + kb.WriteString(val) + } + } + } + return kb.String() +} + +// NewMetadataKeysMergeCtx creates a merge function for contexts that merges +// metadata based on the specified keys. If keys is empty, returns nil. +func NewMetadataKeysMergeCtx(keys []string) func(context.Context, context.Context) context.Context { + if len(keys) == 0 { + return nil + } + return func(ctx1, ctx2 context.Context) context.Context { + m1 := client.FromContext(ctx1).Metadata + m2 := client.FromContext(ctx2).Metadata + + m := make(map[string][]string, len(keys)) + for _, key := range keys { + v1 := m1.Get(key) + v2 := m2.Get(key) + if len(v1) == 0 && len(v2) == 0 { + continue + } + + // Since the mergeCtx is based on partition key, we MUST have the same + // partition key-values in both the metadata. If they are not same then + // fail fast and dramatically. + if !slices.Equal(v1, v2) { + panic(fmt.Errorf( + "unexpected client metadata found when merging context for key %s", key, + )) + } + m[key] = v1 + } + return client.NewContext( + context.Background(), + client.Info{Metadata: client.NewMetadata(m)}, + ) + } +} diff --git a/exporter/exporterhelper/internal/queuebatch/metadata_partitioner_test.go b/exporter/exporterhelper/internal/queuebatch/metadata_partitioner_test.go new file mode 100644 index 00000000000..3a269116d70 --- /dev/null +++ b/exporter/exporterhelper/internal/queuebatch/metadata_partitioner_test.go @@ -0,0 +1,416 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" +) + +func TestMetadataKeysPartitioner_MergeCtx(t *testing.T) { + t.Run("merge contexts with same metadata key values", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1", "key2"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "key2": {"value2"}, + }), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "key2": {"value2"}, + }), + }, + ) + + mergedCtx := mergeCtx(ctx1, ctx2) + require.NotNil(t, mergedCtx) + + mergedMeta := client.FromContext(mergedCtx).Metadata + assert.Equal(t, []string{"value1"}, mergedMeta.Get("key1")) + assert.Equal(t, []string{"value2"}, mergedMeta.Get("key2")) + }) + + t.Run("merge contexts with same metadata key values and additional keys", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "other": {"other1"}, + }), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "other": {"other2"}, + }), + }, + ) + + mergedCtx := mergeCtx(ctx1, ctx2) + require.NotNil(t, mergedCtx) + + mergedMeta := client.FromContext(mergedCtx).Metadata + assert.Equal(t, []string{"value1"}, mergedMeta.Get("key1")) + // Other keys should not be in merged metadata since they're not in partitioner keys + assert.Empty(t, mergedMeta.Get("other")) + }) + + t.Run("merge contexts with empty metadata", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{}), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{}), + }, + ) + + mergedCtx := mergeCtx(ctx1, ctx2) + require.NotNil(t, mergedCtx) + + mergedMeta := client.FromContext(mergedCtx).Metadata + assert.Empty(t, mergedMeta.Get("key1")) + }) + + t.Run("panic when one context has metadata and other is empty", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + }), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{}), + }, + ) + + // When one has value and other is empty, they are different, so it should panic + assert.Panics(t, func() { + mergeCtx(ctx1, ctx2) + }, "should panic when contexts have different metadata values") + }) + + t.Run("panic when contexts have different metadata key values", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + }), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value2"}, + }), + }, + ) + + assert.Panics(t, func() { + mergeCtx(ctx1, ctx2) + }, "should panic when contexts have different metadata values") + }) + + t.Run("panic when contexts have different metadata key values for multiple keys", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1", "key2"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "key2": {"value2"}, + }), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "key2": {"different"}, + }), + }, + ) + + assert.Panics(t, func() { + mergeCtx(ctx1, ctx2) + }, "should panic when contexts have different metadata values") + }) + + t.Run("merge contexts with multiple values for same key", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1", "value2"}, + }), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1", "value2"}, + }), + }, + ) + + mergedCtx := mergeCtx(ctx1, ctx2) + require.NotNil(t, mergedCtx) + + mergedMeta := client.FromContext(mergedCtx).Metadata + assert.Equal(t, []string{"value1", "value2"}, mergedMeta.Get("key1")) + }) + + t.Run("panic when contexts have different multiple values for same key", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{"key1"}) + + ctx1 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1", "value2"}, + }), + }, + ) + + ctx2 := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1", "value3"}, + }), + }, + ) + + assert.Panics(t, func() { + mergeCtx(ctx1, ctx2) + }, "should panic when contexts have different metadata values") + }) +} + +func TestMetadataKeysPartitioner_GetKey(t *testing.T) { + t.Run("single key with single value", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + expected := "key1\x00value1" + assert.Equal(t, expected, key) + }) + + t.Run("single key with multiple values", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1", "value2", "value3"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + // Format: key1\0value1\0value2\0value3 + expected := "key1\x00value1\x00value2\x00value3" + assert.Equal(t, expected, key) + }) + + t.Run("multiple keys with values", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1", "key2"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "key2": {"value2"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + // Format: key1\0value1\0key2\0value2 (separator between keys) + expected := "key1\x00value1\x00key2\x00value2" + assert.Equal(t, expected, key) + }) + + t.Run("multiple keys with multiple values", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1", "key2"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1", "value1b"}, + "key2": {"value2", "value2b"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + // Format: key1\0value1\0value1b\0key2\0value2\0value2b + expected := "key1\x00value1\x00value1b\x00key2\x00value2\x00value2b" + assert.Equal(t, expected, key) + }) + + t.Run("keys that don't exist in metadata are skipped", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1", "key2", "key3"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + // key2 is missing + "key3": {"value3"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + // Format: key1\0value1\0key3\0value3 (key2 is skipped) + expected := "key1\x00value1\x00key3\x00value3" + assert.Equal(t, expected, key) + }) + + t.Run("empty metadata returns empty string", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1", "key2"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{}), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + assert.Empty(t, key) + }) + + t.Run("keys with empty values are skipped", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1", "key2", "key3"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "key2": {}, // empty slice + "key3": {"value3"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + // Format: key1\0value1\0key3\0value3 (key2 is skipped) + expected := "key1\x00value1\x00key3\x00value3" + assert.Equal(t, expected, key) + }) + + t.Run("keys in order respect partitioner key order", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key3", "key1", "key2"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "key2": {"value2"}, + "key3": {"value3"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + // Format should follow partitioner order: key3\0value3\0key1\0value1\0key2\0value2 + expected := "key3\x00value3\x00key1\x00value1\x00key2\x00value2" + assert.Equal(t, expected, key) + }) + + t.Run("additional metadata keys not in partitioner are ignored", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{"key1"}) + + ctx := client.NewContext( + context.Background(), + client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "key1": {"value1"}, + "other": {"other1"}, + "extra": {"extra1"}, + }), + }, + ) + + key := partitioner.GetKey(ctx, &requesttest.FakeRequest{}) + // Format: key1\0value1 (other keys are ignored) + expected := "key1\x00value1" + assert.Equal(t, expected, key) + }) + + t.Run("returns nil partitioner when keys are empty", func(t *testing.T) { + partitioner := NewMetadataKeysPartitioner([]string{}) + assert.Nil(t, partitioner) + }) + + t.Run("returns nil merge function when keys are empty", func(t *testing.T) { + mergeCtx := NewMetadataKeysMergeCtx([]string{}) + assert.Nil(t, mergeCtx) + }) +} diff --git a/exporter/exporterhelper/xexporterhelper/new_request.go b/exporter/exporterhelper/xexporterhelper/new_request.go index 7fe7472eeb5..53dcdfe4e3b 100644 --- a/exporter/exporterhelper/xexporterhelper/new_request.go +++ b/exporter/exporterhelper/xexporterhelper/new_request.go @@ -81,6 +81,8 @@ func NewTracesQueueBatchSettings() QueueBatchSettings { // WithQueueBatch enables queueing and batching for an exporter. // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// If cfg.MetadataKeys is set, it will automatically configure the partitioner and merge function +// to partition batches based on the specified metadata keys. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func WithQueueBatch(cfg exporterhelper.QueueBatchConfig, set QueueBatchSettings) exporterhelper.Option {