Skip to content

Commit ca9c96e

Browse files
committed
feat: Create instrumented redis client
1 parent b2d9376 commit ca9c96e

File tree

4 files changed

+343
-0
lines changed

4 files changed

+343
-0
lines changed

pkg/cache/redis/redis.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package redis
2+
3+
import (
4+
"crypto/tls"
5+
"crypto/x509"
6+
7+
"github.com/redis/go-redis/v9"
8+
9+
"github.com/scribd/go-sdk/pkg/cache"
10+
)
11+
12+
func New(cfg *cache.Redis) (redis.UniversalClient, error) {
13+
opts, err := cfgToRedisClientOptions(cfg)
14+
if err != nil {
15+
return nil, err
16+
}
17+
18+
return redis.NewUniversalClient(opts), nil
19+
}
20+
21+
func cfgToRedisClientOptions(cfg *cache.Redis) (*redis.UniversalOptions, error) {
22+
var err error
23+
var clusterOptions *redis.ClusterOptions
24+
if cfg.URL != "" {
25+
clusterOptions, err = redis.ParseClusterURL(cfg.URL)
26+
if err != nil {
27+
return nil, err
28+
}
29+
}
30+
31+
opts := &redis.UniversalOptions{
32+
Addrs: cfg.Addrs,
33+
DB: cfg.DB,
34+
ClientName: cfg.ClientName,
35+
36+
Protocol: cfg.Protocol,
37+
Username: cfg.Username,
38+
Password: cfg.Password,
39+
40+
SentinelUsername: cfg.SentinelUsername,
41+
SentinelPassword: cfg.SentinelPassword,
42+
43+
MaxRetries: cfg.MaxRetries,
44+
MinRetryBackoff: cfg.MinRetryBackoff,
45+
MaxRetryBackoff: cfg.MaxRetryBackoff,
46+
47+
DialTimeout: cfg.DialTimeout,
48+
ReadTimeout: cfg.ReadTimeout,
49+
WriteTimeout: cfg.WriteTimeout,
50+
ContextTimeoutEnabled: cfg.ContextTimeoutEnabled,
51+
52+
PoolSize: cfg.PoolSize,
53+
PoolTimeout: cfg.PoolTimeout,
54+
MaxIdleConns: cfg.MaxIdleConns,
55+
MinIdleConns: cfg.MinIdleConns,
56+
MaxActiveConns: cfg.MaxActiveConns,
57+
ConnMaxIdleTime: cfg.ConnMaxIdleTime,
58+
ConnMaxLifetime: cfg.ConnMaxLifetime,
59+
60+
MaxRedirects: cfg.MaxRedirects,
61+
ReadOnly: cfg.ReadOnly,
62+
RouteByLatency: cfg.RouteByLatency,
63+
RouteRandomly: cfg.RouteRandomly,
64+
65+
MasterName: cfg.MasterName,
66+
DisableIndentity: cfg.DisableIndentity,
67+
IdentitySuffix: cfg.IdentitySuffix,
68+
}
69+
if clusterOptions != nil {
70+
opts.Addrs = clusterOptions.Addrs
71+
opts.ClientName = clusterOptions.ClientName
72+
73+
opts.Protocol = clusterOptions.Protocol
74+
opts.Username = clusterOptions.Username
75+
opts.Password = clusterOptions.Password
76+
77+
if clusterOptions.MaxRetries != 0 {
78+
opts.MaxRetries = clusterOptions.MaxRetries
79+
}
80+
if clusterOptions.MinRetryBackoff != 0 {
81+
opts.MinRetryBackoff = clusterOptions.MinRetryBackoff
82+
}
83+
if clusterOptions.MaxRetryBackoff != 0 {
84+
opts.MaxRetryBackoff = clusterOptions.MaxRetryBackoff
85+
}
86+
87+
if clusterOptions.DialTimeout != 0 {
88+
opts.DialTimeout = clusterOptions.DialTimeout
89+
}
90+
if clusterOptions.ReadTimeout != 0 {
91+
opts.ReadTimeout = clusterOptions.ReadTimeout
92+
}
93+
if clusterOptions.WriteTimeout != 0 {
94+
opts.WriteTimeout = clusterOptions.WriteTimeout
95+
}
96+
if clusterOptions.ContextTimeoutEnabled {
97+
opts.ContextTimeoutEnabled = clusterOptions.ContextTimeoutEnabled
98+
}
99+
100+
if clusterOptions.PoolSize != 0 {
101+
opts.PoolSize = clusterOptions.PoolSize
102+
}
103+
if clusterOptions.PoolTimeout != 0 {
104+
opts.PoolTimeout = clusterOptions.PoolTimeout
105+
}
106+
if clusterOptions.MaxIdleConns != 0 {
107+
opts.MaxIdleConns = clusterOptions.MaxIdleConns
108+
}
109+
if clusterOptions.MinIdleConns != 0 {
110+
opts.MinIdleConns = clusterOptions.MinIdleConns
111+
}
112+
if clusterOptions.MaxActiveConns != 0 {
113+
opts.MaxActiveConns = clusterOptions.MaxActiveConns
114+
}
115+
if clusterOptions.ConnMaxIdleTime != 0 {
116+
opts.ConnMaxIdleTime = clusterOptions.ConnMaxIdleTime
117+
}
118+
if clusterOptions.ConnMaxLifetime != 0 {
119+
opts.ConnMaxLifetime = clusterOptions.ConnMaxLifetime
120+
}
121+
122+
if clusterOptions.MaxRedirects != 0 {
123+
opts.MaxRedirects = clusterOptions.MaxRedirects
124+
}
125+
if clusterOptions.ReadOnly {
126+
opts.ReadOnly = clusterOptions.ReadOnly
127+
}
128+
if clusterOptions.RouteByLatency {
129+
opts.RouteByLatency = clusterOptions.RouteByLatency
130+
}
131+
if clusterOptions.RouteRandomly {
132+
opts.RouteRandomly = clusterOptions.RouteRandomly
133+
}
134+
}
135+
136+
if cfg.TLS.Enabled {
137+
var caCertPool *x509.CertPool
138+
139+
if cfg.TLS.Ca != "" {
140+
caCertPool = x509.NewCertPool()
141+
caCertPool.AppendCertsFromPEM([]byte(cfg.TLS.Ca))
142+
}
143+
144+
var certificates []tls.Certificate
145+
if cfg.TLS.Cert != "" && cfg.TLS.CertKey != "" {
146+
cert, err := tls.X509KeyPair([]byte(cfg.TLS.Cert), []byte(cfg.TLS.CertKey))
147+
if err != nil {
148+
return nil, err
149+
}
150+
certificates = []tls.Certificate{cert}
151+
}
152+
153+
opts.TLSConfig = &tls.Config{
154+
InsecureSkipVerify: cfg.TLS.InsecureSkipVerify,
155+
Certificates: certificates,
156+
RootCAs: caCertPool,
157+
}
158+
}
159+
160+
return opts, nil
161+
}

