Skip to content

Commit 0ef6d07

Browse files
ndyakovCopilothtemelski-redis
authored
feat: RESP3 notifications support & Hitless notifications handling [CAE-1088] & [CAE-1072] (#3418)
- Adds support for handling push notifications with RESP3. - Using this support adds handlers for hitless upgrades. --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: Hristo Temelski <[email protected]>
1 parent 2da6ca0 commit 0ef6d07

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+11666
-594
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ coverage.txt
99
**/coverage.txt
1010
.vscode
1111
tmp/*
12+
13+
# Hitless upgrade documentation (temporary)
14+
hitless/docs/

adapters.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9/internal/interfaces"
10+
"github.com/redis/go-redis/v9/push"
11+
)
12+
13+
// ErrInvalidCommand is returned when an invalid command is passed to ExecuteCommand.
14+
var ErrInvalidCommand = errors.New("invalid command type")
15+
16+
// ErrInvalidPool is returned when the pool type is not supported.
17+
var ErrInvalidPool = errors.New("invalid pool type")
18+
19+
// newClientAdapter creates a new client adapter for regular Redis clients.
20+
func newClientAdapter(client *baseClient) interfaces.ClientInterface {
21+
return &clientAdapter{client: client}
22+
}
23+
24+
// clientAdapter adapts a Redis client to implement interfaces.ClientInterface.
25+
type clientAdapter struct {
26+
client *baseClient
27+
}
28+
29+
// GetOptions returns the client options.
30+
func (ca *clientAdapter) GetOptions() interfaces.OptionsInterface {
31+
return &optionsAdapter{options: ca.client.opt}
32+
}
33+
34+
// GetPushProcessor returns the client's push notification processor.
35+
func (ca *clientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
36+
return &pushProcessorAdapter{processor: ca.client.pushProcessor}
37+
}
38+
39+
// optionsAdapter adapts Redis options to implement interfaces.OptionsInterface.
40+
type optionsAdapter struct {
41+
options *Options
42+
}
43+
44+
// GetReadTimeout returns the read timeout.
45+
func (oa *optionsAdapter) GetReadTimeout() time.Duration {
46+
return oa.options.ReadTimeout
47+
}
48+
49+
// GetWriteTimeout returns the write timeout.
50+
func (oa *optionsAdapter) GetWriteTimeout() time.Duration {
51+
return oa.options.WriteTimeout
52+
}
53+
54+
// GetNetwork returns the network type.
55+
func (oa *optionsAdapter) GetNetwork() string {
56+
return oa.options.Network
57+
}
58+
59+
// GetAddr returns the connection address.
60+
func (oa *optionsAdapter) GetAddr() string {
61+
return oa.options.Addr
62+
}
63+
64+
// IsTLSEnabled returns true if TLS is enabled.
65+
func (oa *optionsAdapter) IsTLSEnabled() bool {
66+
return oa.options.TLSConfig != nil
67+
}
68+
69+
// GetProtocol returns the protocol version.
70+
func (oa *optionsAdapter) GetProtocol() int {
71+
return oa.options.Protocol
72+
}
73+
74+
// GetPoolSize returns the connection pool size.
75+
func (oa *optionsAdapter) GetPoolSize() int {
76+
return oa.options.PoolSize
77+
}
78+
79+
// NewDialer returns a new dialer function for the connection.
80+
func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
81+
baseDialer := oa.options.NewDialer()
82+
return func(ctx context.Context) (net.Conn, error) {
83+
// Extract network and address from the options
84+
network := oa.options.Network
85+
addr := oa.options.Addr
86+
return baseDialer(ctx, network, addr)
87+
}
88+
}
89+
90+
// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
91+
type pushProcessorAdapter struct {
92+
processor push.NotificationProcessor
93+
}
94+
95+
// RegisterHandler registers a handler for a specific push notification name.
96+
func (ppa *pushProcessorAdapter) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
97+
if pushHandler, ok := handler.(push.NotificationHandler); ok {
98+
return ppa.processor.RegisterHandler(pushNotificationName, pushHandler, protected)
99+
}
100+
return errors.New("handler must implement push.NotificationHandler")
101+
}
102+
103+
// UnregisterHandler removes a handler for a specific push notification name.
104+
func (ppa *pushProcessorAdapter) UnregisterHandler(pushNotificationName string) error {
105+
return ppa.processor.UnregisterHandler(pushNotificationName)
106+
}
107+
108+
// GetHandler returns the handler for a specific push notification name.
109+
func (ppa *pushProcessorAdapter) GetHandler(pushNotificationName string) interface{} {
110+
return ppa.processor.GetHandler(pushNotificationName)
111+
}

0 commit comments

Comments
 (0)