Skip to content

Commit 6fb87ca

Browse files
committed
wip
1 parent a8ef7e9 commit 6fb87ca

File tree

9 files changed

+116
-100
lines changed

9 files changed

+116
-100
lines changed

adapters.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ func (oa *optionsAdapter) GetWriteTimeout() time.Duration {
5252
return oa.options.WriteTimeout
5353
}
5454

55+
// GetNetwork returns the network type.
56+
func (oa *optionsAdapter) GetNetwork() string {
57+
return oa.options.Network
58+
}
59+
5560
// GetAddr returns the connection address.
5661
func (oa *optionsAdapter) GetAddr() string {
5762
return oa.options.Addr

async_handoff_integration_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
4141
}
4242

4343
// Create processor with event-driven handoff support
44-
processor := hitless.NewPoolHook(baseDialer, nil, nil)
44+
processor := hitless.NewPoolHook(baseDialer, "tcp", nil, nil)
4545
defer processor.Shutdown(context.Background())
4646

4747
// Create a test pool with hooks
@@ -133,7 +133,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
133133
return &mockNetConn{addr: addr}, nil
134134
}
135135

136-
processor := hitless.NewPoolHook(baseDialer, nil, nil)
136+
processor := hitless.NewPoolHook(baseDialer, "tcp", nil, nil)
137137
defer processor.Shutdown(context.Background())
138138

139139
// Create hooks manager and add processor as hook
@@ -205,7 +205,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
205205
return nil, &net.OpError{Op: "dial", Err: &net.DNSError{Name: addr}}
206206
}
207207

208-
processor := hitless.NewPoolHook(failingDialer, nil, nil)
208+
processor := hitless.NewPoolHook(failingDialer, "tcp", nil, nil)
209209
defer processor.Shutdown(context.Background())
210210

211211
// Create hooks manager and add processor as hook
@@ -268,7 +268,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
268268
return &mockNetConn{addr: addr}, nil
269269
}
270270

271-
processor := hitless.NewPoolHook(slowDialer, nil, nil)
271+
processor := hitless.NewPoolHook(slowDialer, "tcp", nil, nil)
272272

273273
// Create hooks manager and add processor as hook
274274
hookManager := pool.NewPoolHookManager()

hitless/config_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func TestProcessorWithConfig(t *testing.T) {
249249
return &mockNetConn{addr: addr}, nil
250250
}
251251

252-
processor := NewPoolHook(baseDialer, config, nil)
252+
processor := NewPoolHook(baseDialer, "tcp", config, nil)
253253
defer processor.Shutdown(context.Background())
254254

255255
// The processor should be created successfully with custom config
@@ -269,7 +269,7 @@ func TestProcessorWithConfig(t *testing.T) {
269269
return &mockNetConn{addr: addr}, nil
270270
}
271271

272-
processor := NewPoolHook(baseDialer, config, nil)
272+
processor := NewPoolHook(baseDialer, "tcp", config, nil)
273273
defer processor.Shutdown(context.Background())
274274

275275
// Should work with partial config (defaults applied)
@@ -283,7 +283,7 @@ func TestProcessorWithConfig(t *testing.T) {
283283
return &mockNetConn{addr: addr}, nil
284284
}
285285

286-
processor := NewPoolHook(baseDialer, nil, nil)
286+
processor := NewPoolHook(baseDialer, "tcp", nil, nil)
287287
defer processor.Shutdown(context.Background())
288288

289289
// Should use default config when nil is passed
@@ -308,7 +308,7 @@ func TestIntegrationWithApplyDefaults(t *testing.T) {
308308
}
309309

310310
// Create processor - should apply defaults to missing fields
311-
processor := NewPoolHook(baseDialer, partialConfig, nil)
311+
processor := NewPoolHook(baseDialer, "tcp", partialConfig, nil)
312312
defer processor.Shutdown(context.Background())
313313

314314
// Processor should be created successfully