pkg/cache/redis/redis_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package redis
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/redis/go-redis/v9"
8+
"github.com/stretchr/testify/assert"
9+
10+
"github.com/scribd/go-sdk/pkg/cache"
11+
)
12+
13+
func TestNew(t *testing.T) {
14+
tests := []struct {
15+
name string
16+
cfg cache.Redis
17+
wantErr bool
18+
}{
19+
{
20+
name: "Config without URL set",
21+
cfg: cache.Redis{
22+
Addrs: []string{"localhost:6379"},
23+
},
24+
},
25+
{
26+
name: "Config with URL set",
27+
cfg: cache.Redis{
28+
URL: "redis://localhost:6379",
29+
},
30+
},
31+
{
32+
name: "Config with URL set to cluster URL",
33+
cfg: cache.Redis{
34+
URL: "redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791",
35+
},
36+
},
37+
{
38+
name: "Config with invalid URL",
39+
cfg: cache.Redis{
40+
URL: "localhost:6379",
41+
},
42+
wantErr: true,
43+
},
44+
}
45+
46+
for _, tt := range tests {
47+
t.Run(tt.name, func(t *testing.T) {
48+
_, err := New(&tt.cfg)
49+
if tt.wantErr {
50+
assert.Error(t, err)
51+
} else {
52+
assert.NoError(t, err)
53+
}
54+
})
55+
}
56+
}
57+
58+
func TestCfgToRedisClientOptions(t *testing.T) {
59+
tests := []struct {
60+
name string
61+
cfg cache.Redis
62+
check func(t *testing.T, opts *redis.UniversalOptions)
63+
wantErr bool
64+
}{
65+
{
66+
name: "Config without URL set",
67+
cfg: cache.Redis{
68+
Addrs: []string{"localhost:6379"},
69+
},
70+
check: func(t *testing.T, opts *redis.UniversalOptions) {
71+
assert.Equal(t, []string{"localhost:6379"}, opts.Addrs)
72+
},
73+
},
74+
{
75+
name: "Config with URL set",
76+
cfg: cache.Redis{
77+
URL: "redis://localhost:6379",
78+
},
79+
check: func(t *testing.T, opts *redis.UniversalOptions) {
80+
assert.Equal(t, []string{"localhost:6379"}, opts.Addrs)
81+
},
82+
},
83+
{
84+
name: "Config with TLS enabled",
85+
cfg: cache.Redis{
86+
URL: "rediss://localhost:6379",
87+
TLS: cache.TLS{
88+
Enabled: true,
89+
},
90+
},
91+
check: func(t *testing.T, opts *redis.UniversalOptions) {
92+
assert.NotNil(t, opts.TLSConfig)
93+
assert.False(t, opts.TLSConfig.InsecureSkipVerify)
94+
},
95+
},
96+
{
97+
name: "Config with URL set to cluster URL",
98+
cfg: cache.Redis{
99+
URL: "redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791",
100+
},
101+
check: func(t *testing.T, opts *redis.UniversalOptions) {
102+
assert.Equal(t, []string{"localhost:6789", "localhost:6790", "localhost:6791"}, opts.Addrs)
103+
assert.Equal(t, 3*time.Second, opts.DialTimeout)
104+
assert.Equal(t, 6*time.Second, opts.ReadTimeout)
105+
assert.Equal(t, "user", opts.Username)
106+
assert.Equal(t, "password", opts.Password)
107+
},
108+
},
109+
{
110+
name: "Config with invalid URL",
111+
cfg: cache.Redis{
112+
URL: "localhost:6379",
113+
},
114+
wantErr: true,
115+
},
116+
}
117+
118+
for _, tt := range tests {
119+
t.Run(tt.name, func(t *testing.T) {
120+
opts, err := cfgToRedisClientOptions(&tt.cfg)
121+
if tt.wantErr {
122+
assert.Error(t, err)
123+
} else {
124+
assert.NoError(t, err)
125+
126+
tt.check(t, opts)
127+
}
128+
})
129+
}
130+
}

