Skip to content

Commit 478875d

Browse files
committed
refactor processor
1 parent a1461e9 commit 478875d

14 files changed

+696
-470
lines changed

async_handoff_integration_test.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,20 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
4141
}
4242

4343
// Create processor with event-driven handoff support
44-
processor := hitless.NewRedisConnectionProcessor(3, baseDialer, nil, nil)
44+
processor := hitless.NewPoolHook(3, baseDialer, nil, nil)
4545
defer processor.Shutdown(context.Background())
4646

47-
// Create a test pool with the processor
47+
// Create a test pool with hooks
48+
hookManager := pool.NewPoolHookManager()
49+
hookManager.AddHook(processor)
50+
4851
testPool := pool.NewConnPool(&pool.Options{
4952
Dialer: func(ctx context.Context) (net.Conn, error) {
5053
return &mockNetConn{addr: "original:6379"}, nil
5154
},
52-
ConnectionProcessor: processor,
53-
PoolSize: 5,
54-
PoolTimeout: time.Second,
55+
PoolHooks: hookManager,
56+
PoolSize: 5,
57+
PoolTimeout: time.Second,
5558
})
5659
defer testPool.Close()
5760

@@ -128,16 +131,20 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
128131
return &mockNetConn{addr: addr}, nil
129132
}
130133

131-
processor := hitless.NewRedisConnectionProcessor(3, baseDialer, nil, nil)
134+
processor := hitless.NewPoolHook(3, baseDialer, nil, nil)
132135
defer processor.Shutdown(context.Background())
133136

137+
// Create hooks manager and add processor as hook
138+
hookManager := pool.NewPoolHookManager()
139+
hookManager.AddHook(processor)
140+
134141
testPool := pool.NewConnPool(&pool.Options{
135142
Dialer: func(ctx context.Context) (net.Conn, error) {
136143
return &mockNetConn{addr: "original:6379"}, nil
137144
},
138-
ConnectionProcessor: processor,
139-
PoolSize: 10,
140-
PoolTimeout: time.Second,
145+
PoolHooks: hookManager,
146+
PoolSize: 10,
147+
PoolTimeout: time.Second,
141148
})
142149
defer testPool.Close()
143150

@@ -193,16 +200,20 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
193200
return nil, &net.OpError{Op: "dial", Err: &net.DNSError{Name: addr}}
194201
}
195202

196-
processor := hitless.NewRedisConnectionProcessor(3, failingDialer, nil, nil)
203+
processor := hitless.NewPoolHook(3, failingDialer, nil, nil)
197204
defer processor.Shutdown(context.Background())
198205

206+
// Create hooks manager and add processor as hook
207+
hookManager := pool.NewPoolHookManager()
208+
hookManager.AddHook(processor)
209+
199210
testPool := pool.NewConnPool(&pool.Options{
200211
Dialer: func(ctx context.Context) (net.Conn, error) {
201212
return &mockNetConn{addr: "original:6379"}, nil
202213
},
203-
ConnectionProcessor: processor,
204-
PoolSize: 3,
205-
PoolTimeout: time.Second,
214+
PoolHooks: hookManager,
215+
PoolSize: 3,
216+
PoolTimeout: time.Second,
206217
})
207218
defer testPool.Close()
208219

@@ -249,15 +260,19 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
249260
return &mockNetConn{addr: addr}, nil
250261
}
251262

252-
processor := hitless.NewRedisConnectionProcessor(3, slowDialer, nil, nil)
263+
processor := hitless.NewPoolHook(3, slowDialer, nil, nil)
264+
265+
// Create hooks manager and add processor as hook
266+
hookManager := pool.NewPoolHookManager()
267+
hookManager.AddHook(processor)
253268

254269
testPool := pool.NewConnPool(&pool.Options{
255270
Dialer: func(ctx context.Context) (net.Conn, error) {
256271
return &mockNetConn{addr: "original:6379"}, nil
257272
},
258-
ConnectionProcessor: processor,
259-
PoolSize: 2,
260-
PoolTimeout: time.Second,
273+
PoolHooks: hookManager,
274+
PoolSize: 2,
275+
PoolTimeout: time.Second,
261276
})
262277
defer testPool.Close()
263278

hitless/config_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func TestProcessorWithConfig(t *testing.T) {
249249
return &mockNetConn{addr: addr}, nil
250250
}
251251

252-
processor := NewRedisConnectionProcessor(3, baseDialer, config, nil)
252+
processor := NewPoolHook(3, baseDialer, config, nil)
253253
defer processor.Shutdown(context.Background())
254254

