Skip to content

Commit 576e61b

Browse files
guyinyouguyinyou
andauthored
golang: Added rate limiting for SimpleConsumer's Receive requests by clientID dimension (#1141)
* golang: Added rate limiting for SimpleConsumer's Receive requests by clientID dimension Change-Id: I0968728e0fa1532c264b786c5f7e5f4234dcc017 * default maxReceiveConcurrency set 20 Change-Id: I27a15654b51cede7453ea99d058a613cd4729600 --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent fd58dab commit 576e61b

File tree

4 files changed

+379
-1
lines changed

4 files changed

+379
-1
lines changed

golang/receive_rate_limiter.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package golang
19+
20+
import (
21+
"context"
22+
"sync"
23+
)
24+
25+
type receiveRateLimiter struct {
26+
mu sync.Mutex
27+
cond *sync.Cond
28+
maxConcurrency int
29+
currentCount int
30+
}
31+
32+
func newReceiveRateLimiter(maxConcurrency int) *receiveRateLimiter {
33+
if maxConcurrency <= 0 {
34+
maxConcurrency = 10 // default 10 concurrent requests
35+
}
36+
rl := &receiveRateLimiter{
37+
maxConcurrency: maxConcurrency,
38+
}
39+
rl.cond = sync.NewCond(&rl.mu)
40+
return rl
41+
}
42+
43+
func (rl *receiveRateLimiter) acquire(ctx context.Context) error {
44+
rl.mu.Lock()
45+
defer rl.mu.Unlock()
46+
47+
for rl.currentCount >= rl.maxConcurrency {
48+
if ctx.Err() != nil {
49+
return ctx.Err()
50+
}
51+
52+
waitDone := make(chan struct{})
53+
go func() {
54+
select {
55+
case <-ctx.Done():
56+
rl.cond.Broadcast()
57+
case <-waitDone:
58+
}
59+
}()
60+
61+
rl.cond.Wait()
62+
close(waitDone)
63+
64+
if ctx.Err() != nil {
65+
return ctx.Err()
66+
}
67+
}
68+
69+
rl.currentCount++
70+
return nil
71+
}
72+
73+
func (rl *receiveRateLimiter) release() {
74+
rl.mu.Lock()
75+
defer rl.mu.Unlock()
76+
77+
if rl.currentCount > 0 {
78+
rl.currentCount--
79+
rl.cond.Signal()
80+
}
81+
}
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package golang
19+
20+
import (
21+
"context"
22+
"sync"
23+
"sync/atomic"
24+
"testing"
25+
"time"
26+
27+
"github.com/stretchr/testify/assert"
28+
)
29+
30+
func TestReceiveRateLimiter_BasicLimit(t *testing.T) {
31+
maxConcurrency := 3
32+
limiter := newReceiveRateLimiter(maxConcurrency)
33+
34+
// Test basic limit: should be able to acquire maxConcurrency permits
35+
ctx := context.Background()
36+
for i := 0; i < maxConcurrency; i++ {
37+
err := limiter.acquire(ctx)
38+
assert.NoError(t, err, "should be able to acquire permit")
39+
}
40+
41+
// Try to acquire the (maxConcurrency+1)th permit, should be blocked
42+
acquired := int32(0)
43+
go func() {
44+
err := limiter.acquire(ctx)
45+
if err == nil {
46+
atomic.StoreInt32(&acquired, 1)
47+
limiter.release()
48+
}
49+
}()
50+
51+
// Wait a short time to confirm it was not immediately acquired
52+
time.Sleep(100 * time.Millisecond)
53+
assert.Equal(t, int32(0), atomic.LoadInt32(&acquired), "should not acquire permit immediately")
54+
55+
// Release one permit, should wake up the waiting goroutine
56+
limiter.release()
57+
time.Sleep(100 * time.Millisecond)
58+
assert.Equal(t, int32(1), atomic.LoadInt32(&acquired), "should acquire permit after release")
59+
}
60+
61+
func TestReceiveRateLimiter_ConcurrentLimit(t *testing.T) {
62+
maxConcurrency := 5
63+
limiter := newReceiveRateLimiter(maxConcurrency)
64+
ctx := context.Background()
65+
66+
// Start many goroutines to request permits concurrently
67+
totalGoroutines := 20
68+
acquiredCount := int32(0)
69+
activeCount := int32(0)
70+
maxActive := int32(0)
71+
var wg sync.WaitGroup
72+
var mu sync.Mutex
73+
74+
for i := 0; i < totalGoroutines; i++ {
75+
wg.Add(1)
76+
go func() {
77+
defer wg.Done()
78+
err := limiter.acquire(ctx)
79+
if err != nil {
80+
return
81+
}
82+
83+
// Record current active count
84+
current := atomic.AddInt32(&activeCount, 1)
85+
mu.Lock()
86+
if current > maxActive {
87+
maxActive = current
88+
}
89+
mu.Unlock()
90+
91+
// Simulate some work
92+
time.Sleep(50 * time.Millisecond)
93+
94+
atomic.AddInt32(&acquiredCount, 1)
95+
atomic.AddInt32(&activeCount, -1)
96+
limiter.release()
97+
}()
98+
}
99+
100+
wg.Wait()
101+
102+
// Verify: all goroutines should successfully acquire permits
103+
assert.Equal(t, int32(totalGoroutines), acquiredCount, "all goroutines should successfully acquire permits")
104+
105+
// Verify: concurrent active count should not exceed maxConcurrency
106+
assert.LessOrEqual(t, maxActive, int32(maxConcurrency), "concurrent active count should not exceed limit")
107+
}
108+
109+
func TestReceiveRateLimiter_ContextCancel(t *testing.T) {
110+
maxConcurrency := 2
111+
limiter := newReceiveRateLimiter(maxConcurrency)
112+
113+
// Acquire all permits first
114+
ctx := context.Background()
115+
for i := 0; i < maxConcurrency; i++ {
116+
err := limiter.acquire(ctx)
117+
assert.NoError(t, err)
118+
}
119+
120+
// Create a context that will be cancelled
121+
cancelCtx, cancel := context.WithCancel(context.Background())
122+
123+
// Try to acquire permit, should be blocked
124+
acquired := int32(0)
125+
errChan := make(chan error, 1)
126+
go func() {
127+
err := limiter.acquire(cancelCtx)
128+
errChan <- err
129+
if err == nil {
130+
atomic.StoreInt32(&acquired, 1)
131+
limiter.release()
132+
}
133+
}()
134+
135+
// Wait a short time to confirm it was blocked
136+
time.Sleep(100 * time.Millisecond)
137+
assert.Equal(t, int32(0), atomic.LoadInt32(&acquired), "should be blocked")
138+
139+
// Cancel context
140+
cancel()
141+
142+
// Wait for goroutine to return
143+
select {
144+
case err := <-errChan:
145+
assert.Error(t, err, "should return context cancellation error")
146+
assert.Equal(t, context.Canceled, err, "error should be context.Canceled")
147+
case <-time.After(1 * time.Second):
148+
t.Fatal("should return immediately after context cancellation")
149+
}
150+
151+
// Release previous permits
152+
limiter.release()
153+
limiter.release()
154+
}
155+
156+
func TestReceiveRateLimiter_ReleaseWithoutAcquire(t *testing.T) {
157+
limiter := newReceiveRateLimiter(5)
158+
ctx := context.Background()
159+
160+
// Release without acquiring, should not panic
161+
assert.NotPanics(t, func() {
162+
limiter.release()
163+
})
164+
165+
// Should be able to acquire permit normally after release
166+
err := limiter.acquire(ctx)
167+
assert.NoError(t, err, "should be able to acquire permit normally")
168+
limiter.release()
169+
}
170+
171+
func TestReceiveRateLimiter_MultipleRelease(t *testing.T) {
172+
maxConcurrency := 3
173+
limiter := newReceiveRateLimiter(maxConcurrency)
174+
ctx := context.Background()
175+
176+
// Acquire one permit
177+
err := limiter.acquire(ctx)
178+
assert.NoError(t, err)
179+
180+
// Release multiple times, should not panic
181+
limiter.release()
182+
assert.NotPanics(t, func() {
183+
limiter.release()
184+
limiter.release()
185+
})
186+
187+
// Should be able to acquire maxConcurrency permits (because count was reset)
188+
for i := 0; i < maxConcurrency; i++ {
189+
err := limiter.acquire(ctx)
190+
assert.NoError(t, err, "should be able to acquire permit")
191+
}
192+
}
193+
194+
func TestReceiveRateLimiter_ZeroMaxConcurrency(t *testing.T) {
195+
// Test that when maxConcurrency is 0 or negative, default value 10 should be used
196+
limiter := newReceiveRateLimiter(0)
197+
ctx := context.Background()
198+
199+
// Should be able to acquire 10 permits
200+
for i := 0; i < 10; i++ {
201+
err := limiter.acquire(ctx)
202+
assert.NoError(t, err, "should be able to acquire permit")
203+
}
204+
205+
// The 11th should be blocked
206+
acquired := int32(0)
207+
go func() {
208+
err := limiter.acquire(ctx)
209+
if err == nil {
210+
atomic.StoreInt32(&acquired, 1)
211+
limiter.release()
212+
}
213+
}()
214+
215+
time.Sleep(100 * time.Millisecond)
216+
assert.Equal(t, int32(0), atomic.LoadInt32(&acquired), "should be blocked")
217+
218+
// Cleanup
219+
for i := 0; i < 10; i++ {
220+
limiter.release()
221+
}
222+
}
223+
224+
func TestReceiveRateLimiter_StressTest(t *testing.T) {
225+
maxConcurrency := 10
226+
limiter := newReceiveRateLimiter(maxConcurrency)
227+
ctx := context.Background()
228+
229+
// Stress test: large number of concurrent requests
230+
totalRequests := 1000
231+
successCount := int32(0)
232+
var wg sync.WaitGroup
233+
startTime := time.Now()
234+
235+
for i := 0; i < totalRequests; i++ {
236+
wg.Add(1)
237+
go func() {
238+
defer wg.Done()
239+
err := limiter.acquire(ctx)
240+
if err != nil {
241+
return
242+
}
243+
atomic.AddInt32(&successCount, 1)
244+
time.Sleep(1 * time.Millisecond) // Simulate work
245+
limiter.release()
246+
}()
247+
}
248+
249+
wg.Wait()
250+
duration := time.Since(startTime)
251+
252+
// Verify all requests succeeded
253+
assert.Equal(t, int32(totalRequests), successCount, "all requests should succeed")
254+
255+
// Verify rate limiting works: without rate limiting, 1000 requests should complete quickly
256+
// With rate limiting, it should take longer
257+
// This is just a simple verification, actual time depends on scheduling
258+
t.Logf("Completed %d requests in %v", totalRequests, duration)
259+
assert.Greater(t, duration, 50*time.Millisecond, "with rate limiting, it should take some time")
260+
}
261+
262+
func TestReceiveRateLimiter_SequentialAcquireRelease(t *testing.T) {
263+
limiter := newReceiveRateLimiter(3)
264+
ctx := context.Background()
265+
266+
// Sequential acquire and release
267+
for i := 0; i < 10; i++ {
268+
err := limiter.acquire(ctx)
269+
assert.NoError(t, err, "should be able to acquire permit")
270+
limiter.release()
271+
}
272+
273+
// Verify can acquire again
274+
err := limiter.acquire(ctx)
275+
assert.NoError(t, err, "should be able to acquire permit again")
276+
limiter.release()
277+
}
278+