pkg/instrumentation/redis.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package instrumentation
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/redis/go-redis/v9"
7+
redistrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/go-redis.v9"
8+
)
9+
10+
const (
11+
redisServiceNameSuffix = "cache-redis"
12+
)
13+
14+
func InstrumentRedis(client redis.UniversalClient, applicationName string) {
15+
serviceName := fmt.Sprintf("%s-%s", applicationName, redisServiceNameSuffix)
16+
17+
redistrace.WrapClient(client, redistrace.WithServiceName(serviceName))
18+
}

pkg/logger/redis.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package logger
2+
3+
import (
4+
"context"
5+
6+
"github.com/redis/go-redis/v9"
7+
8+
"github.com/scribd/go-sdk/pkg/instrumentation"
9+
)
10+
11+
type (
12+
RedisLogger struct {
13+
logger Logger
14+
}
15+
)
16+
17+
func NewRedisLogger(l Logger) *RedisLogger {
18+
return &RedisLogger{l}
19+
}
20+
21+
func (r *RedisLogger) Printf(ctx context.Context, format string, v ...interface{}) {
22+
logContext := instrumentation.TraceLogs(ctx)
23+
24+
r.logger.WithFields(Fields{
25+
"dd": Fields{
26+
"trace_id": logContext.TraceID,
27+
"span_id": logContext.SpanID,
28+
},
29+
}).Errorf(format, v...)
30+
}
31+
32+
func SetRedisLogger(logger Logger) {
33+
redis.SetLogger(NewRedisLogger(logger))
34+
}

0 commit comments

Comments
 (0)