255255
// The processor should be created successfully with custom config
@@ -269,7 +269,7 @@ func TestProcessorWithConfig(t *testing.T) {
269269
return &mockNetConn{addr: addr}, nil
270270
}
271271

272-
processor := NewRedisConnectionProcessor(3, baseDialer, config, nil)
272+
processor := NewPoolHook(3, baseDialer, config, nil)
273273
defer processor.Shutdown(context.Background())
274274

275275
// Should work with partial config (defaults applied)
@@ -283,7 +283,7 @@ func TestProcessorWithConfig(t *testing.T) {
283283
return &mockNetConn{addr: addr}, nil
284284
}
285285

286-
processor := NewRedisConnectionProcessor(3, baseDialer, nil, nil)
286+
processor := NewPoolHook(3, baseDialer, nil, nil)
287287
defer processor.Shutdown(context.Background())
288288

289289
// Should use default config when nil is passed
@@ -308,7 +308,7 @@ func TestIntegrationWithApplyDefaults(t *testing.T) {
308308
}
309309

310310
// Create processor - should apply defaults to missing fields
311-
processor := NewRedisConnectionProcessor(3, baseDialer, partialConfig, nil)
311+
processor := NewPoolHook(3, baseDialer, partialConfig, nil)
312312
defer processor.Shutdown(context.Background())
313313

314314
// Processor should be created successfully

hitless/hitless_manager.go

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -98,27 +98,9 @@ func NewHitlessManager(client interfaces.ClientInterface, config *Config) (*Hitl
9898
return hm, nil
9999
}
100100

101-
// AddHook adds a notification hook to the manager.
102-
// Hooks are called in the order they were added.
103-
func (hm *HitlessManager) AddHook(hook NotificationHook) {
104-
hm.hooksMu.Lock()
105-
defer hm.hooksMu.Unlock()
106-
hm.hooks = append(hm.hooks, hook)
107-
}
108-
109-
// RemoveHook removes a notification hook from the manager.
110-
func (hm *HitlessManager) RemoveHook(hook NotificationHook) {
111-
hm.hooksMu.Lock()
112-
defer hm.hooksMu.Unlock()
113-
114-
for i, h := range hm.hooks {
115-
if h == hook {
116-
// Remove hook by swapping with last element and truncating
117-
hm.hooks[i] = hm.hooks[len(hm.hooks)-1]
118-
hm.hooks = hm.hooks[:len(hm.hooks)-1]
119-
break
120-
}
121-
}
101+
// CreatePoolHook creates a pool hook with a custom dialer.
102+
func (hm *HitlessManager) CreatePoolHook(baseDialer func(context.Context, string, string) (net.Conn, error)) *PoolHook {
103+
return hm.createPoolHook(baseDialer)
122104
}
123105

124106
// setupPushNotifications sets up push notification handling by registering with the client's processor.
@@ -246,11 +228,6 @@ func (hm *HitlessManager) GetState() State {
246228
return StateIdle
247229
}
248230

249-
// GetConfig returns the hitless manager configuration.
250-
func (hm *HitlessManager) GetConfig() *Config {
251-
return hm.config.Clone()
252-
}
253-
254231
// processPreHooks calls all pre-hooks and returns the modified notification and whether to continue processing.
255232
func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
256233
hm.hooksMu.RLock()
@@ -320,16 +297,15 @@ func (fh *FilterHook) PostHook(ctx context.Context, notificationType string, not
320297
// No post-processing needed for filter hook
321298
}
322299

323-
// CreateConnectionProcessor creates a connection processor with this manager already set.
324-
// Returns the processor as the shared interface type.
325-
func (hm *HitlessManager) CreateConnectionProcessor(protocol int, baseDialer func(context.Context, string, string) (net.Conn, error)) *RedisConnectionProcessor {
300+
// createPoolHook creates a pool hook with this manager already set.
301+
func (hm *HitlessManager) createPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error)) *PoolHook {
326302
// Get pool size from client options for better worker defaults
327303
poolSize := 0
328304
if hm.options != nil {
329305
poolSize = hm.options.GetPoolSize()
330306
}
331307

332-
processor := NewRedisConnectionProcessorWithPoolSize(protocol, baseDialer, hm.config, hm, poolSize)
308+
hook := NewPoolHookWithPoolSize(baseDialer, hm.config, hm, poolSize)
333309

334-
return processor
310+
return hook
335311
}

hitless/hitless_manager_test.go

