Skip to content

Commit 02fbe61

Browse files
authored
Remove usage of generics for request.Sizer, no need to have it (#14025)
Internal cleanup, no need for changelog. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent a4a8dfd commit 02fbe61

File tree

10 files changed

+69
-82
lines changed

10 files changed

+69
-82
lines changed

exporter/exporterhelper/internal/queue/memory_queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ var (
2626
)
2727

2828
// memoryQueue is an in-memory implementation of a Queue.
29-
type memoryQueue[T any] struct {
29+
type memoryQueue[T request.Request] struct {
3030
component.StartFunc
3131
refCounter ReferenceCounter[T]
32-
sizer request.Sizer[T]
32+
sizer request.Sizer
3333
cap int64
3434

3535
mu sync.Mutex
@@ -47,7 +47,7 @@ type memoryQueue[T any] struct {
4747
func newMemoryQueue[T request.Request](set Settings[T]) readableQueue[T] {
4848
sq := &memoryQueue[T]{
4949
refCounter: set.ReferenceCounter,
50-
sizer: set.activeSizer(),
50+
sizer: request.NewSizer(set.SizerType),
5151
cap: set.Capacity,
5252
items: &linkedQueue[T]{},
5353
waitForResult: set.WaitForResult,

exporter/exporterhelper/internal/queue/persistent_queue.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ var indexDonePool = sync.Pool{
6868
// write read x └── currently dispatched item
6969
// index index x
7070
// xxxx deleted
71-
type persistentQueue[T any] struct {
71+
type persistentQueue[T request.Request] struct {
7272
logger *zap.Logger
7373
client storage.Client
7474
encoding Encoding[T]
7575
capacity int64
7676
sizerType request.SizerType
77-
activeSizer request.Sizer[T]
78-
itemsSizer request.Sizer[T]
79-
bytesSizer request.Sizer[T]
77+
activeSizer request.Sizer
78+
itemsSizer request.Sizer
79+
bytesSizer request.Sizer
8080
storageID component.ID
8181
id component.ID
8282
signal pipeline.Signal
@@ -99,9 +99,9 @@ func newPersistentQueue[T request.Request](set Settings[T]) readableQueue[T] {
9999
encoding: set.Encoding,
100100
capacity: set.Capacity,
101101
sizerType: set.SizerType,
102-
activeSizer: set.activeSizer(),
103-
itemsSizer: request.NewItemsSizer[T](),
104-
bytesSizer: request.NewBytesSizer[T](),
102+
activeSizer: request.NewSizer(set.SizerType),
103+
itemsSizer: request.NewItemsSizer(),
104+
bytesSizer: request.NewBytesSizer(),
105105
storageID: *set.StorageID,
106106
id: set.ID,
107107
signal: set.Signal,

exporter/exporterhelper/internal/queue/queue.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,6 @@ type Settings[T request.Request] struct {
7878
Telemetry component.TelemetrySettings
7979
}
8080

81-
func (set *Settings[T]) activeSizer() request.Sizer[T] {
82-
switch set.SizerType {
83-
case request.SizerTypeBytes:
84-
return request.NewBytesSizer[T]()
85-
case request.SizerTypeItems:
86-
return request.NewItemsSizer[T]()
87-
default:
88-
return request.RequestsSizer[T]{}
89-
}
90-
}
91-
9281
func NewQueue[T request.Request](set Settings[T], next ConsumeFunc[T]) (Queue[T], error) {
9382
q := newBaseQueue(set)
9483
oq, err := newObsQueue(set, newAsyncQueue(q, set.NumConsumers, next, set.ReferenceCounter))

exporter/exporterhelper/internal/queuebatch/batcher.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[re
3535
return newDisabledBatcher(set.next), nil
3636
}
3737

38-
sizer := activeSizer[request.Request](cfg.Get().Sizer)
38+
sizer := request.NewSizer(cfg.Get().Sizer)
3939
if sizer == nil {
4040
return nil, fmt.Errorf("queue_batch: unsupported sizer %q", cfg.Get().Sizer)
4141
}
@@ -46,14 +46,3 @@ func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[re
4646

4747
return newMultiBatcher(*cfg.Get(), sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.mergeCtx, set.next, set.logger), nil
4848
}
49-
50-
func activeSizer[T request.Request](sizerType request.SizerType) request.Sizer[T] {
51-
switch sizerType {
52-
case request.SizerTypeBytes:
53-
return request.NewBytesSizer[T]()
54-
case request.SizerTypeItems:
55-
return request.NewItemsSizer[T]()
56-
default:
57-
return request.RequestsSizer[T]{}
58-
}
59-
}

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@ import (
1414
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1515
)
1616

17-
var _ Batcher[request.Request] = (*multiBatcher)(nil)
18-
1917
type multiBatcher struct {
2018
cfg BatchConfig
2119
wp *workerPool
22-
sizer request.Sizer[request.Request]
20+
sizer request.Sizer
2321
partitioner Partitioner[request.Request]
2422
mergeCtx func(context.Context, context.Context) context.Context
2523
consumeFunc sender.SendFunc[request.Request]
@@ -29,7 +27,7 @@ type multiBatcher struct {
2927

3028
func newMultiBatcher(
3129
bCfg BatchConfig,
32-
sizer request.Sizer[request.Request],
30+
sizer request.Sizer,
3331
wp *workerPool,
3432
partitioner Partitioner[request.Request],
3533
mergeCtx func(context.Context, context.Context) context.Context,

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestMultiBatcher_NoTimeout(t *testing.T) {
2828
type partitionKey struct{}
2929

3030
ba := newMultiBatcher(cfg,
31-
request.NewItemsSizer[request.Request](),
31+
request.NewItemsSizer(),
3232
newWorkerPool(1),
3333
NewPartitioner(func(ctx context.Context, _ request.Request) string {
3434
return ctx.Value(partitionKey{}).(string)
@@ -81,7 +81,7 @@ func TestMultiBatcher_Timeout(t *testing.T) {
8181
type partitionKey struct{}
8282

8383
ba := newMultiBatcher(cfg,
84-
request.NewItemsSizer[request.Request](),
84+
request.NewItemsSizer(),
8585
newWorkerPool(1),
8686
NewPartitioner(func(ctx context.Context, _ request.Request) string {
8787
return ctx.Value(partitionKey{}).(string)

exporter/exporterhelper/internal/queuebatch/partition_batcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type batch struct {
2929
type partitionBatcher struct {
3030
cfg BatchConfig
3131
wp *workerPool
32-
sizer request.Sizer[request.Request]
32+
sizer request.Sizer
3333
mergeCtx func(context.Context, context.Context) context.Context
3434
consumeFunc sender.SendFunc[request.Request]
3535
stopWG sync.WaitGroup
@@ -42,7 +42,7 @@ type partitionBatcher struct {
4242

4343
func newPartitionBatcher(
4444
cfg BatchConfig,
45-
sizer request.Sizer[request.Request],
45+
sizer request.Sizer,
4646
mergeCtx func(context.Context, context.Context) context.Context,
4747
wp *workerPool,
4848
next sender.SendFunc[request.Request],

exporter/exporterhelper/internal/queuebatch/partition_batcher_test.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,31 +29,31 @@ func TestPartitionBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T)
2929
tests := []struct {
3030
name string
3131
sizerType request.SizerType
32-
sizer request.Sizer[request.Request]
32+
sizer request.Sizer
3333
maxWorkers int
3434
}{
3535
{
3636
name: "items/one_worker",
3737
sizerType: request.SizerTypeItems,
38-
sizer: request.NewItemsSizer[request.Request](),
38+
sizer: request.NewItemsSizer(),
3939
maxWorkers: 1,
4040
},
4141
{
4242
name: "items/three_workers",
4343
sizerType: request.SizerTypeItems,
44-
sizer: request.NewItemsSizer[request.Request](),
44+
sizer: request.NewItemsSizer(),
4545
maxWorkers: 3,
4646
},
4747
{
4848
name: "bytes/one_worker",
4949
sizerType: request.SizerTypeBytes,
50-
sizer: request.NewBytesSizer[request.Request](),
50+
sizer: request.NewItemsSizer(),
5151
maxWorkers: 1,
5252
},
5353
{
5454
name: "bytes/three_workers",
5555
sizerType: request.SizerTypeBytes,
56-
sizer: request.NewBytesSizer[request.Request](),
56+
sizer: request.NewItemsSizer(),
5757
maxWorkers: 3,
5858
},
5959
}
@@ -95,31 +95,31 @@ func TestPartitionBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
9595
tests := []struct {
9696
name string
9797
sizerType request.SizerType
98-
sizer request.Sizer[request.Request]
98+
sizer request.Sizer
9999
maxWorkers int
100100
}{
101101
{
102102
name: "items/one_worker",
103103
sizerType: request.SizerTypeItems,
104-
sizer: request.NewItemsSizer[request.Request](),
104+
sizer: request.NewItemsSizer(),
105105
maxWorkers: 1,
106106
},
107107
{
108108
name: "items/three_workers",
109109
sizerType: request.SizerTypeItems,
110-
sizer: request.NewItemsSizer[request.Request](),
110+
sizer: request.NewItemsSizer(),
111111
maxWorkers: 3,
112112
},
113113
{
114114
name: "bytes/one_worker",
115115
sizerType: request.SizerTypeBytes,
116-
sizer: request.NewBytesSizer[request.Request](),
116+
sizer: request.NewItemsSizer(),
117117
maxWorkers: 1,
118118
},
119119
{
120120
name: "bytes/three_workers",
121121
sizerType: request.SizerTypeBytes,
122-
sizer: request.NewBytesSizer[request.Request](),
122+
sizer: request.NewItemsSizer(),
123123
maxWorkers: 3,
124124
},
125125
}
@@ -176,31 +176,31 @@ func TestPartitionBatcher_NoSplit_WithTimeout(t *testing.T) {
176176
tests := []struct {
177177
name string
178178
sizerType request.SizerType
179-
sizer request.Sizer[request.Request]
179+
sizer request.Sizer
180180
maxWorkers int
181181
}{
182182
{
183183
name: "items/one_worker",
184184
sizerType: request.SizerTypeItems,
185-
sizer: request.NewItemsSizer[request.Request](),
185+
sizer: request.NewItemsSizer(),
186186
maxWorkers: 1,
187187
},
188188
{
189189
name: "items/three_workers",
190190
sizerType: request.SizerTypeItems,
191-
sizer: request.NewItemsSizer[request.Request](),
191+
sizer: request.NewItemsSizer(),
192192
maxWorkers: 3,
193193
},
194194
{
195195
name: "bytes/one_worker",
196196
sizerType: request.SizerTypeBytes,
197-
sizer: request.NewBytesSizer[request.Request](),
197+
sizer: request.NewItemsSizer(),
198198
maxWorkers: 1,
199199
},
200200
{
201201
name: "bytes/three_workers",
202202
sizerType: request.SizerTypeBytes,
203-
sizer: request.NewBytesSizer[request.Request](),
203+
sizer: request.NewItemsSizer(),
204204
maxWorkers: 3,
205205
},
206206
}
@@ -247,31 +247,31 @@ func TestPartitionBatcher_Split_TimeoutDisabled(t *testing.T) {
247247
tests := []struct {
248248
name string
249249
sizerType request.SizerType
250-
sizer request.Sizer[request.Request]
250+
sizer request.Sizer
251251
maxWorkers int
252252
}{
253253
{
254254
name: "items/one_worker",
255255
sizerType: request.SizerTypeItems,
256-
sizer: request.NewItemsSizer[request.Request](),
256+
sizer: request.NewItemsSizer(),
257257
maxWorkers: 1,
258258
},
259259
{
260260
name: "items/three_workers",
261261
sizerType: request.SizerTypeItems,
262-
sizer: request.NewItemsSizer[request.Request](),
262+
sizer: request.NewItemsSizer(),
263263
maxWorkers: 3,
264264
},
265265
{
266266
name: "bytes/one_worker",
267267
sizerType: request.SizerTypeBytes,
268-
sizer: request.NewBytesSizer[request.Request](),
268+
sizer: request.NewItemsSizer(),
269269
maxWorkers: 1,
270270
},
271271
{
272272
name: "bytes/three_workers",
273273
sizerType: request.SizerTypeBytes,
274-
sizer: request.NewBytesSizer[request.Request](),
274+
sizer: request.NewItemsSizer(),
275275
maxWorkers: 3,
276276
},
277277
}
@@ -333,7 +333,7 @@ func TestPartitionBatcher_Shutdown(t *testing.T) {
333333
}
334334

335335
sink := requesttest.NewSink()
336-
ba := newPartitionBatcher(cfg, request.NewItemsSizer[request.Request](), nil, newWorkerPool(2), sink.Export, zap.NewNop())
336+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
337337
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
338338

339339
done := newFakeDone()
@@ -362,7 +362,7 @@ func TestPartitionBatcher_MergeError(t *testing.T) {
362362
}
363363

364364
sink := requesttest.NewSink()
365-
ba := newPartitionBatcher(cfg, request.NewItemsSizer[request.Request](), nil, newWorkerPool(2), sink.Export, zap.NewNop())
365+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(2), sink.Export, zap.NewNop())
366366
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
367367
t.Cleanup(func() {
368368
require.NoError(t, ba.Shutdown(context.Background()))
@@ -396,7 +396,7 @@ func TestPartitionBatcher_PartialSuccessError(t *testing.T) {
396396
core, observed := observer.New(zap.WarnLevel)
397397
logger := zap.New(core)
398398
sink := requesttest.NewSink()
399-
ba := newPartitionBatcher(cfg, request.NewItemsSizer[request.Request](), nil, newWorkerPool(1), sink.Export, logger)
399+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
400400
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
401401

402402
done := newFakeDone()
@@ -438,7 +438,7 @@ func TestSPartitionBatcher_PartialSuccessError_AfterOkRequest(t *testing.T) {
438438
core, observed := observer.New(zap.WarnLevel)
439439
logger := zap.New(core)
440440
sink := requesttest.NewSink()
441-
ba := newPartitionBatcher(cfg, request.NewItemsSizer[request.Request](), nil, newWorkerPool(1), sink.Export, logger)
441+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, logger)
442442
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
443443

444444
done := newFakeDone()
@@ -498,7 +498,7 @@ func TestShardBatcher_EmptyRequestList(t *testing.T) {
498498
}
499499

500500
sink := requesttest.NewSink()
501-
ba := newPartitionBatcher(cfg, request.NewItemsSizer[request.Request](), nil, newWorkerPool(1), sink.Export, zap.NewNop())
501+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), nil, newWorkerPool(1), sink.Export, zap.NewNop())
502502
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
503503
t.Cleanup(func() {
504504
require.NoError(t, ba.Shutdown(context.Background()))
@@ -548,7 +548,7 @@ func TestPartitionBatcher_ContextMerging(t *testing.T) {
548548
MinSize: 10,
549549
}
550550
sink := requesttest.NewSink()
551-
ba := newPartitionBatcher(cfg, request.NewItemsSizer[request.Request](), tt.mergeCtxFunc, newWorkerPool(1), sink.Export, zap.NewNop())
551+
ba := newPartitionBatcher(cfg, request.NewItemsSizer(), tt.mergeCtxFunc, newWorkerPool(1), sink.Export, zap.NewNop())
552552
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
553553

554554
done := newFakeDone()

0 commit comments

Comments
 (0)