Skip to content

Commit e3b7052

Browse files
committed
wip
1 parent 49e7df6 commit e3b7052

File tree

8 files changed

+204
-696
lines changed

8 files changed

+204
-696
lines changed

hitless/README.md

Lines changed: 143 additions & 495 deletions
Large diffs are not rendered by default.

hitless/example_config_usage.go

Lines changed: 0 additions & 72 deletions
This file was deleted.

hitless/example_hooks.go

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package hitless
22

33
import (
44
"context"
5-
"fmt"
65
"strings"
76
"time"
87

@@ -202,62 +201,3 @@ func (vh *ValidationHook) PreHook(ctx context.Context, notificationType string,
202201
func (vh *ValidationHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
203202
// No post-processing needed
204203
}
205-
206-
// ExampleUsage demonstrates how to use the hooks and notification types with HitlessManager.
207-
func ExampleUsage() {
208-
// This is just an example - in real usage, you'd have actual client and config
209-
fmt.Println("Example of using hooks and notification types with HitlessManager:")
210-
fmt.Println()
211-
212-
fmt.Println("1. Using notification type constants:")
213-
fmt.Printf(" MOVING: %s\n", NotificationMoving)
214-
fmt.Printf(" MIGRATING: %s\n", NotificationMigrating)
215-
fmt.Printf(" MIGRATED: %s\n", NotificationMigrated)
216-
fmt.Printf(" FAILING_OVER: %s\n", NotificationFailingOver)
217-
fmt.Printf(" FAILED_OVER: %s\n", NotificationFailedOver)
218-
fmt.Println()
219-
220-
fmt.Println("2. Using notification type sets:")
221-
fmt.Println(" // Register handlers for all notification types")
222-
fmt.Println(" manager.RegisterSelectiveHandlers(AllNotificationTypes())")
223-
fmt.Println()
224-
fmt.Println(" // Register handlers only for MOVING notifications")
225-
fmt.Println(" manager.RegisterSelectiveHandlers(MovingOnlyNotifications())")
226-
fmt.Println()
227-
fmt.Println(" // Register handlers only for migration-related notifications")
228-
fmt.Println(" manager.RegisterSelectiveHandlers(MigrationNotifications())")
229-
fmt.Println()
230-
fmt.Println(" // Register handlers only for failover-related notifications")
231-
fmt.Println(" manager.RegisterSelectiveHandlers(FailoverNotifications())")
232-
fmt.Println()
233-
fmt.Println(" // Register handlers for custom set of notifications")
234-
fmt.Println(" customTypes := NewNotificationTypeSet(NotificationMoving, NotificationMigrated)")
235-
fmt.Println(" manager.RegisterSelectiveHandlers(customTypes)")
236-
fmt.Println()
237-
238-
fmt.Println("3. Create hooks:")
239-
fmt.Println(" metricsHook := NewMetricsHook()")
240-
fmt.Println(" rewriteHook := NewEndpointRewriteHook(map[string]string{")
241-
fmt.Println(" \"old-redis:6379\": \"new-redis:6379\",")
242-
fmt.Println(" })")
243-
fmt.Println(" throttleHook := NewThrottleHook(10) // 10 notifications per second")
244-
fmt.Println(" validationHook := NewValidationHook(true) // strict mode")
245-
fmt.Println()
246-
247-
fmt.Println("4. Add hooks to manager:")
248-
fmt.Println(" manager.AddHook(validationHook) // Validate first")
249-
fmt.Println(" manager.AddHook(throttleHook) // Then throttle")
250-
fmt.Println(" manager.AddHook(rewriteHook) // Then rewrite")
251-
fmt.Println(" manager.AddHook(metricsHook) // Finally collect metrics")
252-
fmt.Println()
253-
254-
fmt.Println("5. Hooks will be called in order for each notification:")
255-
fmt.Println(" - ValidationHook validates the notification")
256-
fmt.Println(" - ThrottleHook may skip processing if rate limit exceeded")
257-
fmt.Println(" - EndpointRewriteHook may modify the endpoint")
258-
fmt.Println(" - MetricsHook collects processing statistics")
259-
fmt.Println()
260-
261-
fmt.Println("6. Remove hooks when no longer needed:")
262-
fmt.Println(" manager.RemoveHook(throttleHook)")
263-
}

hitless/hitless_manager.go

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -271,47 +271,6 @@ func (hm *HitlessManager) processPostHooks(ctx context.Context, notificationType
271271
}
272272
}
273273

274-
// LoggingHook is an example hook implementation that logs all notifications.
275-
type LoggingHook struct {
276-
LogLevel int
277-
}
278-
279-
// PreHook logs the notification before processing and allows modification.
280-
func (lh *LoggingHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
281-
if lh.LogLevel >= 2 { // Info level
282-
internal.Logger.Printf(ctx, "hitless: processing %s notification: %v", notificationType, notification)
283-
}
284-
return notification, true // Continue processing with unmodified notification
285-
}
286-
287-
// PostHook logs the result after processing.
288-
func (lh *LoggingHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
289-
if result != nil && lh.LogLevel >= 1 { // Warning level
290-
internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v", notificationType, result)
291-
} else if lh.LogLevel >= 3 { // Debug level
292-
internal.Logger.Printf(ctx, "hitless: %s notification processed successfully", notificationType)
293-
}
294-
}
295-
296-
// FilterHook is an example hook that can filter out certain notifications.
297-
type FilterHook struct {
298-
BlockedTypes map[string]bool
299-
}
300-
301-
// PreHook filters notifications based on type.
302-
func (fh *FilterHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
303-
if fh.BlockedTypes[notificationType] {
304-
internal.Logger.Printf(ctx, "hitless: filtering out %s notification", notificationType)
305-
return notification, false // Skip processing
306-
}
307-
return notification, true
308-
}
309-
310-
// PostHook does nothing for filter hook.
311-
func (fh *FilterHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
312-
// No post-processing needed for filter hook
313-
}
314-
315274
// createPoolHook creates a pool hook with this manager already set.
316275
func (hm *HitlessManager) createPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error)) *PoolHook {
317276
if hm.poolHooksRef != nil {

hitless/hooks.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package hitless
2+
3+
import (
4+
"context"
5+
6+
"github.com/redis/go-redis/v9/internal"
7+
)
8+
9+
// LoggingHook is an example hook implementation that logs all notifications.
10+
type LoggingHook struct {
11+
LogLevel int
12+
}
13+
14+
// PreHook logs the notification before processing and allows modification.
15+
func (lh *LoggingHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
16+
if lh.LogLevel >= 2 { // Info level
17+
internal.Logger.Printf(ctx, "hitless: processing %s notification: %v", notificationType, notification)
18+
}
19+
return notification, true // Continue processing with unmodified notification
20+
}
21+
22+
// PostHook logs the result after processing.
23+
func (lh *LoggingHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
24+
if result != nil && lh.LogLevel >= 1 { // Warning level
25+
internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v", notificationType, result)
26+
} else if lh.LogLevel >= 3 { // Debug level
27+
internal.Logger.Printf(ctx, "hitless: %s notification processed successfully", notificationType)
28+
}
29+
}
30+
31+
// FilterHook is an example hook that can filter out certain notifications.
32+
type FilterHook struct {
33+
BlockedTypes map[string]bool
34+
}
35+
36+
// PreHook filters notifications based on type.
37+
func (fh *FilterHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
38+
if fh.BlockedTypes[notificationType] {
39+
internal.Logger.Printf(ctx, "hitless: filtering out %s notification", notificationType)
40+
return notification, false // Skip processing
41+
}
42+
return notification, true
43+
}
44+
45+
// PostHook does nothing for filter hook.
46+
func (fh *FilterHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
47+
// No post-processing needed for filter hook
48+
}

hitless/pool_hook.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, _ bool) error {
138138
// in a handoff state at the moment.
139139

140140
// Check if connection is usable (not in a handoff state)
141+
// Should not happen since the pool will not return a connection that is not usable.
141142
if !conn.IsUsable() {
142143
return ErrConnectionMarkedForHandoff
143144
}

internal/pool/conn.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type Conn struct {
4545

4646
Inited bool
4747
pooled bool
48-
pubsub bool
4948
createdAt time.Time
5049
expiresAt time.Time
5150

@@ -453,18 +452,6 @@ func (cn *Conn) IncrementAndGetHandoffRetries(n int) int {
453452
return cn.incrementHandoffRetries(n)
454453
}
455454

456-
// Rd returns the connection's reader for protocol-specific processing
457-
func (cn *Conn) Rd() *proto.Reader {
458-
return cn.rd
459-
}
460-
461-
// Reader returns the connection's proto reader for processing notifications
462-
// Note: This method should be used carefully as it returns the raw reader.
463-
// For thread-safe operations, use HasBufferedData() and PeekReplyTypeSafe().
464-
func (cn *Conn) Reader() *proto.Reader {
465-
return cn.rd
466-
}
467-
468455
// HasBufferedData safely checks if the connection has buffered data.
469456
// This method is used to avoid data races when checking for push notifications.
470457
func (cn *Conn) HasBufferedData() bool {
@@ -607,11 +594,3 @@ func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
607594

608595
return noDeadline
609596
}
610-
611-
func (cn *Conn) IsPubSub() bool {
612-
return cn.pubsub
613-
}
614-
615-
func (cn *Conn) IsPooled() bool {
616-
return cn.pooled
617-
}

internal/pool/pool.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ var (
2424
// ErrPoolTimeout timed out waiting to get a connection from the connection pool.
2525
ErrPoolTimeout = errors.New("redis: connection pool timeout")
2626

27+
popAttempts = 10
28+
getAttempts = 3
2729
minTime = time.Unix(-2208988800, 0) // Jan 1, 1900
2830
maxTime = minTime.Add(1<<63 - 1)
2931
noExpiration = maxTime
@@ -356,14 +358,16 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
356358
return nil, err
357359
}
358360

359-
tries := 0
360361
now := time.Now()
362+
attempts := 0
361363
for {
362-
if tries > 10 {
363-
log.Printf("redis: connection pool: failed to get a connection after %d tries", tries)
364+
365+
if attempts >= getAttempts {
366+
log.Printf("redis: connection pool: failed to get an connection accepted by hook after %d attempts", attempts)
364367
break
365368
}
366-
tries++
369+
attempts++
370+
367371
p.connsMu.Lock()
368372
cn, err = p.popIdle()
369373
p.connsMu.Unlock()
@@ -407,6 +411,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
407411
if p.hookManager != nil {
408412
if err := p.hookManager.ProcessOnGet(ctx, newcn, true); err != nil {
409413
// Failed to process connection, discard it
414+
log.Printf("redis: connection pool: failed to process new connection by hook: %v", err)
410415
_ = p.CloseConn(newcn)
411416
return nil, err
412417
}
@@ -467,9 +472,8 @@ func (p *ConnPool) popIdle() (*Conn, error) {
467472

468473
var cn *Conn
469474
attempts := 0
470-
maxAttempts := len(p.idleConns) + 1 // Prevent infinite loop
471475

472-
for attempts < maxAttempts {
476+
for attempts < popAttempts {
473477
if len(p.idleConns) == 0 {
474478
return nil, nil
475479
}
@@ -503,7 +507,8 @@ func (p *ConnPool) popIdle() (*Conn, error) {
503507
}
504508

505509
// If we exhausted all attempts without finding a usable connection, return nil
506-
if attempts >= maxAttempts {
510+
if attempts >= popAttempts {
511+
log.Printf("redis: connection pool: failed to get an usable connection after %d attempts", popAttempts)
507512
return nil, nil
508513
}
509514

0 commit comments

Comments
 (0)