Lines changed: 0 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package hitless
33
import (
44
"context"
55
"net"
6-
"sync/atomic"
76
"testing"
87
"time"
98

@@ -71,32 +70,6 @@ func (mo *MockOptions) NewDialer() func(context.Context) (net.Conn, error) {
7170
}
7271
}
7372

74-
// TestHook for testing hook functionality
75-
type TestHook struct {
76-
PreHookCalled atomic.Int32
77-
PostHookCalled atomic.Int32
78-
ModifyNotification bool
79-
BlockProcessing bool
80-
}
81-
82-
func (th *TestHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
83-
th.PreHookCalled.Add(1)
84-
85-
if th.ModifyNotification && notificationType == NotificationMoving && len(notification) > 3 {
86-
// Modify the endpoint in the notification
87-
modifiedNotification := make([]interface{}, len(notification))
88-
copy(modifiedNotification, notification)
89-
modifiedNotification[3] = "modified-endpoint:6379"
90-
return modifiedNotification, !th.BlockProcessing
91-
}
92-
93-
return notification, !th.BlockProcessing
94-
}
95-
96-
func (th *TestHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
97-
th.PostHookCalled.Add(1)
98-
}
99-
10073
func TestHitlessManagerRefactoring(t *testing.T) {
10174
t.Run("AtomicStateTracking", func(t *testing.T) {
10275
config := DefaultConfig()
@@ -201,73 +174,6 @@ func TestHitlessManagerRefactoring(t *testing.T) {
201174
}
202175
})
203176

204-
t.Run("HookFunctionality", func(t *testing.T) {
205-
config := DefaultConfig()
206-
client := &MockClient{options: &MockOptions{}}
207-
208-
manager, err := NewHitlessManager(client, config)
209-
if err != nil {
210-
t.Fatalf("Failed to create hitless manager: %v", err)
211-
}
212-
defer manager.Close()
213-
214-
// Create test hooks
215-
hook1 := &TestHook{}
216-
hook2 := &TestHook{ModifyNotification: true}
217-
hook3 := &TestHook{BlockProcessing: true}
218-
219-
// Add hooks
220-
manager.AddHook(hook1)
221-
manager.AddHook(hook2)
222-
223-
ctx := context.Background()
224-
notification := []interface{}{NotificationMoving, "12345", "30", "original-endpoint:6379"}
225-
226-
// Test pre-hooks
227-
modifiedNotification, shouldContinue := manager.processPreHooks(ctx, NotificationMoving, notification)
228-
if !shouldContinue {
229-
t.Error("Expected processing to continue")
230-
}
231-
232-
if hook1.PreHookCalled.Load() != 1 {
233-
t.Errorf("Expected hook1 PreHook to be called once, got %d", hook1.PreHookCalled.Load())
234-
}
235-
236-
if hook2.PreHookCalled.Load() != 1 {
237-
t.Errorf("Expected hook2 PreHook to be called once, got %d", hook2.PreHookCalled.Load())
238-
}
239-
240-
// Check if notification was modified by hook2
241-
if len(modifiedNotification) < 4 || modifiedNotification[3] != "modified-endpoint:6379" {
242-
t.Errorf("Expected notification to be modified, got %v", modifiedNotification)
243-
}
244-
245-
// Test post-hooks
246-
manager.processPostHooks(ctx, NotificationMoving, modifiedNotification, nil)
247-
248-
if hook1.PostHookCalled.Load() != 1 {
249-
t.Errorf("Expected hook1 PostHook to be called once, got %d", hook1.PostHookCalled.Load())
250-
}
251-
252-
if hook2.PostHookCalled.Load() != 1 {
253-
t.Errorf("Expected hook2 PostHook to be called once, got %d", hook2.PostHookCalled.Load())
254-
}
255-
256-
// Test blocking hook
257-
manager.AddHook(hook3)
258-
_, shouldContinue = manager.processPreHooks(ctx, NotificationMoving, notification)
259-
if shouldContinue {
260-
t.Error("Expected processing to be blocked by hook3")
261-
}
262-
263-
// Test hook removal
264-
manager.RemoveHook(hook3)
265-
_, shouldContinue = manager.processPreHooks(ctx, NotificationMoving, notification)
266-
if !shouldContinue {
267-
t.Error("Expected processing to continue after removing blocking hook")
268-
}
269-
})
270-
271177
t.Run("DuplicateOperationHandling", func(t *testing.T) {
272178
config := DefaultConfig()
273179
client := &MockClient{options: &MockOptions{}}

0 commit comments

Comments
 (0)