Skip to content

Commit e8073bc

Browse files
committed
Return errors from queue message
1 parent 5670f84 commit e8073bc

File tree

2 files changed

+42
-7
lines changed

2 files changed

+42
-7
lines changed

pkg/chipingress/batch/client.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package batch
22

33
import (
44
"context"
5+
"errors"
56
"sync"
67
"time"
78

@@ -118,10 +119,11 @@ func (b *Client) Stop() {
118119
// The callback receives an error parameter (nil on success).
119120
// Callbacks are invoked from goroutines
120121
// Returns immediately with no blocking - drops message if channel is full.
121-
// Returns true if message was queued, false if it was dropped.
122-
func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) bool {
122+
// Returns an error if the message was dropped.
123+
func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error {
124+
123125
if event == nil {
124-
return false
126+
return nil
125127
}
126128

127129
msg := &messageWithCallback{
@@ -130,10 +132,12 @@ func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(err
130132
}
131133

132134
select {
135+
case <-b.shutdownChan:
136+
return errors.New("client is shutdown")
133137
case b.messageBuffer <- msg:
134-
return true
138+
return nil
135139
default:
136-
return false
140+
return errors.New("message buffer is full")
137141
}
138142
}
139143

pkg/chipingress/batch/client_test.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,8 @@ func TestStart(t *testing.T) {
338338

339339
queued1 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-1", Source: "test-source", Type: "test.event.type"}, nil)
340340
queued2 := client.QueueMessage(&chipingress.CloudEventPb{Id: "test-id-2", Source: "test-source", Type: "test.event.type"}, nil)
341-
require.True(t, queued1)
342-
require.True(t, queued2)
341+
require.NoError(t, queued1)
342+
require.NoError(t, queued2)
343343

344344
select {
345345
case <-done:
@@ -813,4 +813,35 @@ func TestStop(t *testing.T) {
813813
client.Stop()
814814
client.Stop()
815815
})
816+
817+
t.Run("QueueMessage returns error after Stop", func(t *testing.T) {
818+
mockClient := mocks.NewClient(t)
819+
client, err := NewBatchClient(mockClient, WithBatchSize(10))
820+
require.NoError(t, err)
821+
822+
ctx, cancel := context.WithCancel(t.Context())
823+
defer cancel()
824+
825+
client.Start(ctx)
826+
827+
// Queue message before stop - should succeed
828+
err = client.QueueMessage(&chipingress.CloudEventPb{
829+
Id: "test-id-1",
830+
Source: "test-source",
831+
Type: "test.event.type",
832+
}, nil)
833+
require.NoError(t, err)
834+
835+
// Stop the client
836+
client.Stop()
837+
838+
// Queue message after stop - should fail
839+
err = client.QueueMessage(&chipingress.CloudEventPb{
840+
Id: "test-id-2",
841+
Source: "test-source",
842+
Type: "test.event.type",
843+
}, nil)
844+
require.Error(t, err)
845+
assert.Contains(t, err.Error(), "shutdown")
846+
})
816847
}

0 commit comments

Comments
 (0)