Skip to content

Commit e873023

Browse files
authored
[chore] rename internal Queue func to Offer; return error (open-telemetry#8884)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent d0f9a78 commit e873023

File tree

11 files changed

+63
-53
lines changed

11 files changed

+63
-53
lines changed

exporter/exporterhelper/internal/bounded_memory_queue.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,16 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
5252
}
5353

5454
// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
55-
func (q *boundedMemoryQueue) Produce(ctx context.Context, req any) bool {
55+
func (q *boundedMemoryQueue) Offer(ctx context.Context, req any) error {
5656
if q.stopped.Load() {
57-
return false
57+
return ErrQueueIsStopped
5858
}
5959

6060
select {
6161
case q.items <- newQueueRequest(ctx, req):
62-
return true
62+
return nil
6363
default:
64-
return false
64+
return ErrQueueIsFull
6565
}
6666
}
6767

exporter/exporterhelper/internal/bounded_memory_queue_test.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestBoundedQueue(t *testing.T) {
5656
startLock.Unlock()
5757
})))
5858

59-
assert.True(t, q.Produce(context.Background(), newStringRequest("a")))
59+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a")))
6060

6161
// at this point "a" may or may not have been received by the consumer go-routine
6262
// so let's make sure it has been
@@ -69,10 +69,10 @@ func TestBoundedQueue(t *testing.T) {
6969
})
7070

7171
// produce two more items. The first one should be accepted, but not consumed.
72-
assert.True(t, q.Produce(context.Background(), newStringRequest("b")))
72+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("b")))
7373
assert.Equal(t, 1, q.Size())
7474
// the second should be rejected since the queue is full
75-
assert.False(t, q.Produce(context.Background(), newStringRequest("c")))
75+
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("c")), ErrQueueIsFull)
7676
assert.Equal(t, 1, q.Size())
7777

7878
startLock.Unlock() // unblock consumer
@@ -88,13 +88,13 @@ func TestBoundedQueue(t *testing.T) {
8888
"b": true,
8989
}
9090
for _, item := range []string{"d", "e", "f"} {
91-
assert.True(t, q.Produce(context.Background(), newStringRequest(item)))
91+
assert.NoError(t, q.Offer(context.Background(), newStringRequest(item)))
9292
expected[item] = true
9393
consumerState.assertConsumed(expected)
9494
}
9595

9696
assert.NoError(t, q.Shutdown(context.Background()))
97-
assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
97+
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped)
9898
}
9999

100100
// In this test we run a queue with many items and a slow consumer.
@@ -113,20 +113,20 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
113113
time.Sleep(1 * time.Second)
114114
})))
115115

116-
q.Produce(context.Background(), newStringRequest("a"))
117-
q.Produce(context.Background(), newStringRequest("b"))
118-
q.Produce(context.Background(), newStringRequest("c"))
119-
q.Produce(context.Background(), newStringRequest("d"))
120-
q.Produce(context.Background(), newStringRequest("e"))
121-
q.Produce(context.Background(), newStringRequest("f"))
122-
q.Produce(context.Background(), newStringRequest("g"))
123-
q.Produce(context.Background(), newStringRequest("h"))
124-
q.Produce(context.Background(), newStringRequest("i"))
125-
q.Produce(context.Background(), newStringRequest("j"))
116+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a")))
117+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("b")))
118+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("c")))
119+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("d")))
120+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("e")))
121+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("f")))
122+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("g")))
123+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("h")))
124+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("i")))
125+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("j")))
126126

127127
assert.NoError(t, q.Shutdown(context.Background()))
128128

129-
assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
129+
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped)
130130
consumerState.assertConsumed(map[string]bool{
131131
"a": true,
132132
"b": true,
@@ -193,7 +193,7 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
193193
}))
194194
require.NoError(b, err)
195195
for j := 0; j < numberOfItems; j++ {
196-
q.Produce(context.Background(), newStringRequest(fmt.Sprintf("%d", j)))
196+
_ = q.Offer(context.Background(), newStringRequest(fmt.Sprintf("%d", j)))
197197
}
198198
assert.NoError(b, q.Shutdown(context.Background()))
199199
}
@@ -250,7 +250,7 @@ func TestZeroSizeWithConsumers(t *testing.T) {
250250
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
251251
assert.NoError(t, err)
252252

253-
assert.True(t, q.Produce(context.Background(), newStringRequest("a"))) // in process
253+
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a"))) // in process
254254

255255
assert.NoError(t, q.Shutdown(context.Background()))
256256
}
@@ -261,7 +261,7 @@ func TestZeroSizeNoConsumers(t *testing.T) {
261261
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
262262
assert.NoError(t, err)
263263

264-
assert.False(t, q.Produce(context.Background(), newStringRequest("a"))) // in process
264+
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("a")), ErrQueueIsFull) // in process
265265

