Skip to content

[WIP] hitless #3447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 33 commits into
base: ndyakov/CAE-1088-resp3-notification-handlers
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
47bb58d
wip
ndyakov Aug 12, 2025
a1461e9
refactor manager
ndyakov Aug 14, 2025
478875d
refactor processor
ndyakov Aug 14, 2025
49e7df6
wip
ndyakov Aug 14, 2025
e3b7052
wip
ndyakov Aug 15, 2025
2f9f85a
wip
ndyakov Aug 15, 2025
a8ef7e9
wip
ndyakov Aug 15, 2025
6fb87ca
wip
ndyakov Aug 15, 2025
01844e6
wip
ndyakov Aug 15, 2025
e2d6be8
fix linter
ndyakov Aug 15, 2025
49a2e43
deadlock?
ndyakov Aug 15, 2025
3c40e22
deadlock?
ndyakov Aug 15, 2025
02c2447
wip
ndyakov Aug 15, 2025
30e5c82
wip review
ndyakov Aug 15, 2025
76bb698
still hunting this deadlock
ndyakov Aug 15, 2025
69569ea
still hunting this deadlock
ndyakov Aug 15, 2025
668f578
still hunting this deadlock
ndyakov Aug 15, 2025
c0d63fa
remove in separate goroutine
ndyakov Aug 15, 2025
b80f6d5
one more additional check for shutdown
ndyakov Aug 15, 2025
34edec2
what if those conns are tracked and closed on Close?
ndyakov Aug 15, 2025
a4758c2
add debug and make sure workers are not started after shutdown
ndyakov Aug 15, 2025
a4090d4
on demand workers
ndyakov Aug 15, 2025
0091c74
pubsub pool still outperforms track/untrack
ndyakov Aug 15, 2025
d3c3eb7
pubsub fixes
ndyakov Aug 16, 2025
ca73e4b
wip
ndyakov Aug 16, 2025
06ea120
wip
ndyakov Aug 16, 2025
374eb2d
wip
ndyakov Aug 16, 2025
08d746c
safe close of pools
ndyakov Aug 16, 2025
b35380a
safe creation on closed pool
ndyakov Aug 16, 2025
49f3f0d
fix error
ndyakov Aug 16, 2025
fedff39
potential data race?
ndyakov Aug 16, 2025
a938edb
clone the config
ndyakov Aug 16, 2025
8e73688
update example tests
ndyakov Aug 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ coverage.txt
**/coverage.txt
.vscode
tmp/*

# Hitless upgrade documentation (temporary)
hitless/docs/
149 changes: 149 additions & 0 deletions adapters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package redis

import (
"context"
"errors"
"net"
"time"

"github.com/redis/go-redis/v9/internal/interfaces"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/push"
)

// ErrInvalidCommand is returned when an invalid command is passed to ExecuteCommand.
var ErrInvalidCommand = errors.New("invalid command type")

// ErrInvalidPool is returned when the pool type is not supported.
var ErrInvalidPool = errors.New("invalid pool type")

// newClientAdapter creates a new client adapter for regular Redis clients.
func newClientAdapter(client *baseClient) interfaces.ClientInterface {
return &clientAdapter{client: client}
}

// clientAdapter adapts a Redis client to implement interfaces.ClientInterface.
type clientAdapter struct {
client *baseClient
}

// GetOptions returns the client options.
func (ca *clientAdapter) GetOptions() interfaces.OptionsInterface {
return &optionsAdapter{options: ca.client.opt}
}

// GetPushProcessor returns the client's push notification processor.
func (ca *clientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
return &pushProcessorAdapter{processor: ca.client.pushProcessor}
}

// optionsAdapter adapts Redis options to implement interfaces.OptionsInterface.
type optionsAdapter struct {
options *Options
}

// GetReadTimeout returns the read timeout.
func (oa *optionsAdapter) GetReadTimeout() time.Duration {
return oa.options.ReadTimeout
}

// GetWriteTimeout returns the write timeout.
func (oa *optionsAdapter) GetWriteTimeout() time.Duration {
return oa.options.WriteTimeout
}

// GetNetwork returns the network type.
func (oa *optionsAdapter) GetNetwork() string {
return oa.options.Network
}

// GetAddr returns the connection address.
func (oa *optionsAdapter) GetAddr() string {
return oa.options.Addr
}

// IsTLSEnabled returns true if TLS is enabled.
func (oa *optionsAdapter) IsTLSEnabled() bool {
return oa.options.TLSConfig != nil
}

// GetProtocol returns the protocol version.
func (oa *optionsAdapter) GetProtocol() int {
return oa.options.Protocol
}

// GetPoolSize returns the connection pool size.
func (oa *optionsAdapter) GetPoolSize() int {
return oa.options.PoolSize
}

// NewDialer returns a new dialer function for the connection.
func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
baseDialer := oa.options.NewDialer()
return func(ctx context.Context) (net.Conn, error) {
// Extract network and address from the options
network := oa.options.Network
addr := oa.options.Addr
return baseDialer(ctx, network, addr)
}
}

// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout
type connectionAdapter struct {
conn *pool.Conn
}

// Close closes the connection.
func (ca *connectionAdapter) Close() error {
return ca.conn.Close()
}

// IsUsable returns true if the connection is safe to use for new commands.
func (ca *connectionAdapter) IsUsable() bool {
return ca.conn.IsUsable()
}

// GetPoolConnection returns the underlying pool connection.
func (ca *connectionAdapter) GetPoolConnection() *pool.Conn {
return ca.conn
}

// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
// These timeouts remain active until explicitly cleared.
func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout)
}

// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
// After the deadline, timeouts automatically revert to normal values.
func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) {
ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline)
}

// ClearRelaxedTimeout clears relaxed timeouts for this connection.
func (ca *connectionAdapter) ClearRelaxedTimeout() {
ca.conn.ClearRelaxedTimeout()
}

// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
type pushProcessorAdapter struct {
processor push.NotificationProcessor
}

// RegisterHandler registers a handler for a specific push notification name.
func (ppa *pushProcessorAdapter) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
if pushHandler, ok := handler.(push.NotificationHandler); ok {
return ppa.processor.RegisterHandler(pushNotificationName, pushHandler, protected)
}
return errors.New("handler must implement push.NotificationHandler")
}

// UnregisterHandler removes a handler for a specific push notification name.
func (ppa *pushProcessorAdapter) UnregisterHandler(pushNotificationName string) error {
return ppa.processor.UnregisterHandler(pushNotificationName)
}

// GetHandler returns the handler for a specific push notification name.
func (ppa *pushProcessorAdapter) GetHandler(pushNotificationName string) interface{} {
return ppa.processor.GetHandler(pushNotificationName)
}
Loading
Loading