Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5261989
adding first changes for metadata
gizas Nov 6, 2025
5b74827
comment
gizas Nov 6, 2025
5d3f323
adding tests
gizas Nov 6, 2025
32f7498
mod tidy and changelog
gizas Nov 6, 2025
fbeef6b
mod tidy and changelog
gizas Nov 6, 2025
00a18f3
format and adding tests for coverage
gizas Nov 7, 2025
e07c0aa
adding metadata keys in xexporterhelper.WithQueueBatch()
gizas Nov 20, 2025
1178536
merging with main
gizas Nov 20, 2025
331a703
update changelog
gizas Nov 20, 2025
5b40717
adding metadata tests
gizas Nov 20, 2025
a89aef2
updating test for xexporterhelper
gizas Nov 20, 2025
f74f3a4
updating test for xexporterhelper
gizas Nov 20, 2025
f4b629f
updating base_exporter and add checks for Partitioner and Mergectx
gizas Nov 24, 2025
c290ef6
Merge branch 'main' into otlphttp_mergge_metadata
gizas Nov 24, 2025
77be9c9
updating test for base_exporter
gizas Nov 24, 2025
0836ef7
updating with exporterhelper.WithQueueBatch
gizas Nov 26, 2025
c7746e5
Merge branch 'main' into otlphttp_mergge_metadata
gizas Nov 26, 2025
759ac81
removing exporterhelper.WithQueueBatch
gizas Nov 26, 2025
d233ec4
Merge branch 'otlphttp_mergge_metadata' of github.com:gizas/opentelem…
gizas Nov 26, 2025
bafdd11
updating chloggen message and kept only exporterhelper
gizas Nov 26, 2025
e197735
updating chloggen message and kept only exporterhelper
gizas Nov 26, 2025
5c4559b
Merge branch 'main' into otlphttp_mergge_metadata
gizas Nov 26, 2025
8cb459d
removing duplicate xexporterhelper/new_request_test.go test file
gizas Nov 26, 2025
8b766a3
gotidy
gizas Nov 26, 2025
513a1fd
Update exporter/exporterhelper/internal/queuebatch/config_test.go
gizas Nov 28, 2025
5502a54
Merge branch 'main' into otlphttp_mergge_metadata
gizas Nov 28, 2025
a341256
combine tests
gizas Nov 28, 2025
2fd73f9
updating description
gizas Nov 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .chloggen/exporterhelper_merge_metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pkg/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add `metadata_keys` configuration to `sending_queue` to partition batches by client metadata"