266266
assert.NoError(t, q.Shutdown(context.Background()))
267267
}

exporter/exporterhelper/internal/persistent_queue.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
7373
return nil
7474
}
7575

76-
// Produce adds an item to the queue and returns true if it was accepted
77-
// Request context is currently ignored by the persistent queue.
78-
func (pq *persistentQueue) Produce(_ context.Context, item any) bool {
79-
err := pq.storage.put(item)
80-
return err == nil
76+
// Offer inserts the specified element into this queue if it is possible to do so immediately
77+
// without violating capacity restrictions. If success returns no error.
78+
// It returns ErrQueueIsFull if no space is currently available.
79+
func (pq *persistentQueue) Offer(_ context.Context, item any) error {
80+
return pq.storage.put(item)
8181
}
8282

8383
// Shutdown stops accepting items, shuts down the queue and closes the persistent queue

exporter/exporterhelper/internal/persistent_queue_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ func TestPersistentQueue_Capacity(t *testing.T) {
6262
req := newFakeTracesRequest(newTraces(1, 10))
6363

6464
for i := 0; i < 10; i++ {
65-
result := pq.Produce(context.Background(), req)
65+
result := pq.Offer(context.Background(), req)
6666
if i < 5 {
67-
assert.True(t, result)
67+
assert.NoError(t, result)
6868
} else {
69-
assert.False(t, result)
69+
assert.ErrorIs(t, result, ErrQueueIsFull)
7070
}
7171
}
7272
assert.Equal(t, 5, pq.Size())
@@ -78,7 +78,7 @@ func TestPersistentQueueShutdown(t *testing.T) {
7878
req := newFakeTracesRequest(newTraces(1, 10))
7979

8080
for i := 0; i < 1000; i++ {
81-
pq.Produce(context.Background(), req)
81+
assert.NoError(t, pq.Offer(context.Background(), req))
8282
}
8383
assert.NoError(t, pq.Shutdown(context.Background()))
8484
}
@@ -101,7 +101,7 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
101101
}
102102

103103
for i := 0; i < 1000; i++ {
104-
pq.Produce(context.Background(), req)
104+
assert.NoError(t, pq.Offer(context.Background(), req))
105105
}
106106
assert.NoError(t, pq.Shutdown(context.Background()))
107107
assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
@@ -147,7 +147,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
147147
})
148148

149149
for i := 0; i < c.numMessagesProduced; i++ {
150-
pq.Produce(context.Background(), req)
150+
assert.NoError(t, pq.Offer(context.Background(), req))
151151
}
152152

