Skip to content

Commit 5e60afe

Browse files
committed
merge context
1 parent db680a7 commit 5e60afe

File tree

7 files changed

+64
-83
lines changed

7 files changed

+64
-83
lines changed

exporter/exporterhelper/internal/queuebatch/batch_context.go

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,32 +31,9 @@ func parentsFromContext(ctx context.Context) []trace.Link {
3131
return LinksFromContext(ctx)
3232
}
3333

34-
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
35-
return mergedContext{
36-
context.WithValue(context.Background(),
37-
batchSpanLinksKey,
38-
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...)),
39-
ctx1,
40-
ctx2,
41-
}
42-
}
43-
44-
type mergedContext struct {
45-
context.Context
46-
ctx1 context.Context
47-
ctx2 context.Context
48-
}
49-
50-
func (c mergedContext) Value(key any) any {
51-
if c.ctx1 != nil {
52-
if val := c.ctx1.Value(key); val != nil {
53-
return val
54-
}
55-
}
56-
if c.ctx2 != nil {
57-
if val := c.ctx2.Value(key); val != nil {
58-
return val
59-
}
60-
}
61-
return nil
34+
func contextWithMergedLinks(mergedCtx context.Context, ctx1 context.Context, ctx2 context.Context) context.Context {
35+
return context.WithValue(
36+
mergedCtx,
37+
batchSpanLinksKey,
38+
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...))
6239
}

exporter/exporterhelper/internal/queuebatch/batch_context_test.go

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,31 @@ import (
1313
"go.opentelemetry.io/collector/component/componenttest"
1414
)
1515

16-
type testContextKey string
16+
type testTimestampKeyType int
17+
18+
const testTimestampKey testTimestampKeyType = iota
19+
20+
// mergeCtxFunc corresponds to the user-specified mergeCtx function in the batcher settings.
21+
// This specific merge Context function keeps the greater of timestamps from two contexts.
22+
func mergeCtxFunc(ctx1, ctx2 context.Context) context.Context {
23+
timestamp1 := ctx1.Value(testTimestampKey)
24+
timestamp2 := ctx2.Value(testTimestampKey)
25+
if timestamp1 != nil && timestamp2 != nil {
26+
if timestamp1.(int) > timestamp2.(int) {
27+
return context.WithValue(context.Background(), testTimestampKey, timestamp1)
28+
}
29+
return context.WithValue(context.Background(), testTimestampKey, timestamp2)
30+
}
31+
if timestamp1 != nil {
32+
return context.WithValue(context.Background(), testTimestampKey, timestamp1)
33+
}
34+
return context.WithValue(context.Background(), testTimestampKey, timestamp2)
35+
}
36+
37+
// mergeContextHelper performs the same operation done during batching.
38+
func mergeContextHelper(ctx1, ctx2 context.Context) context.Context {
39+
return contextWithMergedLinks(mergeCtxFunc(ctx1, ctx2), ctx1, ctx2)
40+
}
1741

1842
func TestBatchContextLink(t *testing.T) {
1943
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
@@ -30,53 +54,19 @@ func TestBatchContextLink(t *testing.T) {
3054
ctx4, span4 := tracer.Start(ctx1, "span4")
3155
defer span4.End()
3256

33-
batchContext := contextWithMergedLinks(ctx2, ctx3)
34-
batchContext = contextWithMergedLinks(batchContext, ctx4)
57+
batchContext := mergeContextHelper(ctx2, ctx3)
58+
batchContext = mergeContextHelper(batchContext, ctx4)
3559

3660
actualLinks := LinksFromContext(batchContext)
37-
// require.Len(t, actualLinks, 3)
38-
require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[0].SpanContext)
39-
// require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
40-
// require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
61+
require.Len(t, actualLinks, 3)
62+
require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
63+
require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)
64+
require.Equal(t, trace.SpanContextFromContext(ctx4), actualLinks[2].SpanContext)
4165
}
4266

