Skip to content

Commit fc44670

Browse files
authored
feat: Add Pool On-Empty Behavior Configuration for Redis Connections (#1018)
* feat: Add Pool On-Empty Behavior Configuration for Redis Connections Signed-off-by: notdu <huudutg@gmail.com> * update Signed-off-by: notdu <huudutg@gmail.com> * update Signed-off-by: notdu <huudutg@gmail.com> * update Signed-off-by: notdu <huudutg@gmail.com> --------- Signed-off-by: notdu <huudutg@gmail.com>
1 parent e4ac90e commit fc44670

File tree

8 files changed

+337
-13
lines changed

8 files changed

+337
-13
lines changed

README.md

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,11 @@
6161
- [Local Cache](#local-cache)
6262
- [Redis](#redis)
6363
- [Redis type](#redis-type)
64-
- [Connection Timeout](#connection-timeout)
65-
- [Pipelining](#pipelining)
64+
- [Connection Pool Settings](#connection-pool-settings)
65+
- [Pool Size](#pool-size)
66+
- [Connection Timeout](#connection-timeout)
67+
- [Pool On-Empty Behavior](#pool-on-empty-behavior)
68+
- [Pipelining](#pipelining)
6669
- [One Redis Instance](#one-redis-instance)
6770
- [Two Redis Instances](#two-redis-instances)
6871
- [Health Checking for Redis Active Connection](#health-checking-for-redis-active-connection)
@@ -1274,14 +1277,33 @@ The deployment type can be specified with the `REDIS_TYPE` / `REDIS_PERSECOND_TY
12741277
1. "sentinel": A comma separated list with the first string as the master name of the sentinel cluster followed by hostname:port pairs. The list size should be >= 2. The first item is the name of the master and the rest are the sentinels.
12751278
1. "cluster": A comma separated list of hostname:port pairs with all the nodes in the cluster.
12761279

1277-
## Connection Timeout
1280+
## Connection Pool Settings
12781281

1279-
Connection timeout controls the maximum duration for Redis connection establishment, read operations, and write operations.
1282+
### Pool Size
1283+
1284+
1. `REDIS_POOL_SIZE`: the number of connections to keep in the pool. Default: `10`
1285+
1. `REDIS_PERSECOND_POOL_SIZE`: pool size for per-second Redis. Default: `10`
1286+
1287+
### Connection Timeout
1288+
1289+
Controls the maximum duration for Redis connection establishment, read operations, and write operations.
12801290

12811291
1. `REDIS_TIMEOUT`: sets the timeout for Redis connection and I/O operations. Default: `10s`
12821292
1. `REDIS_PERSECOND_TIMEOUT`: sets the timeout for per-second Redis connection and I/O operations. Default: `10s`
12831293

1284-
## Pipelining
1294+
### Pool On-Empty Behavior
1295+
1296+
Controls what happens when all connections in the pool are in use and a new request arrives.
1297+
1298+
1. `REDIS_POOL_ON_EMPTY_BEHAVIOR`: controls what happens when the pool is empty. Default: `CREATE`
1299+
- `CREATE`: create a new overflow connection after waiting for `REDIS_POOL_ON_EMPTY_WAIT_DURATION`. This is the [default radix behavior](https://github.com/mediocregopher/radix/blob/v3.8.1/pool.go#L291-L312).
1300+
- `ERROR`: return an error after waiting for `REDIS_POOL_ON_EMPTY_WAIT_DURATION`. This enforces a strict pool size limit.
1301+
- `WAIT`: block until a connection becomes available. This enforces a strict pool size limit but may cause goroutine buildup.
1302+
1. `REDIS_POOL_ON_EMPTY_WAIT_DURATION`: the duration to wait before taking the configured action (`CREATE` or `ERROR`). Default: `1s`
1303+
1. `REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR`: same as above for per-second Redis pool. Default: `CREATE`
1304+
1. `REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION`: same as above for per-second Redis pool. Default: `1s`
1305+
1306+
### Pipelining
12851307

12861308
By default, for each request, ratelimit will pick up a connection from pool, write multiple redis commands in a single write then reads their responses in a single read. This reduces network delay.
12871309

src/redis/cache_impl.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca
1818
var perSecondPool Client
1919
if s.RedisPerSecond {
2020
perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
21-
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout)
21+
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout,
22+
s.RedisPerSecondPoolOnEmptyBehavior, s.RedisPerSecondPoolOnEmptyWaitDuration)
2223
closer.Closers = append(closer.Closers, perSecondPool)
2324
}
2425

2526
otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
26-
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout)
27+
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout,
28+
s.RedisPoolOnEmptyBehavior, s.RedisPoolOnEmptyWaitDuration)
2729
closer.Closers = append(closer.Closers, otherPool)
2830

2931
return NewFixedRateLimitCacheImpl(

src/redis/driver_impl.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func checkError(err error) {
7272

7373
func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int,
7474
pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server,
75-
timeout time.Duration,
75+
timeout time.Duration, poolOnEmptyBehavior string, poolOnEmptyWaitDuration time.Duration,
7676
) Client {
7777
maskedUrl := utils.MaskCredentialsInUrl(url)
7878
logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize)
@@ -112,6 +112,21 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT
112112
}
113113
logger.Debugf("Implicit pipelining enabled: %v", implicitPipelining)
114114

115+
switch strings.ToUpper(poolOnEmptyBehavior) {
116+
case "WAIT":
117+
opts = append(opts, radix.PoolOnEmptyWait())
118+
logger.Warnf("Redis pool %s: on-empty=WAIT (block until connection available)", maskedUrl)
119+
case "CREATE":
120+
opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration))
121+
logger.Warnf("Redis pool %s: on-empty=CREATE after %v", maskedUrl, poolOnEmptyWaitDuration)
122+
case "ERROR":
123+
opts = append(opts, radix.PoolOnEmptyErrAfter(poolOnEmptyWaitDuration))
124+
logger.Warnf("Redis pool %s: on-empty=ERROR after %v (fail-fast)", maskedUrl, poolOnEmptyWaitDuration)
125+
default:
126+
logger.Warnf("Redis pool %s: invalid on-empty behavior '%s', using default CREATE after %v", maskedUrl, poolOnEmptyBehavior, poolOnEmptyWaitDuration)
127+
opts = append(opts, radix.PoolOnEmptyCreateAfter(poolOnEmptyWaitDuration))
128+
}
129+
115130
poolFunc := func(network, addr string) (radix.Client, error) {
116131
return radix.NewPool(network, addr, poolSize, opts...)
117132
}

src/settings/settings.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,25 @@ type Settings struct {
163163
RedisTimeout time.Duration `envconfig:"REDIS_TIMEOUT" default:"10s"`
164164
// RedisPerSecondTimeout sets the timeout for per-second Redis connection and I/O operations.
165165
RedisPerSecondTimeout time.Duration `envconfig:"REDIS_PERSECOND_TIMEOUT" default:"10s"`
166+
167+
// RedisPoolOnEmptyBehavior controls what happens when Redis connection pool is empty.
168+
// This setting helps prevent connection storms during Redis failures.
169+
// Possible values:
170+
// - "CREATE": Create a new connection after RedisPoolOnEmptyWaitDuration (default)
171+
// - "ERROR": Return error after RedisPoolOnEmptyWaitDuration
172+
// - "WAIT": Block until a connection is available
173+
RedisPoolOnEmptyBehavior string `envconfig:"REDIS_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"`
174+
// RedisPoolOnEmptyWaitDuration is the wait duration before taking action when pool is empty.
175+
// Only applicable when RedisPoolOnEmptyBehavior is "CREATE" or "ERROR".
176+
RedisPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"`
177+
178+
// RedisPerSecondPoolOnEmptyBehavior controls pool-empty behavior for per-second Redis.
179+
// See RedisPoolOnEmptyBehavior for possible values and details.
180+
RedisPerSecondPoolOnEmptyBehavior string `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR" default:"CREATE"`
181+
// RedisPerSecondPoolOnEmptyWaitDuration is the wait duration for per-second Redis pool.
182+
// See RedisPoolOnEmptyWaitDuration for details.
183+
RedisPerSecondPoolOnEmptyWaitDuration time.Duration `envconfig:"REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION" default:"1s"`
184+
166185
// Memcache settings
167186
MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""`
168187
// MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node.

src/settings/settings_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package settings
22

33
import (
4+
"os"
45
"testing"
6+
"time"
57

68
"github.com/stretchr/testify/assert"
79
)
@@ -11,3 +13,115 @@ func TestSettingsTlsConfigUnmodified(t *testing.T) {
1113
assert.NotNil(t, settings.RedisTlsConfig)
1214
assert.Nil(t, settings.RedisTlsConfig.RootCAs)
1315
}
16+
17+
// Tests for RedisPoolOnEmptyBehavior
18+
func TestRedisPoolOnEmptyBehavior_Default(t *testing.T) {
19+
os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
20+
os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")
21+
22+
settings := NewSettings()
23+
24+
assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior)
25+
assert.Equal(t, 1*time.Second, settings.RedisPoolOnEmptyWaitDuration)
26+
}
27+
28+
func TestRedisPoolOnEmptyBehavior_Error(t *testing.T) {
29+
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
30+
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0")
31+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
32+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")
33+
34+
settings := NewSettings()
35+
36+
assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior)
37+
assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration)
38+
}
39+
40+
func TestRedisPoolOnEmptyBehavior_ErrorWithDuration(t *testing.T) {
41+
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
42+
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "100ms")
43+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
44+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")
45+
46+
settings := NewSettings()
47+
48+
assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior)
49+
assert.Equal(t, 100*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration)
50+
}
51+
52+
func TestRedisPoolOnEmptyBehavior_Create(t *testing.T) {
53+
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "CREATE")
54+
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "500ms")
55+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
56+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")
57+
58+
settings := NewSettings()
59+
60+
assert.Equal(t, "CREATE", settings.RedisPoolOnEmptyBehavior)
61+
assert.Equal(t, 500*time.Millisecond, settings.RedisPoolOnEmptyWaitDuration)
62+
}
63+
64+
func TestRedisPoolOnEmptyBehavior_Wait(t *testing.T) {
65+
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "WAIT")
66+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
67+
68+
settings := NewSettings()
69+
70+
assert.Equal(t, "WAIT", settings.RedisPoolOnEmptyBehavior)
71+
}
72+
73+
func TestRedisPoolOnEmptyBehavior_CaseInsensitive(t *testing.T) {
74+
// Test that lowercase values work (processing is done in driver_impl.go)
75+
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "error")
76+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
77+
78+
settings := NewSettings()
79+
80+
// Setting stores as-is, case conversion happens in driver_impl.go
81+
assert.Equal(t, "error", settings.RedisPoolOnEmptyBehavior)
82+
}
83+
84+
// Tests for RedisPerSecondPoolOnEmptyBehavior
85+
func TestRedisPerSecondPoolOnEmptyBehavior_Default(t *testing.T) {
86+
os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR")
87+
os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION")
88+
89+
settings := NewSettings()
90+
91+
assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior)
92+
assert.Equal(t, 1*time.Second, settings.RedisPerSecondPoolOnEmptyWaitDuration)
93+
}
94+
95+
func TestRedisPerSecondPoolOnEmptyBehavior_Error(t *testing.T) {
96+
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
97+
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "50ms")
98+
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR")
99+
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION")
100+
101+
settings := NewSettings()
102+
103+
assert.Equal(t, "ERROR", settings.RedisPerSecondPoolOnEmptyBehavior)
104+
assert.Equal(t, 50*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration)
105+
}
106+
107+
// Test both pools can be configured independently
108+
func TestRedisPoolOnEmptyBehavior_IndependentConfiguration(t *testing.T) {
109+
os.Setenv("REDIS_POOL_ON_EMPTY_BEHAVIOR", "ERROR")
110+
os.Setenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION", "0")
111+
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR", "CREATE")
112+
os.Setenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION", "100ms")
113+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
114+
defer os.Unsetenv("REDIS_POOL_ON_EMPTY_WAIT_DURATION")
115+
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_BEHAVIOR")
116+
defer os.Unsetenv("REDIS_PERSECOND_POOL_ON_EMPTY_WAIT_DURATION")
117+
118+
settings := NewSettings()
119+
120+
// Main pool configured for fail-fast
121+
assert.Equal(t, "ERROR", settings.RedisPoolOnEmptyBehavior)
122+
assert.Equal(t, time.Duration(0), settings.RedisPoolOnEmptyWaitDuration)
123+
124+
// Per-second pool configured differently
125+
assert.Equal(t, "CREATE", settings.RedisPerSecondPoolOnEmptyBehavior)
126+
assert.Equal(t, 100*time.Millisecond, settings.RedisPerSecondPoolOnEmptyWaitDuration)
127+
}

src/srv/srv_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import (
88
)
99

1010
func mockAddrsLookup(service, proto, name string) (cname string, addrs []*net.SRV, err error) {
11-
return "ignored", []*net.SRV{{"z", 1, 0, 0}, {"z", 0, 0, 0}, {"a", 9001, 0, 0}}, nil
11+
return "ignored", []*net.SRV{
12+
{Target: "z", Port: 1, Priority: 0, Weight: 0},
13+
{Target: "z", Port: 0, Priority: 0, Weight: 0},
14+
{Target: "a", Port: 9001, Priority: 0, Weight: 0},
15+
}, nil
1216
}
1317

1418
func TestLookupServerStringsFromSrvReturnsServersSorted(t *testing.T) {

test/redis/bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) {
4444
return func(b *testing.B) {
4545
statsStore := gostats.NewStore(gostats.NewNullSink(), false)
4646
sm := stats.NewMockStatManager(statsStore)
47-
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
47+
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second, "", 0)
4848
defer client.Close()
4949

5050
cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true)

0 commit comments

Comments
 (0)