153153
assert.Eventually(t, func() bool {

exporter/exporterhelper/internal/persistent_storage.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ const (
7171
)
7272

7373
var (
74-
errMaxCapacityReached = errors.New("max capacity reached")
75-
errValueNotSet = errors.New("value not set")
74+
errValueNotSet = errors.New("value not set")
7675
)
7776

7877
// newPersistentContiguousStorage creates a new file-storage extension backed queue;
@@ -187,7 +186,7 @@ func (pcs *persistentContiguousStorage) put(req any) error {
187186

188187
if pcs.size() >= pcs.capacity {
189188
pcs.logger.Warn("Maximum queue capacity reached")
190-
return errMaxCapacityReached
189+
return ErrQueueIsFull
191190
}
192191

193192
itemKey := getItemKey(pcs.writeIndex)

exporter/exporterhelper/internal/queue.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,18 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
77

88
import (
99
"context"
10+
"errors"
1011

1112
"go.opentelemetry.io/collector/component"
1213
)
1314

15+
var (
16+
// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full.
17+
ErrQueueIsFull = errors.New("sending queue is full")
18+
// ErrQueueIsStopped is the error returned when an item is offered to the Queue and the queue is stopped.
19+
ErrQueueIsStopped = errors.New("sending queue is stopped")
20+
)
21+
1422
type QueueSettings struct {
1523
DataType component.DataType
1624
Callback func(QueueRequest)
@@ -22,9 +30,10 @@ type Queue interface {
2230
// Start starts the queue with a given number of goroutines consuming items from the queue
2331
// and passing them into the consumer callback.
2432
Start(ctx context.Context, host component.Host, set QueueSettings) error
25-
// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
26-
// to the queue due to queue overflow.
27-
Produce(ctx context.Context, item any) bool
33+
// Offer inserts the specified element into this queue if it is possible to do so immediately
34+
// without violating capacity restrictions. If success returns no error.
35+
// It returns ErrQueueIsFull if no space is currently available.
36+
Offer(ctx context.Context, item any) error
2837
// Size returns the current Size of the queue
2938
Size() int
3039
// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.

exporter/exporterhelper/logs.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.opentelemetry.io/collector/consumer"
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
1617
"go.opentelemetry.io/collector/pdata/plog"
1718
)
1819

@@ -95,7 +96,7 @@ func NewLogsExporter(
9596
lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
9697
req := newLogsRequest(ld, pusher)
9798
serr := be.send(ctx, req)
98-
if errors.Is(serr, errSendingQueueIsFull) {
99+
if errors.Is(serr, internal.ErrQueueIsFull) {
99100
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
100101
}
101102
return serr
@@ -146,7 +147,7 @@ func NewLogsRequestExporter(
146147
return consumererror.NewPermanent(cErr)
147148
}
148149
sErr := be.send(ctx, req)
149-
if errors.Is(sErr, errSendingQueueIsFull) {
150+
if errors.Is(sErr, internal.ErrQueueIsFull) {
150151
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
151152
}
152153
return sErr

exporter/exporterhelper/metrics.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.opentelemetry.io/collector/consumer"
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
1617
"go.opentelemetry.io/collector/pdata/pmetric"
1718
)
1819

@@ -95,7 +96,7 @@ func NewMetricsExporter(
9596
mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
9697
req := newMetricsRequest(md, pusher)
9798
serr := be.send(ctx, req)
98-
if errors.Is(serr, errSendingQueueIsFull) {
99+
if errors.Is(serr, internal.ErrQueueIsFull) {
99100
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
100101
}
101102
return serr
@@ -146,7 +147,7 @@ func NewMetricsRequestExporter(
146147
return consumererror.NewPermanent(cErr)
147148
}
148149
sErr := be.send(ctx, req)
149-
if errors.Is(sErr, errSendingQueueIsFull) {
150+
if errors.Is(sErr, internal.ErrQueueIsFull) {
150151
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
151152
}
152153
return sErr

exporter/exporterhelper/queue_sender.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ import (
2626
const defaultQueueSize = 1000
2727

2828
var (
29-
errSendingQueueIsFull = errors.New("sending_queue is full")
30-
scopeName = "go.opentelemetry.io/collector/exporterhelper"
29+
scopeName = "go.opentelemetry.io/collector/exporterhelper"
3130
)
3231

3332
// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
@@ -119,7 +118,7 @@ func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err
119118
return err
120119
}
121120

122-
if qs.queue.Produce(ctx, req) {
121+
if qs.queue.Offer(ctx, req) == nil {
123122
logger.Error(
124123
"Exporting failed. Putting back to the end of the queue.",
125124
zap.Error(err),
@@ -220,13 +219,13 @@ func (qs *queueSender) send(ctx context.Context, req Request) error {
220219
c := noCancellationContext{Context: ctx}
221220

222221
span := trace.SpanFromContext(c)
223-
if !qs.queue.Produce(c, req) {
222+
if err := qs.queue.Offer(c, req); err != nil {
224223
qs.logger.Error(
225224
"Dropping data because sending_queue is full. Try increasing queue_size.",
226225
zap.Int("dropped_items", req.ItemsCount()),
227226
)
228227
span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute))
229-
return errSendingQueueIsFull
228+
return err
230229
}
231230

232231
span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute))

exporter/exporterhelper/retry_sender_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ type producerConsumerQueueWithCounter struct {
411411
produceCounter *atomic.Uint32
412412
}
413413

414-
func (pcq *producerConsumerQueueWithCounter) Produce(ctx context.Context, item any) bool {
414+
func (pcq *producerConsumerQueueWithCounter) Offer(ctx context.Context, item any) error {
415415
pcq.produceCounter.Add(1)
416-
return pcq.Queue.Produce(ctx, item)
416+
return pcq.Queue.Offer(ctx, item)
417417
}

0 commit comments

Comments
 (0)