hitless/hitless_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func (hm *HitlessManager) createPoolHook(baseDialer func(context.Context, string
286286
poolSize = hm.options.GetPoolSize()
287287
}
288288

289-
hm.poolHooksRef = NewPoolHookWithPoolSize(baseDialer, hm.config, hm, poolSize)
289+
hm.poolHooksRef = NewPoolHookWithPoolSize(baseDialer, hm.options.GetNetwork(), hm.config, hm, poolSize)
290290
hm.poolHooksRef.SetPool(hm.pool)
291291

292292
return hm.poolHooksRef

hitless/hitless_manager_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ func (mo *MockOptions) GetPoolSize() int {
6464
return 10
6565
}
6666

67+
func (mo *MockOptions) GetNetwork() string {
68+
return "tcp"
69+
}
70+
6771
func (mo *MockOptions) NewDialer() func(context.Context) (net.Conn, error) {
6872
return func(ctx context.Context) (net.Conn, error) {
6973
return nil, nil

hitless/pool_hook.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
// HitlessManagerInterface defines the interface for completing handoff operations
1616
type HitlessManagerInterface interface {
17+
TrackMovingOperationWithConnID(ctx context.Context, newEndpoint string, deadline time.Time, seqID int64, connID uint64) error
1718
UntrackOperationWithConnID(seqID int64, connID uint64)
1819
}
1920

@@ -31,8 +32,12 @@ type HandoffRequest struct {
3132
// with hitless upgrade support.
3233
type PoolHook struct {
3334
// Base dialer for creating connections to new endpoints during handoffs
35+
// args are network and address
3436
baseDialer func(context.Context, string, string) (net.Conn, error)
3537

38+
// Network type (e.g., "tcp", "unix")
39+
network string
40+
3641
// Event-driven handoff support
3742
handoffQueue chan HandoffRequest // Queue for handoff requests
3843
shutdown chan struct{} // Shutdown signal
@@ -55,7 +60,7 @@ type PoolHook struct {
5560
// Simple state tracking
5661
pending sync.Map // map[uint64]int64 (connID -> seqID)
5762

58-
// Configuration for the processor
63+
// Configuration for the hitless upgrade
5964
config *Config
6065

6166
// Hitless manager for operation completion tracking
@@ -66,19 +71,19 @@ type PoolHook struct {
6671
}
6772

6873
// NewPoolHook creates a new pool hook
69-
func NewPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error), config *Config, hitlessManager HitlessManagerInterface) *PoolHook {
70-
return NewPoolHookWithPoolSize(baseDialer, config, hitlessManager, 0)
74+
func NewPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error), network string, config *Config, hitlessManager HitlessManagerInterface) *PoolHook {
75+
return NewPoolHookWithPoolSize(baseDialer, network, config, hitlessManager, 0)
7176
}
7277

7378
// NewPoolHookWithPoolSize creates a new pool hook with pool size for better worker defaults
74-
func NewPoolHookWithPoolSize(baseDialer func(context.Context, string, string) (net.Conn, error), config *Config, hitlessManager HitlessManagerInterface, poolSize int) *PoolHook {
79+
func NewPoolHookWithPoolSize(baseDialer func(context.Context, string, string) (net.Conn, error), network string, config *Config, hitlessManager HitlessManagerInterface, poolSize int) *PoolHook {
7580
// Apply defaults to any missing configuration fields, using pool size for worker calculations
7681
config = config.ApplyDefaultsWithPoolSize(poolSize)
7782

7883
ph := &PoolHook{
7984
// baseDialer is used to create connections to new endpoints during handoffs
8085
baseDialer: baseDialer,
81-
// Note: CLIENT MAINT_NOTIFICATIONS is handled during client initialization
86+
network: network,
8287
// handoffQueue is a buffered channel for queuing handoff requests
8388
handoffQueue: make(chan HandoffRequest, config.HandoffQueueSize),
8489
// shutdown is a channel for signaling shutdown
@@ -551,7 +556,7 @@ func (ph *PoolHook) createEndpointDialer(endpoint string) func(context.Context)
551556
}
552557

553558
// Use the base dialer to connect to the new endpoint
554-
return ph.baseDialer(ctx, "tcp", net.JoinHostPort(host, port))
559+
return ph.baseDialer(ctx, ph.network, net.JoinHostPort(host, port))
555560
}
556561
}
557562

0 commit comments

Comments
 (0)