Skip to content

Commit b80f6d5

Browse files
committed
one more additional check for shutdown
1 parent c0d63fa commit b80f6d5

File tree

3 files changed

+17
-5
lines changed

3 files changed

+17
-5
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ test.ci:
2929
(cd "$${dir}" && \
3030
go mod tidy -compat=1.18 && \
3131
go vet && \
32-
go test -v -coverprofile=coverage.txt -covermode=atomic ./... -race -skip Example); \
32+
go test -v -coverprofile=coverage.txt -covermode=atomic ./... -race -timeout 20m -skip Example); \
3333
done
3434
cd internal/customvet && go build .
3535
go vet -vettool ./internal/customvet/customvet

hitless/pool_hook.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,16 @@ func (ph *PoolHook) handoffWorker() {
327327
return // Exit this worker
328328
}
329329

330-
ph.processHandoffRequest(request)
330+
// Check for shutdown before processing
331+
select {
332+
case <-ph.shutdown:
333+
// Clean up the request before exiting
334+
ph.pending.Delete(request.ConnID)
335+
return
336+
default:
337+
// Continue with processing
338+
ph.processHandoffRequest(request)
339+
}
331340
case <-ph.shutdown:
332341
return
333342
}
@@ -505,7 +514,6 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn *
505514
// Keep the handoff state for retry
506515
return err
507516
}
508-
// Note: CLIENT MAINT_NOTIFICATIONS is sent during client initialization, not per connection
509517
defer func() {
510518
if oldConn != nil {
511519
oldConn.Close()
@@ -542,7 +550,9 @@ func (ph *PoolHook) createEndpointDialer(endpoint string) func(context.Context)
542550
if err != nil {
543551
// If no port specified, assume default Redis port
544552
host = endpoint
545-
port = "6379"
553+
if port == "" {
554+
port = "6379"
555+
}
546556
}
547557

548558
// Use the base dialer to connect to the new endpoint

pubsub.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int
493493
// Receive returns a message as a Subscription, Message, Pong or error.
494494
// See PubSub example for details. This is low-level API and in most cases
495495
// Channel should be used instead.
496+
// This will block until a message is received.
496497
func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
497498
return c.ReceiveTimeout(ctx, 0)
498499
}
@@ -575,7 +576,8 @@ func (c *PubSub) ChannelWithSubscriptions(opts ...ChannelOption) <-chan interfac
575576
}
576577

577578
func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error {
578-
if c.pushProcessor == nil {
579+
// Only process push notifications for RESP3 connections with a processor
580+
if c.opt.Protocol != 3 || c.pushProcessor == nil {
579581
return nil
580582
}
581583

0 commit comments

Comments
 (0)