Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
- [Local Cache](#local-cache)
- [Redis](#redis)
- [Redis type](#redis-type)
- [Connection Timeout](#connection-timeout)
- [Pipelining](#pipelining)
- [One Redis Instance](#one-redis-instance)
- [Two Redis Instances](#two-redis-instances)
Expand Down Expand Up @@ -1113,6 +1114,13 @@ The deployment type can be specified with the `REDIS_TYPE` / `REDIS_PERSECOND_TY
1. "sentinel": A comma separated list with the first string as the master name of the sentinel cluster followed by hostname:port pairs. The list size should be >= 2. The first item is the name of the master and the rest are the sentinels.
1. "cluster": A comma separated list of hostname:port pairs with all the nodes in the cluster.

## Connection Timeout

Connection timeout controls the maximum duration for Redis connection establishment, read operations, and write operations.

1. `REDIS_TIMEOUT`: sets the timeout for Redis connection and I/O operations. Default: `10s`
1. `REDIS_PERSECOND_TIMEOUT`: sets the timeout for per-second Redis connection and I/O operations. Default: `10s`

## Pipelining

By default, for each request, ratelimit will pick up a connection from pool, write multiple redis commands in a single write then reads their responses in a single read. This reduces network delay.
Expand Down
4 changes: 2 additions & 2 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca
var perSecondPool Client
if s.RedisPerSecond {
perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv)
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout)
closer.Closers = append(closer.Closers, perSecondPool)
}

otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv)
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout)
closer.Closers = append(closer.Closers, otherPool)

return NewFixedRateLimitCacheImpl(
Expand Down
3 changes: 3 additions & 0 deletions src/redis/driver_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,16 @@ func checkError(err error) {

func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int,
pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server,
timeout time.Duration,
) Client {
maskedUrl := utils.MaskCredentialsInUrl(url)
logger.Warnf("connecting to redis on %s with pool size %d", maskedUrl, poolSize)

df := func(network, addr string) (radix.Conn, error) {
var dialOpts []radix.DialOpt

dialOpts = append(dialOpts, radix.DialTimeout(timeout))

if useTls {
dialOpts = append(dialOpts, radix.DialUseTLS(tlsConfig))
}
Expand Down
4 changes: 4 additions & 0 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ type Settings struct {
RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"`
// Enable healthcheck to check Redis Connection. If there is no active connection, healthcheck failed.
RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"`
// RedisTimeout sets the timeout for Redis connection and I/O operations.
RedisTimeout time.Duration `envconfig:"REDIS_TIMEOUT" default:"10s"`
// RedisPerSecondTimeout sets the timeout for per-second Redis connection and I/O operations.
RedisPerSecondTimeout time.Duration `envconfig:"REDIS_PERSECOND_TIMEOUT" default:"10s"`
// Memcache settings
MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""`
// MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node.
Expand Down
2 changes: 1 addition & 1 deletion test/redis/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) {
return func(b *testing.B) {
statsStore := gostats.NewStore(gostats.NewNullSink(), false)
sm := stats.NewMockStatManager(statsStore)
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil)
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
defer client.Close()

cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true)
Expand Down
6 changes: 3 additions & 3 deletions test/redis/driver_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(auth, addr string) redis.Client {
return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil)
return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
}

t.Run("connection refused", func(t *testing.T) {
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestDoCmd(t *testing.T) {
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(addr string) redis.Client {
return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil)
return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second)
}

t.Run("SETGET ok", func(t *testing.T) {
Expand Down Expand Up @@ -176,7 +176,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(addr string) redis.Client {
return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil)
return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
}

t.Run("SETGET ok", func(t *testing.T) {
Expand Down
Loading