Skip to content

Commit 4e6ec29

Browse files
authored
Merge branch 'master' into master
2 parents 3dc3051 + 3c85d09 commit 4e6ec29

File tree

14 files changed

+1015
-91
lines changed

14 files changed

+1015
-91
lines changed

.github/actions/run-tests/action.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ runs:
2525
2626
# Mapping of redis version to redis testing containers
2727
declare -A redis_version_mapping=(
28-
["8.2.x"]="8.2-M01-pre"
28+
["8.2.x"]="8.2-rc2-pre"
2929
["8.0.x"]="8.0.2"
3030
["7.4.x"]="rs-7.4.0-v5"
3131
["7.2.x"]="rs-7.2.0-v17"

.github/workflows/build.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ name: Go
22

33
on:
44
push:
5-
branches: [master, v9, v9.7, v9.8]
5+
branches: [master, v9, v9.7, v9.8, 'ndyakov/*', 'ofekshenawa/*', 'htemelski-redis/*', 'ce/*']
66
pull_request:
7-
branches: [master, v9, v9.7, v9.8]
7+
branches: [master, v9, v9.7, v9.8, 'ndyakov/*', 'ofekshenawa/*', 'htemelski-redis/*', 'ce/*']
88

99
permissions:
1010
contents: read
@@ -44,7 +44,7 @@ jobs:
4444
4545
# Mapping of redis version to redis testing containers
4646
declare -A redis_version_mapping=(
47-
["8.2.x"]="8.2-M01-pre"
47+
["8.2.x"]="8.2-rc2-pre"
4848
["8.0.x"]="8.0.2"
4949
["7.4.x"]="rs-7.4.0-v5"
5050
)

commands_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6169,6 +6169,34 @@ var _ = Describe("Commands", func() {
61696169
Expect(n).To(Equal(int64(3)))
61706170
})
61716171

6172+
It("should XTrimMaxLenMode", func() {
6173+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6174+
n, err := client.XTrimMaxLenMode(ctx, "stream", 0, "KEEPREF").Result()
6175+
Expect(err).NotTo(HaveOccurred())
6176+
Expect(n).To(BeNumerically(">=", 0))
6177+
})
6178+
6179+
It("should XTrimMaxLenApproxMode", func() {
6180+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6181+
n, err := client.XTrimMaxLenApproxMode(ctx, "stream", 0, 0, "KEEPREF").Result()
6182+
Expect(err).NotTo(HaveOccurred())
6183+
Expect(n).To(BeNumerically(">=", 0))
6184+
})
6185+
6186+
It("should XTrimMinIDMode", func() {
6187+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6188+
n, err := client.XTrimMinIDMode(ctx, "stream", "4-0", "KEEPREF").Result()
6189+
Expect(err).NotTo(HaveOccurred())
6190+
Expect(n).To(BeNumerically(">=", 0))
6191+
})
6192+
6193+
It("should XTrimMinIDApproxMode", func() {
6194+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6195+
n, err := client.XTrimMinIDApproxMode(ctx, "stream", "4-0", 0, "KEEPREF").Result()
6196+
Expect(err).NotTo(HaveOccurred())
6197+
Expect(n).To(BeNumerically(">=", 0))
6198+
})
6199+
61726200
It("should XAdd", func() {
61736201
id, err := client.XAdd(ctx, &redis.XAddArgs{
61746202
Stream: "stream",
@@ -6222,6 +6250,37 @@ var _ = Describe("Commands", func() {
62226250
Expect(n).To(Equal(int64(3)))
62236251
})
62246252

6253+
It("should XAckDel", func() {
6254+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6255+
// First, create a consumer group
6256+
err := client.XGroupCreate(ctx, "stream", "testgroup", "0").Err()
6257+
Expect(err).NotTo(HaveOccurred())
6258+
6259+
// Read messages to create pending entries
6260+
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
6261+
Group: "testgroup",
6262+
Consumer: "testconsumer",
6263+
Streams: []string{"stream", ">"},
6264+
}).Result()
6265+
Expect(err).NotTo(HaveOccurred())
6266+
6267+
// Test XAckDel with KEEPREF mode
6268+
n, err := client.XAckDel(ctx, "stream", "testgroup", "KEEPREF", "1-0", "2-0").Result()
6269+
Expect(err).NotTo(HaveOccurred())
6270+
Expect(n).To(HaveLen(2))
6271+
6272+
// Clean up
6273+
client.XGroupDestroy(ctx, "stream", "testgroup")
6274+
})
6275+
6276+
It("should XDelEx", func() {
6277+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6278+
// Test XDelEx with KEEPREF mode
6279+
n, err := client.XDelEx(ctx, "stream", "KEEPREF", "1-0", "2-0").Result()
6280+
Expect(err).NotTo(HaveOccurred())
6281+
Expect(n).To(HaveLen(2))
6282+
})
6283+
62256284
It("should XLen", func() {
62266285
n, err := client.XLen(ctx, "stream").Result()
62276286
Expect(err).NotTo(HaveOccurred())

doctests/geo_index_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,11 @@ func ExampleClient_geoindex() {
199199
// OK
200200
// OK
201201
// OK
202-
// {1 [{product:46885 <nil> <nil> <nil> map[$:{"city":"Denver","description":"Navy Blue Slippers","location":"-104.991531, 39.742043","price":45.99}]}]}
202+
// {1 [{product:46885 <nil> <nil> <nil> map[$:{"city":"Denver","description":"Navy Blue Slippers","location":"-104.991531, 39.742043","price":45.99}] <nil>}]}
203203
// OK
204204
// OK
205205
// OK
206206
// OK
207207
// OK
208-
// {1 [{shape:4 <nil> <nil> <nil> map[$:[{"geom":"POINT (2 2)","name":"Purple Point"}]]}]}
208+
// {1 [{shape:4 <nil> <nil> <nil> map[$:[{"geom":"POINT (2 2)","name":"Purple Point"}]] <nil>}]}
209209
}

doctests/home_json_example_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func ExampleClient_search_json() {
219219
// STEP_END
220220

221221
// Output:
222-
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"[email protected]","name":"Paul Zamir"}]}]}
222+
// {1 [{user:3 <nil> <nil> <nil> map[$:{"age":35,"city":"Tel Aviv","email":"[email protected]","name":"Paul Zamir"}] <nil>}]}
223223
// London
224224
// Tel Aviv
225225
// 0
@@ -329,5 +329,5 @@ func ExampleClient_search_hash() {
329329
// STEP_END
330330

331331
// Output:
332-
// {1 [{huser:3 <nil> <nil> <nil> map[age:35 city:Tel Aviv email:[email protected] name:Paul Zamir]}]}
332+
// {1 [{huser:3 <nil> <nil> <nil> map[age:35 city:Tel Aviv email:[email protected] name:Paul Zamir] <nil>}]}
333333
}

doctests/search_quickstart_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,6 @@ func ExampleClient_search_qs() {
257257

258258
// Output:
259259
// Documents found: 10
260-
// {1 [{bicycle:0 <nil> <nil> <nil> map[$:{"brand":"Velorim","condition":"new","description":"Small and powerful, the Jigger is the best ride for the smallest of tikes! This is the tiniest kids’ pedal bike on the market available without a coaster brake, the Jigger is the vehicle of choice for the rare tenacious little rider raring to go.","model":"Jigger","price":270}]}]}
261-
// {1 [{bicycle:4 <nil> <nil> <nil> map[$:{"brand":"Noka Bikes","condition":"used","description":"Whether you want to try your hand at XC racing or are looking for a lively trail bike that's just as inspiring on the climbs as it is over rougher ground, the Wilder is one heck of a bike built specifically for short women. Both the frames and components have been tweaked to include a women’s saddle, different bars and unique colourway.","model":"Kahuna","price":3200}]}]}
260+
// {1 [{bicycle:0 <nil> <nil> <nil> map[$:{"brand":"Velorim","condition":"new","description":"Small and powerful, the Jigger is the best ride for the smallest of tikes! This is the tiniest kids’ pedal bike on the market available without a coaster brake, the Jigger is the vehicle of choice for the rare tenacious little rider raring to go.","model":"Jigger","price":270}] <nil>}]}
261+
// {1 [{bicycle:4 <nil> <nil> <nil> map[$:{"brand":"Noka Bikes","condition":"used","description":"Whether you want to try your hand at XC racing or are looking for a lively trail bike that's just as inspiring on the climbs as it is over rougher ground, the Wilder is one heck of a bike built specifically for short women. Both the frames and components have been tweaked to include a women’s saddle, different bars and unique colourway.","model":"Kahuna","price":3200}] <nil>}]}
262262
}

extra/redisotel/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type config struct {
2828
meter metric.Meter
2929

3030
poolName string
31+
32+
closeChan chan struct{}
3133
}
3234

3335
type baseOption interface {
@@ -145,3 +147,9 @@ func WithMeterProvider(mp metric.MeterProvider) MetricsOption {
145147
conf.mp = mp
146148
})
147149
}
150+
151+
func WithCloseChan(closeChan chan struct{}) MetricsOption {
152+
return metricsOption(func(conf *config) {
153+
conf.closeChan = closeChan
154+
})
155+
}