4367
func TestMergedContext_GetValue(t *testing.T) {
44-
ctx1 := context.WithValue(context.Background(), testContextKey("key1"), "value1")
45-
ctx2 := context.WithValue(context.Background(), testContextKey("key1"), "value2")
46-
ctx2 = context.WithValue(ctx2, testContextKey("key2"), "value2")
47-
ctx3 := context.WithValue(context.Background(), testContextKey("key2"), "value3")
48-
49-
var mergedCtx context.Context
50-
mergedCtx = contextWithMergedLinks(ctx1, ctx2)
51-
mergedCtx = contextWithMergedLinks(mergedCtx, ctx3)
52-
53-
require.Equal(t, "value1", mergedCtx.Value(testContextKey("key1")))
54-
require.Equal(t, "value2", mergedCtx.Value(testContextKey("key2")))
55-
require.Nil(t, mergedCtx.Value("nonexistent_key"))
56-
}
57-
58-
func TestMergedValues_GetValue_NilContext(t *testing.T) {
59-
ctx1 := context.WithValue(context.Background(), testContextKey("key1"), "value1")
60-
var ctx2 context.Context // nil context
61-
62-
var mergedCtx context.Context
63-
mergedCtx = contextWithMergedLinks(ctx1, ctx2)
64-
65-
require.Equal(t, "value1", mergedCtx.Value(testContextKey("key1")))
66-
require.Nil(t, mergedCtx.Value(testContextKey("key2")))
67-
require.Nil(t, mergedCtx.Value("nonexistent_key"))
68-
}
69-
70-
func TestMergedValues_GetValue_CanceledContext(t *testing.T) {
71-
ctx1 := context.WithValue(context.Background(), testContextKey("key1"), "value1")
72-
ctx2, cancel := context.WithCancel(context.WithValue(context.Background(), testContextKey("key2"), "value2"))
73-
74-
var mergedCtx context.Context
75-
mergedCtx = contextWithMergedLinks(ctx1, ctx2)
76-
77-
cancel()
78-
79-
require.Equal(t, "value1", mergedCtx.Value(testContextKey("key1")))
80-
require.Equal(t, "value2", mergedCtx.Value(testContextKey("key2")))
81-
require.Nil(t, mergedCtx.Value("nonexistent_key"))
68+
ctx1 := context.WithValue(context.Background(), testTimestampKey, 1234)
69+
ctx2 := context.WithValue(context.Background(), testTimestampKey, 2345)
70+
batchContext := mergeContextHelper(ctx1, ctx2)
71+
require.Equal(t, 2345, batchContext.Value(testTimestampKey))
8272
}

exporter/exporterhelper/internal/queuebatch/batcher.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type batcherSettings[T any] struct {
2424
itemsSizer request.Sizer[T]
2525
bytesSizer request.Sizer[T]
2626
partitioner Partitioner[T]
27+
mergeCtx func(context.Context, context.Context) context.Context
2728
next sender.SendFunc[T]
2829
maxWorkers int
2930
}
@@ -39,10 +40,10 @@ func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[re
3940
}
4041

4142
if set.partitioner == nil {
42-
return newPartitionBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.next), nil
43+
return newPartitionBatcher(*cfg.Get(), sizer, set.mergeCtx, newWorkerPool(set.maxWorkers), set.next), nil
4344
}
4445

45-
return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next), nil
46+
return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.mergeCtx, set.next), nil
4647
}
4748

4849
func activeSizer[T any](sizerType request.SizerType, itemsSizer, bytesSizer request.Sizer[T]) request.Sizer[T] {

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type multiBatcher struct {
1919
wp *workerPool
2020
sizer request.Sizer[request.Request]
2121
partitioner Partitioner[request.Request]
22+
mergeCtx func(context.Context, context.Context) context.Context
2223
consumeFunc sender.SendFunc[request.Request]
2324
shards sync.Map
2425
}
@@ -28,13 +29,15 @@ func newMultiBatcher(
2829
sizer request.Sizer[request.Request],
2930
wp *workerPool,
3031
partitioner Partitioner[request.Request],
32+
mergeCtx func(context.Context, context.Context) context.Context,
3133
next sender.SendFunc[request.Request],
3234
) *multiBatcher {
3335
return &multiBatcher{
3436
cfg: bCfg,
3537
wp: wp,
3638
sizer: sizer,
3739
partitioner: partitioner,
40+
mergeCtx: mergeCtx,
3841
consumeFunc: next,
3942
}
4043
}
@@ -46,7 +49,7 @@ func (mb *multiBatcher) getPartition(ctx context.Context, req request.Request) *
4649
if found {
4750
return s.(*partitionBatcher)
4851
}
49-
newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.wp, mb.consumeFunc)
52+
newS := newPartitionBatcher(mb.cfg, mb.sizer, mb.mergeCtx, mb.wp, mb.consumeFunc)
5053
_ = newS.Start(ctx, nil)
5154
s, loaded := mb.shards.LoadOrStore(key, newS)
5255
// If not loaded, there was a race condition in adding the new shard. Shutdown the newly created shard.

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func TestMultiBatcher_NoTimeout(t *testing.T) {
3232
NewPartitioner(func(ctx context.Context, _ request.Request) string {
3333
return ctx.Value(partitionKey{}).(string)
3434
}),
35+
nil,
3536
sink.Export,
3637
)
3738

