Skip to content

Commit 1169338

Browse files
authored
Remove definition of Sizer from public API (#14001)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 2335ec7 commit 1169338

22 files changed

+217
-251
lines changed

.chloggen/rm-sizer.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: pkg/xexporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove definition of Sizer from public API and ability to configure.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [14001]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Now that Request has both Items/Bytes sizes no need to allow custom sizers.
20+
21+
# Optional: The change log or logs in which this entry should be included.
22+
# e.g. '[user]' or '[user, api]'
23+
# Include 'user' if the change is relevant to end users.
24+
# Include 'api' if there is a change to a library API.
25+
# Default: '[user]'
26+
change_logs: [api]

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,7 @@ func noopExport(context.Context, request.Request) error {
143143

144144
func newFakeQueueBatch() queuebatch.Settings[request.Request] {
145145
return queuebatch.Settings[request.Request]{
146-
Encoding: fakeEncoding{},
147-
ItemsSizer: request.NewItemsSizer(),
148-
BytesSizer: requesttest.NewBytesSizer(),
146+
Encoding: fakeEncoding{},
149147
}
150148
}
151149

exporter/exporterhelper/internal/queue/async_queue_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ func TestAsyncMemoryQueue(t *testing.T) {
2121
consumed := &atomic.Int64{}
2222

2323
set := newSettings(request.SizerTypeItems, 100)
24-
ac := newAsyncQueue(newMemoryQueue[int64](set),
25-
1, func(_ context.Context, _ int64, done Done) {
24+
ac := newAsyncQueue(newMemoryQueue[intRequest](set),
25+
1, func(_ context.Context, _ intRequest, done Done) {
2626
consumed.Add(1)
2727
done.OnDone(nil)
2828
}, set.ReferenceCounter)
@@ -38,8 +38,8 @@ func TestAsyncMemoryQueueBlocking(t *testing.T) {
3838
consumed := &atomic.Int64{}
3939
set := newSettings(request.SizerTypeItems, 100)
4040
set.BlockOnOverflow = true
41-
ac := newAsyncQueue(newMemoryQueue[int64](set),
42-
4, func(_ context.Context, _ int64, done Done) {
41+
ac := newAsyncQueue(newMemoryQueue[intRequest](set),
42+
4, func(_ context.Context, _ intRequest, done Done) {
4343
consumed.Add(1)
4444
done.OnDone(nil)
4545
}, set.ReferenceCounter)
@@ -64,8 +64,8 @@ func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
6464
set := newSettings(request.SizerTypeItems, 100)
6565
set.BlockOnOverflow = true
6666
set.WaitForResult = true
67-
ac := newAsyncQueue(newMemoryQueue[int64](set),
68-
4, func(_ context.Context, _ int64, done Done) {
67+
ac := newAsyncQueue(newMemoryQueue[intRequest](set),
68+
4, func(_ context.Context, _ intRequest, done Done) {
6969
consumed.Add(1)
7070
done.OnDone(nil)
7171
}, set.ReferenceCounter)
@@ -89,8 +89,8 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
8989
stop := make(chan struct{})
9090
set := newSettings(request.SizerTypeItems, 10)
9191
set.BlockOnOverflow = true
92-
ac := newAsyncQueue(newMemoryQueue[int64](set),
93-
1, func(_ context.Context, _ int64, done Done) {
92+
ac := newAsyncQueue(newMemoryQueue[intRequest](set),
93+
1, func(_ context.Context, _ intRequest, done Done) {
9494
<-stop
9595
done.OnDone(nil)
9696
}, set.ReferenceCounter)
@@ -119,7 +119,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
119119
func BenchmarkAsyncMemoryQueue(b *testing.B) {
120120
consumed := &atomic.Int64{}
121121
set := newSettings(request.SizerTypeItems, int64(10*b.N))
122-
ac := newAsyncQueue(newMemoryQueue[int64](set), 1, func(_ context.Context, _ int64, done Done) {
122+
ac := newAsyncQueue(newMemoryQueue[intRequest](set), 1, func(_ context.Context, _ intRequest, done Done) {
123123
consumed.Add(1)
124124
done.OnDone(nil)
125125
}, set.ReferenceCounter)

exporter/exporterhelper/internal/queue/memory_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type memoryQueue[T any] struct {
4444

4545
// newMemoryQueue creates a sized elements channel. Each element is assigned a size by the provided sizer.
4646
// capacity is the capacity of the queue.
47-
func newMemoryQueue[T any](set Settings[T]) readableQueue[T] {
47+
func newMemoryQueue[T request.Request](set Settings[T]) readableQueue[T] {
4848
sq := &memoryQueue[T]{
4949
refCounter: set.ReferenceCounter,
5050
sizer: set.activeSizer(),

exporter/exporterhelper/internal/queue/memory_queue_test.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
func TestMemoryQueue(t *testing.T) {
2222
set := newSettings(request.SizerTypeItems, 7)
23-
q := newMemoryQueue[int64](set)
23+
q := newMemoryQueue[intRequest](set)
2424
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
2525
require.NoError(t, q.Offer(context.Background(), 1))
2626
assert.EqualValues(t, 1, q.Size())
@@ -33,27 +33,27 @@ func TestMemoryQueue(t *testing.T) {
3333
require.ErrorIs(t, q.Offer(context.Background(), 4), ErrQueueIsFull)
3434
assert.EqualValues(t, 4, q.Size())
3535

36-
assert.True(t, consume(q, func(_ context.Context, el int64) error {
36+
assert.True(t, consume(q, func(_ context.Context, el intRequest) error {
3737
assert.EqualValues(t, 1, el)
3838
return nil
3939
}))
4040
assert.EqualValues(t, 3, q.Size())
4141

42-
assert.True(t, consume(q, func(_ context.Context, el int64) error {
42+
assert.True(t, consume(q, func(_ context.Context, el intRequest) error {
4343
assert.EqualValues(t, 3, el)
4444
return nil
4545
}))
4646
assert.EqualValues(t, 0, q.Size())
4747

4848
require.NoError(t, q.Shutdown(context.Background()))
49-
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
49+
assert.False(t, consume(q, func(context.Context, intRequest) error { t.FailNow(); return nil }))
5050
require.NoError(t, q.Shutdown(context.Background()))
5151
}
5252

5353
func TestMemoryQueueBlockingCancelled(t *testing.T) {
5454
set := newSettings(request.SizerTypeItems, 5)
5555
set.BlockOnOverflow = true
56-
q := newMemoryQueue[int64](set)
56+
q := newMemoryQueue[intRequest](set)
5757
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
5858
require.NoError(t, q.Offer(context.Background(), 3))
5959
ctx, cancel := context.WithCancel(context.Background())
@@ -66,7 +66,7 @@ func TestMemoryQueueBlockingCancelled(t *testing.T) {
6666
cancel()
6767
wg.Wait()
6868
assert.EqualValues(t, 3, q.Size())
69-
assert.True(t, consume(q, func(_ context.Context, el int64) error {
69+
assert.True(t, consume(q, func(_ context.Context, el intRequest) error {
7070
assert.EqualValues(t, 3, el)
7171
return nil
7272
}))
@@ -75,30 +75,30 @@ func TestMemoryQueueBlockingCancelled(t *testing.T) {
7575

7676
func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
7777
set := newSettings(request.SizerTypeItems, 7)
78-
q := newMemoryQueue[int64](set)
78+
q := newMemoryQueue[intRequest](set)
7979
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
8080
require.NoError(t, q.Offer(context.Background(), 1))
8181
require.NoError(t, q.Offer(context.Background(), 3))
8282

83-
assert.True(t, consume(q, func(_ context.Context, el int64) error {
83+
assert.True(t, consume(q, func(_ context.Context, el intRequest) error {
8484
assert.EqualValues(t, 1, el)
8585
return nil
8686
}))
8787
assert.EqualValues(t, 3, q.Size())
8888
require.NoError(t, q.Shutdown(context.Background()))
8989
assert.EqualValues(t, 3, q.Size())
90-
assert.True(t, consume(q, func(_ context.Context, el int64) error {
90+
assert.True(t, consume(q, func(_ context.Context, el intRequest) error {
9191
assert.EqualValues(t, 3, el)
9292
return nil
9393
}))
9494
assert.EqualValues(t, 0, q.Size())
95-
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
95+
assert.False(t, consume(q, func(context.Context, intRequest) error { t.FailNow(); return nil }))
9696
require.NoError(t, q.Shutdown(context.Background()))
9797
}
9898

9999
func TestMemoryQueueOfferInvalidSize(t *testing.T) {
100100
set := newSettings(request.SizerTypeItems, 1)
101-
q := newMemoryQueue[int64](set)
101+
q := newMemoryQueue[intRequest](set)
102102
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
103103
require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize)
104104
require.NoError(t, q.Shutdown(context.Background()))
@@ -107,25 +107,25 @@ func TestMemoryQueueOfferInvalidSize(t *testing.T) {
107107
func TestMemoryQueueRejectOverCapacityElements(t *testing.T) {
108108
set := newSettings(request.SizerTypeItems, 1)
109109
set.BlockOnOverflow = true
110-
q := newMemoryQueue[int64](set)
110+
q := newMemoryQueue[intRequest](set)
111111
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
112112
require.ErrorIs(t, q.Offer(context.Background(), 8), errSizeTooLarge)
113113
require.NoError(t, q.Shutdown(context.Background()))
114114
}
115115

116116
func TestMemoryQueueOfferZeroSize(t *testing.T) {
117117
set := newSettings(request.SizerTypeItems, 1)
118-
q := newMemoryQueue[int64](set)
118+
q := newMemoryQueue[intRequest](set)
119119
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
120120
require.NoError(t, q.Offer(context.Background(), 0))
121121
require.NoError(t, q.Shutdown(context.Background()))
122122
// Because the size 0 is ignored, nothing to drain.
123-
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
123+
assert.False(t, consume(q, func(context.Context, intRequest) error { t.FailNow(); return nil }))
124124
}
125125

126126
func TestMemoryQueueOverflow(t *testing.T) {
127127
set := newSettings(request.SizerTypeItems, 1)
128-
q := newMemoryQueue[int64](set)
128+
q := newMemoryQueue[intRequest](set)
129129
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
130130
require.NoError(t, q.Offer(context.Background(), 1))
131131
require.ErrorIs(t, q.Offer(context.Background(), 1), ErrQueueIsFull)
@@ -137,7 +137,7 @@ func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
137137
myErr := errors.New("test error")
138138
set := newSettings(request.SizerTypeItems, 100)
139139
set.WaitForResult = true
140-
q := newMemoryQueue[int64](set)
140+
q := newMemoryQueue[intRequest](set)
141141
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
142142
wg.Add(1)
143143
go func() {
@@ -147,7 +147,7 @@ func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
147147
assert.EqualValues(t, 1, req)
148148
done.OnDone(myErr)
149149
}()
150-
require.ErrorIs(t, q.Offer(context.Background(), int64(1)), myErr)
150+
require.ErrorIs(t, q.Offer(context.Background(), intRequest(1)), myErr)
151151
require.NoError(t, q.Shutdown(context.Background()))
152152
wg.Wait()
153153
}
@@ -157,7 +157,7 @@ func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
157157
stop := make(chan struct{})
158158
set := newSettings(request.SizerTypeItems, 100)
159159
set.WaitForResult = true
160-
q := newMemoryQueue[int64](set)
160+
q := newMemoryQueue[intRequest](set)
161161
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
162162

163163
// Consume async new data.
@@ -177,7 +177,7 @@ func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
177177
<-time.After(time.Second)
178178
cancel()
179179
}()
180-
require.ErrorIs(t, q.Offer(ctx, int64(1)), context.Canceled)
180+
require.ErrorIs(t, q.Offer(ctx, intRequest(1)), context.Canceled)
181181
close(stop)
182182
require.NoError(t, q.Shutdown(context.Background()))
183183
wg.Wait()
@@ -188,7 +188,7 @@ func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
188188
stop := make(chan struct{})
189189
set := newSettings(request.SizerTypeItems, 100)
190190
set.WaitForResult = true
191-
q := newMemoryQueue[int64](set)
191+
q := newMemoryQueue[intRequest](set)
192192
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
193193

194194
// Consume async new data.
@@ -206,7 +206,7 @@ func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
206206
wg.Add(1)
207207
go func() {
208208
defer wg.Done()
209-
assert.NoError(t, q.Offer(context.Background(), int64(1)))
209+
assert.NoError(t, q.Offer(context.Background(), intRequest(1)))
210210
}()
211211
assert.Eventually(t, func() bool { return q.Size() == 1 }, 1*time.Second, 10*time.Millisecond)
212212
assert.EqualValues(t, 100, q.Capacity())
@@ -220,7 +220,7 @@ func BenchmarkMemoryQueueWaitForResult(b *testing.B) {
220220
consumed := &atomic.Int64{}
221221
set := newSettings(request.SizerTypeItems, 100)
222222
set.WaitForResult = true
223-
q := newMemoryQueue[int64](set)
223+
q := newMemoryQueue[intRequest](set)
224224
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
225225

226226
// Consume async new data.
@@ -232,14 +232,14 @@ func BenchmarkMemoryQueueWaitForResult(b *testing.B) {
232232
if !ok {
233233
return
234234
}
235-
consumed.Add(req)
235+
consumed.Add(int64(req))
236236
done.OnDone(nil)
237237
}
238238
}()
239239

240240
b.ReportAllocs()
241241
for b.Loop() {
242-
require.NoError(b, q.Offer(context.Background(), int64(1)))
242+
require.NoError(b, q.Offer(context.Background(), intRequest(1)))
243243
}
244244
require.NoError(b, q.Shutdown(context.Background()))
245245
assert.Equal(b, int64(b.N), consumed.Load())

exporter/exporterhelper/internal/queue/persistent_queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,15 @@ type persistentQueue[T any] struct {
9393
}
9494

9595
// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
96-
func newPersistentQueue[T any](set Settings[T]) readableQueue[T] {
96+
func newPersistentQueue[T request.Request](set Settings[T]) readableQueue[T] {
9797
pq := &persistentQueue[T]{
9898
logger: set.Telemetry.Logger,
9999
encoding: set.Encoding,
100100
capacity: set.Capacity,
101101
sizerType: set.SizerType,
102102
activeSizer: set.activeSizer(),
103-
itemsSizer: set.ItemsSizer,
104-
bytesSizer: set.BytesSizer,
103+
itemsSizer: request.NewItemsSizer[T](),
104+
bytesSizer: request.NewBytesSizer[T](),
105105
storageID: *set.StorageID,
106106
id: set.ID,
107107
signal: set.Signal,

0 commit comments

Comments
 (0)