1
1
package pubsub
2
2
3
3
import (
4
+ "errors"
4
5
"fmt"
5
6
"strings"
6
7
"time"
14
15
Config struct {
15
16
// Kafka contains the configuration for Kafka client
16
17
Kafka Kafka `mapstructure:"kafka"`
18
+ // SQS contains the configuration for AWS SQS client
19
+ SQS SQS `mapstructure:"sqs"`
17
20
}
18
21
19
22
// Publisher contains the publisher specific configuration
@@ -135,6 +138,27 @@ type (
135
138
// MetricsEnabled controls if metrics publishing is enabled or not
136
139
MetricsEnabled bool `mapstructure:"metrics_enabled"`
137
140
}
141
+
142
+ SQSSubscriber struct {
143
+ Enabled bool `mapstructure:"enabled"`
144
+ QueueURL string `mapstructure:"queue_url"`
145
+ MaxMessages int `mapstructure:"max_messages"`
146
+ Workers int `mapstructure:"workers"`
147
+
148
+ WaitTime time.Duration `mapstructure:"wait_time"`
149
+ }
150
+
151
+ SQSPublisher struct {
152
+ Enabled bool `mapstructure:"enabled"`
153
+ QueueURL string `mapstructure:"queue_url"`
154
+ }
155
+
156
+ SQS struct {
157
+ // SQS Publisher specific configuration
158
+ Publisher SQSPublisher
159
+ // SQS Subscriber specific configuration
160
+ Subscriber SQSSubscriber
161
+ }
138
162
)
139
163
140
164
const (
@@ -152,6 +176,8 @@ var (
152
176
saslMechanismPlainString : Plain ,
153
177
saslMechanismAWsMskIamString : AWSMskIam ,
154
178
}
179
+
180
+ ErrEmptySQSQueueURL = errors .New ("sqs queue url is empty" )
155
181
)
156
182
157
183
// NewConfig returns a new Config instance.
@@ -192,6 +218,12 @@ func (c *Config) validate() error {
192
218
strings .Join (allowedMechanisms , "," ),
193
219
)
194
220
}
221
+ if c .SQS .Publisher .Enabled && c .SQS .Publisher .QueueURL == "" {
222
+ return ErrEmptySQSQueueURL
223
+ }
224
+ if c .SQS .Subscriber .Enabled && c .SQS .Subscriber .QueueURL == "" {
225
+ return ErrEmptySQSQueueURL
226
+ }
195
227
196
228
return nil
197
229
}
0 commit comments