Skip to content

Commit 38a102e

Browse files
committed
wip
1 parent 422779b commit 38a102e

35 files changed

+6853
-378
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ coverage.txt
99
**/coverage.txt
1010
.vscode
1111
tmp/*
12+
13+
# Hitless upgrade documentation (temporary)
14+
hitless/docs/

adapters.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9/internal/interfaces"
10+
"github.com/redis/go-redis/v9/internal/pool"
11+
"github.com/redis/go-redis/v9/push"
12+
)
13+
14+
// ErrInvalidCommand is returned when an invalid command is passed to ExecuteCommand.
15+
var ErrInvalidCommand = errors.New("invalid command type")
16+
17+
// ErrInvalidPool is returned when the pool type is not supported.
18+
var ErrInvalidPool = errors.New("invalid pool type")
19+
20+
// NewClientAdapter creates a new client adapter for regular Redis clients.
21+
func NewClientAdapter(client *baseClient) interfaces.ClientInterface {
22+
return &clientAdapter{client: client}
23+
}
24+
25+
// clientAdapter adapts a Redis client to implement interfaces.ClientInterface.
26+
type clientAdapter struct {
27+
client *baseClient
28+
}
29+
30+
// GetOptions returns the client options.
31+
func (ca *clientAdapter) GetOptions() interfaces.OptionsInterface {
32+
return &optionsAdapter{options: ca.client.opt}
33+
}
34+
35+
// GetPushProcessor returns the client's push notification processor.
36+
func (ca *clientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
37+
return &pushProcessorAdapter{processor: ca.client.pushProcessor}
38+
}
39+
40+
// optionsAdapter adapts Redis options to implement interfaces.OptionsInterface.
41+
type optionsAdapter struct {
42+
options *Options
43+
}
44+
45+
// GetReadTimeout returns the read timeout.
46+
func (oa *optionsAdapter) GetReadTimeout() time.Duration {
47+
return oa.options.ReadTimeout
48+
}
49+
50+
// GetWriteTimeout returns the write timeout.
51+
func (oa *optionsAdapter) GetWriteTimeout() time.Duration {
52+
return oa.options.WriteTimeout
53+
}
54+
55+
// GetAddr returns the connection address.
56+
func (oa *optionsAdapter) GetAddr() string {
57+
return oa.options.Addr
58+
}
59+
60+
// IsTLSEnabled returns true if TLS is enabled.
61+
func (oa *optionsAdapter) IsTLSEnabled() bool {
62+
return oa.options.TLSConfig != nil
63+
}
64+
65+
// GetProtocol returns the protocol version.
66+
func (oa *optionsAdapter) GetProtocol() int {
67+
return oa.options.Protocol
68+
}
69+
70+
// GetPoolSize returns the connection pool size.
71+
func (oa *optionsAdapter) GetPoolSize() int {
72+
return oa.options.PoolSize
73+
}
74+
75+
// NewDialer returns a new dialer function for the connection.
76+
func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
77+
baseDialer := oa.options.NewDialer()
78+
return func(ctx context.Context) (net.Conn, error) {
79+
// Extract network and address from the options
80+
network := "tcp"
81+
addr := oa.options.Addr
82+
return baseDialer(ctx, network, addr)
83+
}
84+
}
85+
86+
// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout
87+
type connectionAdapter struct {
88+
conn *pool.Conn
89+
}
90+
91+
// Close closes the connection.
92+
func (ca *connectionAdapter) Close() error {
93+
return ca.conn.Close()
94+
}
95+
96+
// IsUsable returns true if the connection is safe to use for new commands.
97+
func (ca *connectionAdapter) IsUsable() bool {
98+
return ca.conn.IsUsable()
99+
}
100+
101+
// GetPoolConnection returns the underlying pool connection.
102+
func (ca *connectionAdapter) GetPoolConnection() *pool.Conn {
103+
return ca.conn
104+
}
105+
106+
// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
107+
// These timeouts remain active until explicitly cleared.
108+
func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
109+
ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout)
110+
}
111+
112+
// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
113+
// After the deadline, timeouts automatically revert to normal values.
114+
func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) {
115+
ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline)
116+
}
117+
118+
// ClearRelaxedTimeout clears relaxed timeouts for this connection.
119+
func (ca *connectionAdapter) ClearRelaxedTimeout() {
120+
ca.conn.ClearRelaxedTimeout()
121+
}
122+
123+
// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
124+
type pushProcessorAdapter struct {
125+
processor push.NotificationProcessor
126+
}
127+
128+
// RegisterHandler registers a handler for a specific push notification name.
129+
func (ppa *pushProcessorAdapter) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
130+
if pushHandler, ok := handler.(push.NotificationHandler); ok {
131+
return ppa.processor.RegisterHandler(pushNotificationName, pushHandler, protected)
132+
}
133+
return errors.New("handler must implement push.NotificationHandler")
134+
}
135+
136+
// UnregisterHandler removes a handler for a specific push notification name.
137+
func (ppa *pushProcessorAdapter) UnregisterHandler(pushNotificationName string) error {
138+
return ppa.processor.UnregisterHandler(pushNotificationName)
139+
}
140+
141+
// GetHandler returns the handler for a specific push notification name.
142+
func (ppa *pushProcessorAdapter) GetHandler(pushNotificationName string) interface{} {
143+
return ppa.processor.GetHandler(pushNotificationName)
144+
}

0 commit comments

Comments
 (0)