# One or more tracking issues or pull requests related to the change
issues: [14139]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The `metadata_keys` configuration option is now available in the `sending_queue` section for all exporters
using queue batch functionality. When specified, batches are partitioned based on the values of the listed metadata keys, allowing separate batching per metadata partition. This feature
is automatically configured when using `exporterhelper.WithQueue()`.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
13 changes: 13 additions & 0 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func WithQueue(cfg queuebatch.Config) Option {

// WithQueueBatch enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// If cfg.MetadataKeys is set, it will automatically configure the partitioner and merge function
// to partition batches based on the specified metadata keys.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Request]) Option {
Expand All @@ -213,6 +215,17 @@ func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Reque
if cfg.StorageID != nil && set.Encoding == nil {
return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled")
}
// Automatically configure partitioner if MetadataKeys is set
if len(cfg.MetadataKeys) > 0 {
if set.Partitioner != nil {
return errors.New("cannot use metadata_keys when a custom partitioner is already configured")
}
if set.MergeCtx != nil {
return errors.New("cannot use metadata_keys when a custom merge function is already configured")
}
set.Partitioner = queuebatch.NewMetadataKeysPartitioner(cfg.MetadataKeys)
set.MergeCtx = queuebatch.NewMetadataKeysMergeCtx(cfg.MetadataKeys)
}
Comment on lines +218 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I think we should probably compose them. But we can always do that in a followup - it would be an enhancement over what you have implemented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it was quicker just to error for now.

Do you want me to create a follow up once this merged?

Some details that we consider are things like:
Which logic to merge first, probably the custom logic?
How to check metadata within each custom partition with overall metadata_keys if those exist?

o.queueBatchSettings = set
o.queueCfg = cfg
return nil
Expand Down
109 changes: 109 additions & 0 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,115 @@ func TestBaseExporterLogging(t *testing.T) {
require.NoError(t, bs.Shutdown(context.Background()))
}

// TestWithQueue_MetadataKeys verifies how WithQueue reacts to the
// MetadataKeys queue configuration: a non-empty key list automatically
// configures the partitioner and merge function, an empty or nil list leaves
// both unset, and a pre-existing custom Partitioner or MergeCtx causes
// exporter construction to fail with a descriptive error.
func TestWithQueue_MetadataKeys(t *testing.T) {
	t.Run("with MetadataKeys - configures partitioner and merge function", func(t *testing.T) {
		qCfg := NewDefaultQueueConfig()
		qCfg.MetadataKeys = []string{"key1", "key2"}
		qCfg.Enabled = true

		be, err := NewBaseExporter(
			exportertest.NewNopSettings(exportertest.NopType),
			pipeline.SignalMetrics,
			noopExport,
			WithQueueBatchSettings(newFakeQueueBatch()),
			WithQueue(qCfg),
		)
		require.NoError(t, err)
		assert.NotNil(t, be)

		// Verify partitioner and merge function are configured
		assert.NotNil(t, be.queueBatchSettings.Partitioner, "Partitioner should be set when MetadataKeys is provided")
		assert.NotNil(t, be.queueBatchSettings.MergeCtx, "MergeCtx should be set when MetadataKeys is provided")
	})

	t.Run("without MetadataKeys - does not configure partitioner", func(t *testing.T) {
		qCfg := NewDefaultQueueConfig()
		// An explicitly empty slice must behave the same as an unset list.
		qCfg.MetadataKeys = []string{}
		qCfg.Enabled = true

		be, err := NewBaseExporter(
			exportertest.NewNopSettings(exportertest.NopType),
			pipeline.SignalMetrics,
			noopExport,
			WithQueueBatchSettings(newFakeQueueBatch()),
			WithQueue(qCfg),
		)
		require.NoError(t, err)
		assert.NotNil(t, be)

		// Verify partitioner and merge function are NOT configured
		assert.Nil(t, be.queueBatchSettings.Partitioner, "Partitioner should not be set when MetadataKeys is empty")
		assert.Nil(t, be.queueBatchSettings.MergeCtx, "MergeCtx should not be set when MetadataKeys is empty")
	})

	t.Run("with nil MetadataKeys - does not configure partitioner", func(t *testing.T) {
		qCfg := NewDefaultQueueConfig()
		qCfg.MetadataKeys = nil
		qCfg.Enabled = true

		be, err := NewBaseExporter(
			exportertest.NewNopSettings(exportertest.NopType),
			pipeline.SignalMetrics,
			noopExport,
			WithQueueBatchSettings(newFakeQueueBatch()),
			WithQueue(qCfg),
		)
		require.NoError(t, err)
		assert.NotNil(t, be)

		// Verify partitioner and merge function are NOT configured
		assert.Nil(t, be.queueBatchSettings.Partitioner, "Partitioner should not be set when MetadataKeys is nil")
		assert.Nil(t, be.queueBatchSettings.MergeCtx, "MergeCtx should not be set when MetadataKeys is nil")
	})

	t.Run("error when custom partitioner already set and metadata_keys used", func(t *testing.T) {
		qCfg := NewDefaultQueueConfig()
		qCfg.MetadataKeys = []string{"key1", "key2"}
		qCfg.Enabled = true

		// Set up queue batch settings with a custom partitioner already configured
		customSettings := newFakeQueueBatch()
		customPartitioner := queuebatch.NewPartitioner(
			func(context.Context, request.Request) string {
				return "custom"
			},
		)
		customSettings.Partitioner = customPartitioner

		// The combination of metadata_keys and a custom partitioner is
		// rejected rather than composed; see WithQueueBatch.
		_, err := NewBaseExporter(
			exportertest.NewNopSettings(exportertest.NopType),
			pipeline.SignalMetrics,
			noopExport,
			WithQueueBatchSettings(customSettings),
			WithQueue(qCfg),
		)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "cannot use metadata_keys when a custom partitioner is already configured")
	})

	t.Run("error when custom merge function already set and metadata_keys used", func(t *testing.T) {
		qCfg := NewDefaultQueueConfig()
		qCfg.MetadataKeys = []string{"key1", "key2"}
		qCfg.Enabled = true

		// Set up queue batch settings with a custom merge function already configured
		customSettings := newFakeQueueBatch()
		customSettings.MergeCtx = func(context.Context, context.Context) context.Context {
			return context.Background()
		}

		_, err := NewBaseExporter(
			exportertest.NewNopSettings(exportertest.NopType),
			pipeline.SignalMetrics,
			noopExport,
			WithQueueBatchSettings(customSettings),
			WithQueue(qCfg),
		)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "cannot use metadata_keys when a custom merge function is already configured")
	})
}

