Skip to content

Commit a3a7acd

Browse files
authored
Merge pull request #25 from botchris/snssqs-write-only-options
[SNSSQS] Write-only broker option
2 parents 5ef676b + 858f651 commit a3a7acd

File tree

3 files changed

+96
-7
lines changed

3 files changed

+96
-7
lines changed

provider/snssqs/broker.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/aws/aws-sdk-go-v2/service/sns"
1414
"github.com/aws/aws-sdk-go-v2/service/sqs"
1515
"github.com/botchris/go-pubsub"
16+
"github.com/botchris/go-pubsub/middleware/codec"
1617
"github.com/hashicorp/go-multierror"
1718
"github.com/kubemq-io/kubemq-go/pkg/uuid"
1819
)
@@ -38,10 +39,15 @@ type broker struct {
3839
// implementation (`pubsub.topic`). This allows to keep the topic name
3940
// agnostic to their underlying implementation.
4041
//
42+
// You can instance a write-only broker by passing no SQS related configuration options.
43+
// This broker will only be able to publish messages to SNS topics, but will not be able to
44+
// subscribe handlers to topics nor receive messages from SQS queues. Of course, no goroutine
45+
// to poll the SQS queue will be started in this case.
46+
//
4147
// IMPORTANT: this broker must be used in conjunction with a Codec middleware in
4248
// order to ensure that the messages are properly encoded and decoded.
4349
// Otherwise, only binary messages will be accepted when publishing or
44-
// delivering messages.
50+
// delivering messages. Hence, `WithCodec` option is mandatory.
4551
func NewBroker(ctx context.Context, option ...Option) (pubsub.Broker, error) {
4652
opts := &options{
4753
deliverTimeout: 5 * time.Second,
@@ -59,12 +65,12 @@ func NewBroker(ctx context.Context, option ...Option) (pubsub.Broker, error) {
5965
return nil, errors.New("no SNS client was provided")
6066
}
6167

62-
if opts.sqsClient == nil {
63-
return nil, errors.New("no SQS client was provided")
68+
if err := opts.validateSQSSettings(); err != nil {
69+
return nil, err
6470
}
6571

66-
if opts.sqsQueueURL == "" {
67-
return nil, errors.New("no SQS queue URL was provided")
72+
if opts.codec == nil {
73+
return nil, errors.New("no codec was provided")
6874
}
6975

7076
ctx, cancel := context.WithCancel(ctx)
@@ -84,7 +90,7 @@ func NewBroker(ctx context.Context, option ...Option) (pubsub.Broker, error) {
8490
go b.run()
8591
}()
8692

87-
return b, nil
93+
return codec.NewCodecMiddleware(b, opts.codec), nil
8894
}
8995

9096
func (b *broker) Publish(ctx context.Context, topic pubsub.Topic, m interface{}) error {
@@ -111,6 +117,10 @@ func (b *broker) Publish(ctx context.Context, topic pubsub.Topic, m interface{})
111117
}
112118

113119
func (b *broker) Subscribe(ctx context.Context, topic pubsub.Topic, handler pubsub.Handler, option ...pubsub.SubscribeOption) (pubsub.Subscription, error) {
120+
if b.options.isWriteOnly() {
121+
return nil, errors.New("no SQS queue ARN provided, broker is configured as write-only")
122+
}
123+
114124
topicARN, err := b.topics.arnOf(topic)
115125
if err != nil {
116126
return nil, err
@@ -196,6 +206,10 @@ func (b *broker) Shutdown(ctx context.Context) error {
196206
}
197207

198208
func (b *broker) run() {
209+
if b.options.isWriteOnly() {
210+
return
211+
}
212+
199213
done := b.runnerCtx.Done()
200214
topicTicker := time.NewTicker(b.options.topicsReloadInterval)
201215
defer topicTicker.Stop()

provider/snssqs/broker_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,41 @@ func TestMultiHostBroker(t *testing.T) {
226226
})
227227
}
228228

229+
func TestWriteOnlyBroker(t *testing.T) {
230+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
231+
defer cancel()
232+
233+
t.Run("GIVEN a sns topic AND no sqs configurations", func(t *testing.T) {
234+
cfg := awsConfig(t)
235+
snsCli := awssns.NewFromConfig(cfg)
236+
237+
topic := pubsub.Topic("test-topic")
238+
topicARN, err := prepareTopic(ctx, snsCli, "test-topic")
239+
require.NoError(t, err)
240+
require.NotEmpty(t, topicARN)
241+
242+
broker, err := snssqs.NewBroker(ctx,
243+
snssqs.WithSNSClient(snsCli),
244+
snssqs.WithCodec(codec.JSON),
245+
)
246+
247+
t.Run("WHEN we instance the broker with just sns configuration", func(t *testing.T) {
248+
t.Run("THEN there is no error", func(t *testing.T) {
249+
require.NoError(t, err)
250+
})
251+
252+
t.Run("AND we can publish messages to the topic", func(t *testing.T) {
253+
require.NoError(t, broker.Publish(ctx, topic, "hello world"))
254+
})
255+
256+
t.Run("BUT we cannot subscribe to the topic", func(t *testing.T) {
257+
_, err := broker.Subscribe(ctx, topic, pubsub.NewHandler((&consumer{}).handle))
258+
require.Error(t, err)
259+
})
260+
})
261+
})
262+
}
263+
229264
type consumer struct {
230265
rcv queue
231266
mu sync.RWMutex
@@ -303,13 +338,14 @@ func prepareBroker(
303338
snssqs.WithSQSQueueARN(attr.Attributes[string(sqstypes.QueueAttributeNameQueueArn)]),
304339
snssqs.WithSQSQueueURL(aws.ToString(qRes.QueueUrl)),
305340
snssqs.WithWaitTimeSeconds(1),
341+
snssqs.WithCodec(codec.JSON),
306342
)
307343

308344
if err != nil {
309345
return nil, err
310346
}
311347

312-
return codec.NewCodecMiddleware(broker, codec.JSON), nil
348+
return broker, nil
313349
}
314350

315351
func prepareTopic(ctx context.Context, cli *awssns.Client, topic pubsub.Topic) (string, error) {

provider/snssqs/options.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package snssqs
22

33
import (
4+
"errors"
45
"time"
6+
7+
"github.com/botchris/go-pubsub/middleware/codec"
58
)
69

710
// Option defines a function signature for configuration options.
@@ -19,6 +22,7 @@ type options struct {
1922
maxMessages int32
2023
visibilityTimeout int32
2124
waitTimeSeconds int32
25+
codec codec.Codec
2226
}
2327

2428
type fnOption struct {
@@ -120,3 +124,38 @@ func WithWaitTimeSeconds(t int32) Option {
120124
},
121125
}
122126
}
127+
128+
// WithCodec sets the codec to be used by the broker.
129+
func WithCodec(codec codec.Codec) Option {
130+
return fnOption{
131+
f: func(o *options) {
132+
o.codec = codec
133+
},
134+
}
135+
}
136+
137+
func (o options) validateSQSSettings() error {
138+
if o.isWriteOnly() {
139+
return nil
140+
}
141+
142+
if o.sqsQueueURL == "" {
143+
return errors.New("no SQS queue URL was provided")
144+
}
145+
146+
if o.sqsQueueARN == "" {
147+
return errors.New("no SQS queue ARN was provided")
148+
}
149+
150+
if o.sqsClient == nil {
151+
return errors.New("no SQS client was provided")
152+
}
153+
154+
return nil
155+
}
156+
157+
func (o options) isWriteOnly() bool {
158+
return o.sqsQueueURL == "" &&
159+
o.sqsQueueARN == "" &&
160+
o.sqsClient == nil
161+
}

0 commit comments

Comments
 (0)