golang/simple_consumer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type defaultSimpleConsumer struct {
5959
subscriptionExpressionsLock sync.RWMutex
6060
subscriptionExpressions *map[string]*FilterExpression
6161
subTopicRouteDataResultCache sync.Map
62+
receiveRateLimiter *receiveRateLimiter
6263
}
6364

6465
func (sc *defaultSimpleConsumer) SetRequestTimeout(timeout time.Duration) {
@@ -318,6 +319,13 @@ func (sc *defaultSimpleConsumer) Receive(ctx context.Context, maxMessageNum int3
318319
if err != nil {
319320
return nil, err
320321
}
322+
323+
// Apply rate limiting
324+
if err = sc.receiveRateLimiter.acquire(ctx); err != nil {
325+
return nil, fmt.Errorf("failed to acquire rate limit permit: %w", err)
326+
}
327+
defer sc.receiveRateLimiter.release()
328+
321329
request := sc.wrapReceiveMessageRequest(int(maxMessageNum), selectMessageQueue, filterExpression, invisibleDuration)
322330
timeout := sc.scOpts.awaitDuration + sc.cli.opts.timeout
323331
return sc.receiveMessage(ctx, request, selectMessageQueue, timeout)
@@ -365,6 +373,7 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp
365373

366374
awaitDuration: scOpts.awaitDuration,
367375
subscriptionExpressions: &scOpts.subscriptionExpressions,
376+
receiveRateLimiter: newReceiveRateLimiter(scOpts.maxReceiveConcurrency),
368377
}
369378

370379
sc.cli.initTopics = make([]string, 0)

0 commit comments

Comments
 (0)