func TestQueueRetryWithDisabledQueue(t *testing.T) {
tests := []struct {
name string
Expand Down
21 changes: 21 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
import (
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -46,6 +47,16 @@ type Config struct {

// BatchConfig it configures how the requests are consumed from the queue and batch together during consumption.
Batch configoptional.Optional[BatchConfig] `mapstructure:"batch"`

// MetadataKeys is a list of client.Metadata keys that will be used to partition
// the data into batches. If this setting is empty, a single batcher instance
// will be used. When this setting is not empty, one batcher will be used per
// distinct combination of values for the listed metadata keys.
//
// Empty value and unset metadata are treated as distinct cases.
//
// Entries are case-insensitive. Duplicated entries will trigger a validation error.
MetadataKeys []string `mapstructure:"metadata_keys"`
Copy link
Member

@dmitryax dmitryax Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to provide an option to split batches using keys from other sources, not only request-context metadata, but also resource attributes, instrumentation name etc. #12795 has more details. This interface would conflict with that functionality. I’m not sure it’s worth having two configuration options that overlap or conflict with each other. I’d prefer to make progress toward #12795 instead. @axw WDYT?

Copy link
Contributor

@axw axw Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder @dmitryax, good points. I'd also love to make some progress on that - I'll have try to have a play with some ideas today.

This interface would conflict with that functionality.

Why would it conflict? Could we not have both? i.e. config for partitioning by metadata, as well as say support for partitioner extensions? It should also be possible to compose them, as I suggested in another comment thread, e.g. partition first by metadata, and then partition those batches via an extension.

(Playing devil's advocate a little bit, not necessarily saying that's what we should do.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent a while looking into this today, and I'm struggling to see a good path for adding resource/scope/record-level partitioning to exporterhelper.

I previously had a bit of a warped understanding of the Partitioner interface. I had in mind that it was operating on plog.Logs and friends, but it's actually operating on request.Request. That interface totally hides the data type, which makes splitting/merging batches complicated except for in generic ways, i.e. sizer-based splitting/merging.

I started looking into adding partitioning before we even get to batching, but stopped pretty quickly because that's effectively equivalent to processor. So... I'm back to thinking we should:

@dmitryax if you have ideas about how we can achieve it all in exporterhelper I'd be keen to hear them.

}

func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
Expand Down Expand Up @@ -90,6 +101,16 @@ func (cfg *Config) Validate() error {
}
}

// Validate metadata_keys for duplicates (case-insensitive)
uniq := map[string]bool{}
for _, k := range cfg.MetadataKeys {
l := strings.ToLower(k)
if _, has := uniq[l]; has {
return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l)
}
uniq[l] = true
}

return nil
}

Expand Down
59 changes: 59 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,65 @@ func TestConfig_Validate(t *testing.T) {
assert.NoError(t, xconfmap.Validate(cfg))
}

// TestConfig_Validate_MetadataKeys checks the case-insensitive duplicate
// detection performed on Config.MetadataKeys during validation. Valid inputs
// (unique keys, empty slice, nil) must pass; any repeated key, regardless of
// case, must produce an error naming the duplicate.
func TestConfig_Validate_MetadataKeys(t *testing.T) {
	tests := []struct {
		name string
		keys []string
		// wantErrSubs lists substrings the validation error must contain;
		// an empty list means validation is expected to succeed.
		wantErrSubs []string
	}{
		{
			name: "no duplicates - valid",
			keys: []string{"key1", "key2", "key3"},
		},
		{
			name:        "duplicate keys same case - invalid",
			keys:        []string{"key1", "key2", "key1"},
			wantErrSubs: []string{"duplicate entry in metadata_keys", "key1", "case-insensitive"},
		},
		{
			name:        "duplicate keys different case - invalid",
			keys:        []string{"key1", "KEY1", "key2"},
			wantErrSubs: []string{"duplicate entry in metadata_keys", "key1", "case-insensitive"},
		},
		{
			name:        "duplicate keys mixed case - invalid",
			keys:        []string{"Key1", "kEy1", "key2"},
			wantErrSubs: []string{"duplicate entry in metadata_keys", "key1", "case-insensitive"},
		},
		{
			name: "empty metadata_keys - valid",
			keys: []string{},
		},
		{
			name: "nil metadata_keys - valid",
			keys: nil,
		},
		{
			name:        "multiple duplicates - reports first duplicate",
			keys:        []string{"key1", "key2", "key1", "key2"},
			wantErrSubs: []string{"duplicate entry in metadata_keys", "key1"},
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			cfg := newTestConfig()
			cfg.MetadataKeys = tc.keys
			err := xconfmap.Validate(cfg)
			if len(tc.wantErrSubs) == 0 {
				require.NoError(t, err)
				return
			}
			require.Error(t, err)
			for _, sub := range tc.wantErrSubs {
				assert.Contains(t, err.Error(), sub)
			}
		})
	}
}

