Skip to content

Commit 40590ba

Browse files
authored
Merge branch 'main' into dapr-state-store-clickhouse
2 parents cc80a66 + 1137759 commit 40590ba

File tree

4 files changed

+49
-6
lines changed

4 files changed

+49
-6
lines changed

pubsub/aws/snssqs/metadata.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type snsSqsMetadata struct {
5757
AccountID string `mapstructure:"accountID"`
5858
// processing concurrency mode
5959
ConcurrencyMode pubsub.ConcurrencyMode `mapstructure:"concurrencyMode"`
60+
// limits the number of concurrent goroutines
61+
ConcurrencyLimit int `mapstructure:"concurrencyLimit"`
6062
}
6163

6264
func maskLeft(s string) string {
@@ -130,6 +132,10 @@ func (s *snsSqs) getSnsSqsMetadata(meta pubsub.Metadata) (*snsSqsMetadata, error
130132
return nil, err
131133
}
132134

135+
if md.ConcurrencyLimit < 0 {
136+
return nil, errors.New("concurrencyLimit must be greater than or equal to 0")
137+
}
138+
133139
s.logger.Debug(md.hideDebugPrintedCredentials())
134140

135141
return md, nil

pubsub/aws/snssqs/metadata.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,15 @@ metadata:
128128
default: '"parallel"'
129129
example: '"single", "parallel"'
130130
type: string
131+
- name: concurrencyLimit
132+
required: false
133+
description: |
134+
Defines the maximum number of concurrent workers handling messages.
135+
This value is ignored when "concurrencyMode" is set to “single“.
136+
To avoid limiting the number of concurrent workers set this to “0“.
137+
type: number
138+
default: '0'
139+
example: '100'
131140
- name: accountId
132141
required: false
133142
description: |

pubsub/aws/snssqs/snssqs.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,13 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
595595
WaitTimeSeconds: aws.Int64(s.metadata.MessageWaitTimeSeconds),
596596
}
597597

598+
// sem is a semaphore used to control the concurrencyLimit.
599+
// It is set only when we are in parallel mode and limit is > 0.
600+
var sem chan (struct{}) = nil
601+
if (s.metadata.ConcurrencyMode == pubsub.Parallel) && s.metadata.ConcurrencyLimit > 0 {
602+
sem = make(chan struct{}, s.metadata.ConcurrencyLimit)
603+
}
604+
598605
for {
599606
// If the context is canceled, stop requesting messages
600607
if ctx.Err() != nil {
@@ -629,33 +636,37 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
629636
}
630637
s.logger.Debugf("%v message(s) received on queue %s", len(messageResponse.Messages), queueInfo.arn)
631638

632-
var wg sync.WaitGroup
633639
for _, message := range messageResponse.Messages {
634640
if err := s.validateMessage(ctx, message, queueInfo, deadLettersQueueInfo); err != nil {
635641
s.logger.Errorf("message is not valid for further processing by the handler. error is: %v", err)
636642
continue
637643
}
638644

639645
f := func(message *sqs.Message) {
640-
defer wg.Done()
641646
if err := s.callHandler(ctx, message, queueInfo); err != nil {
642647
s.logger.Errorf("error while handling received message. error is: %v", err)
643648
}
644649
}
645650

646-
wg.Add(1)
647651
switch s.metadata.ConcurrencyMode {
648652
case pubsub.Single:
649653
f(message)
650654
case pubsub.Parallel:
651-
wg.Add(1)
655+
// This is the back pressure mechanism.
656+
// It will block until another goroutine frees a slot.
657+
if sem != nil {
658+
sem <- struct{}{}
659+
}
660+
652661
go func(message *sqs.Message) {
653-
defer wg.Done()
662+
if sem != nil {
663+
defer func() { <-sem }()
664+
}
665+
654666
f(message)
655667
}(message)
656668
}
657669
}
658-
wg.Wait()
659670
}
660671
}
661672

pubsub/aws/snssqs/snssqs_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func Test_getSnsSqsMetadata_AllConfiguration(t *testing.T) {
5151
"consumerID": "consumer",
5252
"Endpoint": "endpoint",
5353
"concurrencyMode": string(pubsub.Single),
54+
"concurrencyLimit": "42",
5455
"accessKey": "a",
5556
"secretKey": "s",
5657
"sessionToken": "t",
@@ -68,6 +69,7 @@ func Test_getSnsSqsMetadata_AllConfiguration(t *testing.T) {
6869
r.Equal("consumer", md.SqsQueueName)
6970
r.Equal("endpoint", md.Endpoint)
7071
r.Equal(pubsub.Single, md.ConcurrencyMode)
72+
r.Equal(42, md.ConcurrencyLimit)
7173
r.Equal("a", md.AccessKey)
7274
r.Equal("s", md.SecretKey)
7375
r.Equal("t", md.SessionToken)
@@ -105,6 +107,7 @@ func Test_getSnsSqsMetadata_defaults(t *testing.T) {
105107
r.Equal("", md.SessionToken)
106108
r.Equal("r", md.Region)
107109
r.Equal(pubsub.Parallel, md.ConcurrencyMode)
110+
r.Equal(0, md.ConcurrencyLimit)
108111
r.Equal(int64(10), md.MessageVisibilityTimeout)
109112
r.Equal(int64(10), md.MessageRetryLimit)
110113
r.Equal(int64(2), md.MessageWaitTimeSeconds)
@@ -273,6 +276,20 @@ func Test_getSnsSqsMetadata_invalidMetadataSetup(t *testing.T) {
273276
}}},
274277
name: "invalid message concurrencyMode",
275278
},
279+
// invalid concurrencyLimit
280+
{
281+
metadata: pubsub.Metadata{Base: metadata.Base{Properties: map[string]string{
282+
"consumerID": "consumer",
283+
"Endpoint": "endpoint",
284+
"AccessKey": "acctId",
285+
"SecretKey": "secret",
286+
"awsToken": "token",
287+
"Region": "region",
288+
"messageRetryLimit": "10",
289+
"concurrencyLimit": "-1",
290+
}}},
291+
name: "invalid message concurrencyLimit",
292+
},
276293
}
277294

278295
l := logger.NewLogger("SnsSqs unit test")

0 commit comments

Comments
 (0)