@@ -83,6 +84,7 @@ func TestMultiBatcher_Timeout(t *testing.T) {
8384
NewPartitioner(func(ctx context.Context, _ request.Request) string {
8485
return ctx.Value(partitionKey{}).(string)
8586
}),
87+
nil,
8688
sink.Export,
8789
)
8890

exporter/exporterhelper/internal/queuebatch/partition_batcher.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type partitionBatcher struct {
2929
cfg BatchConfig
3030
wp *workerPool
3131
sizer request.Sizer[request.Request]
32+
mergeCtx func(context.Context, context.Context) context.Context
3233
consumeFunc sender.SendFunc[request.Request]
3334
stopWG sync.WaitGroup
3435
currentBatchMu sync.Mutex
@@ -40,13 +41,15 @@ type partitionBatcher struct {
4041
func newPartitionBatcher(
4142
cfg BatchConfig,
4243
sizer request.Sizer[request.Request],
44+
mergeCtx func(context.Context, context.Context) context.Context,
4345
wp *workerPool,
4446
next sender.SendFunc[request.Request],
4547
) *partitionBatcher {
4648
return &partitionBatcher{
4749
cfg: cfg,
4850
wp: wp,
4951
sizer: sizer,
52+
mergeCtx: mergeCtx,
5053
consumeFunc: next,
5154
shutdownCh: make(chan struct{}, 1),
5255
}
@@ -117,7 +120,12 @@ func (qb *partitionBatcher) Consume(ctx context.Context, req request.Request, do
117120
// Logic on how to deal with the current batch:
118121
qb.currentBatch.req = reqList[0]
119122
qb.currentBatch.done = append(qb.currentBatch.done, done)
120-
qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)
123+
124+
if qb.mergeCtx != nil {
125+
qb.currentBatch.ctx = contextWithMergedLinks(qb.mergeCtx(qb.currentBatch.ctx, ctx), qb.currentBatch.ctx, ctx)
126+
} else {
127+
qb.currentBatch.ctx = contextWithMergedLinks(context.Background(), qb.currentBatch.ctx, ctx)
128+
}
121129

122130
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
123131
// cannot unlock and re-lock because we are not done processing all the responses.

exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T)
6060
}
6161

6262
sink := requesttest.NewSink()
63-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
63+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export)
6464
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
6565
t.Cleanup(func() {
6666
require.NoError(t, ba.Shutdown(context.Background()))
@@ -126,7 +126,7 @@ func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
126126
}
127127

128128
sink := requesttest.NewSink()
129-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
129+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export)
130130
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
131131

132132
done := newFakeDone()
@@ -207,7 +207,7 @@ func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) {
207207
}
208208

209209
sink := requesttest.NewSink()
210-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
210+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export)
211211
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
212212
t.Cleanup(func() {
213213
require.NoError(t, ba.Shutdown(context.Background()))
@@ -279,7 +279,7 @@ func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) {
279279
}
280280

281281
sink := requesttest.NewSink()
282-
ba := newPartitionBatcher(cfg, tt.sizer, newWorkerPool(tt.maxWorkers), sink.Export)
282+
ba := newPartitionBatcher(cfg, tt.sizer, nil, newWorkerPool(tt.maxWorkers), sink.Export)
283283
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
284284

285285
done := newFakeDone()
@@ -327,7 +327,7 @@ func TestPartitionBatcher_Shutdown(t *testing.T) {
327327
}
328328

329329
sink := requesttest.NewSink()
330-
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export)
330+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export)
331331
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
332332

333333
done := newFakeDone()
@@ -356,7 +356,7 @@ func TestPartitionBatcher_MergeError(t *testing.T) {
356356
}
357357

358358
sink := requesttest.NewSink()
359-
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), newWorkerPool(2), sink.Export)
359+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export)
360360
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
361361
t.Cleanup(func() {
362362
require.NoError(t, ba.Shutdown(context.Background()))

0 commit comments

Comments (0)