extra/redisotel/metrics.go

Lines changed: 73 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net"
7+
"sync"
78
"time"
89

910
"go.opentelemetry.io/otel"
@@ -13,6 +14,12 @@ import (
1314
"github.com/redis/go-redis/v9"
1415
)
1516

17+
type metricsState struct {
18+
registrations []metric.Registration
19+
closed bool
20+
mutex sync.Mutex
21+
}
22+
1623
// InstrumentMetrics starts reporting OpenTelemetry Metrics.
1724
//
1825
// Based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/database-metrics.md
@@ -30,49 +37,42 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
3037
)
3138
}
3239

33-
switch rdb := rdb.(type) {
34-
case *redis.Client:
35-
if conf.poolName == "" {
36-
opt := rdb.Options()
37-
conf.poolName = opt.Addr
40+
var state *metricsState
41+
if conf.closeChan != nil {
42+
state = &metricsState{
43+
registrations: make([]metric.Registration, 0),
44+
closed: false,
45+
mutex: sync.Mutex{},
3846
}
39-
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
4047

41-
if err := reportPoolStats(rdb, conf); err != nil {
42-
return err
43-
}
44-
if err := addMetricsHook(rdb, conf); err != nil {
45-
return err
46-
}
47-
return nil
48-
case *redis.ClusterClient:
49-
rdb.OnNewNode(func(rdb *redis.Client) {
50-
if conf.poolName == "" {
51-
opt := rdb.Options()
52-
conf.poolName = opt.Addr
53-
}
54-
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
48+
go func() {
49+
<-conf.closeChan
5550

56-
if err := reportPoolStats(rdb, conf); err != nil {
57-
otel.Handle(err)
51+
state.mutex.Lock()
52+
state.closed = true
53+
54+
for _, registration := range state.registrations {
55+
if err := registration.Unregister(); err != nil {
56+
otel.Handle(err)
57+
}
5858
}
59-
if err := addMetricsHook(rdb, conf); err != nil {
59+
state.mutex.Unlock()
60+
}()
61+
}
62+
63+
switch rdb := rdb.(type) {
64+
case *redis.Client:
65+
return registerClient(rdb, conf, state)
66+
case *redis.ClusterClient:
67+
rdb.OnNewNode(func(rdb *redis.Client) {
68+
if err := registerClient(rdb, conf, state); err != nil {
6069
otel.Handle(err)
6170
}
6271
})
6372
return nil
6473
case *redis.Ring:
6574
rdb.OnNewNode(func(rdb *redis.Client) {
66-
if conf.poolName == "" {
67-
opt := rdb.Options()
68-
conf.poolName = opt.Addr
69-
}
70-
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
71-
72-
if err := reportPoolStats(rdb, conf); err != nil {
73-
otel.Handle(err)
74-
}
75-
if err := addMetricsHook(rdb, conf); err != nil {
75+
if err := registerClient(rdb, conf, state); err != nil {
7676
otel.Handle(err)
7777
}
7878
})
@@ -82,7 +82,38 @@ func InstrumentMetrics(rdb redis.UniversalClient, opts ...MetricsOption) error {
8282
}
8383
}
8484

85-
func reportPoolStats(rdb *redis.Client, conf *config) error {
85+
func registerClient(rdb *redis.Client, conf *config, state *metricsState) error {
86+
if state != nil {
87+
state.mutex.Lock()
88+
defer state.mutex.Unlock()
89+
90+
if state.closed {
91+
return nil
92+
}
93+
}
94+
95+
if conf.poolName == "" {
96+
opt := rdb.Options()
97+
conf.poolName = opt.Addr
98+
}
99+
conf.attrs = append(conf.attrs, attribute.String("pool.name", conf.poolName))
100+
101+
registration, err := reportPoolStats(rdb, conf)
102+
if err != nil {
103+
return err
104+
}
105+
106+
if state != nil {
107+
state.registrations = append(state.registrations, registration)
108+
}
109+
110+
if err := addMetricsHook(rdb, conf); err != nil {
111+
return err
112+
}
113+
return nil
114+
}
115+
116+
func reportPoolStats(rdb *redis.Client, conf *config) (metric.Registration, error) {
86117
labels := conf.attrs
87118
idleAttrs := append(labels, attribute.String("state", "idle"))
88119
usedAttrs := append(labels, attribute.String("state", "used"))
@@ -92,59 +123,59 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
92123
metric.WithDescription("The maximum number of idle open connections allowed"),
93124
)
94125
if err != nil {
95-
return err
126+
return nil, err
96127
}
97128

98129
idleMin, err := conf.meter.Int64ObservableUpDownCounter(
99130
"db.client.connections.idle.min",
100131
metric.WithDescription("The minimum number of idle open connections allowed"),
101132
)
102133
if err != nil {
103-
return err
134+
return nil, err
104135
}
105136

106137
connsMax, err := conf.meter.Int64ObservableUpDownCounter(
107138
"db.client.connections.max",
108139
metric.WithDescription("The maximum number of open connections allowed"),
109140
)
110141
if err != nil {
111-
return err
142+
return nil, err
112143
}
113144

114145
usage, err := conf.meter.Int64ObservableUpDownCounter(
115146
"db.client.connections.usage",
116147
metric.WithDescription("The number of connections that are currently in state described by the state attribute"),
117148
)
118149
if err != nil {
119-
return err
150+
return nil, err
120151
}
121152

122153
timeouts, err := conf.meter.Int64ObservableUpDownCounter(
123154
"db.client.connections.timeouts",
124155
metric.WithDescription("The number of connection timeouts that have occurred trying to obtain a connection from the pool"),
125156
)
126157
if err != nil {
127-
return err
158+
return nil, err
128159
}
129160

130161
hits, err := conf.meter.Int64ObservableUpDownCounter(
131162
"db.client.connections.hits",
132163
metric.WithDescription("The number of times free connection was found in the pool"),
133164
)
134165
if err != nil {
135-
return err
166+
return nil, err
136167
}
137168

138169
misses, err := conf.meter.Int64ObservableUpDownCounter(
139170
"db.client.connections.misses",
140171
metric.WithDescription("The number of times free connection was not found in the pool"),
141172
)
142173
if err != nil {
143-
return err
174+
return nil, err
144175
}
145176

146177
redisConf := rdb.Options()
147-
_, err = conf.meter.RegisterCallback(
178+
return conf.meter.RegisterCallback(
148179
func(ctx context.Context, o metric.Observer) error {
149180
stats := rdb.PoolStats()
150181

@@ -168,8 +199,6 @@ func reportPoolStats(rdb *redis.Client, conf *config) error {
168199
hits,
169200
misses,
170201
)
171-
172-
return err
173202
}
174203

175204
func addMetricsHook(rdb *redis.Client, conf *config) error {

0 commit comments

Comments
 (0)