func TestBatchConfig_Validate(t *testing.T) {
cfg := newTestBatchConfig()
require.NoError(t, xconfmap.Validate(cfg))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"

import (
"bytes"
"context"
"fmt"
"slices"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

// metadataKeysPartitioner partitions requests based on client metadata keys.
// It implements Partitioner[request.Request] by deriving the partition key
// from the request context's client metadata (see GetKey).
type metadataKeysPartitioner struct {
	// keys is the ordered list of metadata keys used to build partition keys.
	keys []string
}

// NewMetadataKeysPartitioner creates a new partitioner that partitions requests
// based on the specified metadata keys. If keys is empty, returns nil.
// Callers (e.g. WithQueueBatch) rely on the nil return to mean "no
// metadata-based partitioning configured".
func NewMetadataKeysPartitioner(keys []string) Partitioner[request.Request] {
	if len(keys) == 0 {
		return nil
	}
	return &metadataKeysPartitioner{keys: keys}
}

// GetKey returns a partition key based on the metadata keys configured.
func (p *metadataKeysPartitioner) GetKey(
ctx context.Context,
_ request.Request,
) string {
var kb bytes.Buffer
meta := client.FromContext(ctx).Metadata

var afterFirst bool
for _, k := range p.keys {
if values := meta.Get(k); len(values) != 0 {
if afterFirst {
kb.WriteByte(0)
}
kb.WriteString(k)
afterFirst = true
for _, val := range values {
kb.WriteByte(0)
kb.WriteString(val)
}
}
}
return kb.String()
}

// NewMetadataKeysMergeCtx creates a merge function for contexts that merges
// metadata based on the specified keys. If keys is empty, returns nil, which
// callers interpret as "no metadata-based context merging configured".
//
// The returned function assumes both contexts come from the same partition:
// for every configured key, the two contexts must carry identical value
// slices (or both be unset). Any mismatch indicates a partitioning bug and
// triggers an immediate panic.
func NewMetadataKeysMergeCtx(keys []string) func(context.Context, context.Context) context.Context {
	if len(keys) == 0 {
		return nil
	}
	return func(ctx1, ctx2 context.Context) context.Context {
		left := client.FromContext(ctx1).Metadata
		right := client.FromContext(ctx2).Metadata

		merged := make(map[string][]string, len(keys))
		for _, key := range keys {
			lv := left.Get(key)
			rv := right.Get(key)
			if len(lv) == 0 && len(rv) == 0 {
				// Key absent on both sides: nothing to carry over.
				continue
			}

			// Since the mergeCtx is based on partition key, we MUST have the same
			// partition key-values in both the metadata. If they are not same then
			// fail fast and dramatically.
			if !slices.Equal(lv, rv) {
				panic(fmt.Errorf(
					"unexpected client metadata found when merging context for key %s", key,
				))
			}
			merged[key] = lv
		}
		return client.NewContext(
			context.Background(),
			client.Info{Metadata: client.NewMetadata(merged)},
		)
	}
}
Loading
Loading