From 5095edf05ae390bfd2532b105aec377557890c05 Mon Sep 17 00:00:00 2001 From: Ciara Stacke Date: Thu, 28 Aug 2025 11:10:32 +0100 Subject: [PATCH] fix: prevent broadcast deadlock from zombie gRPC connections - Add explicit messenger context cancellation to ensure goroutine cleanup - Use non-blocking channel sends for response synchronization to prevent deadlocks when zombie connections can't process responses - Enhanced cleanup logging for connection tracking and subscriptions Fixes issue where terminated gRPC streams remained subscribed to the broadcaster, causing the deployment broadcast system to hang indefinitely waiting for responses from dead connections. This resulted in NGINX reloads stopping completely and upstream server IP synchronization failures. --- internal/controller/nginx/agent/command.go | 29 +++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/internal/controller/nginx/agent/command.go b/internal/controller/nginx/agent/command.go index 046fb7f313..eb82e88a91 100644 --- a/internal/controller/nginx/agent/command.go +++ b/internal/controller/nginx/agent/command.go @@ -131,6 +131,9 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error return agentgrpc.ErrStatusInvalidConnection } defer cs.connTracker.RemoveConnection(gi.IPAddress) + defer cs.logger.V(1).Info("Cleaned up connection tracking", "ipAddress", gi.IPAddress) + + cs.logger.V(1).Info("Starting subscription for agent", "ipAddress", gi.IPAddress) // wait for the agent to report itself and nginx conn, deployment, err := cs.waitForConnection(ctx, gi) @@ -139,11 +142,15 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error return err } defer deployment.RemovePodStatus(conn.PodName) + defer cs.logger.V(1).Info("Cleaned up pod status", "podName", conn.PodName) cs.logger.Info(fmt.Sprintf("Successfully connected to nginx agent %s", conn.PodName)) msgr := messenger.New(in) - go msgr.Run(ctx) + // Create a cancellable context for the messenger to ensure proper cleanup + messengerCtx, cancelMessenger := context.WithCancel(ctx) + defer cancelMessenger() // Ensure messenger goroutines are canceled on exit + go msgr.Run(messengerCtx) // apply current config before starting event loop if err := cs.setInitialConfig(ctx, deployment, conn, msgr); err != nil { @@ -154,6 +161,7 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error broadcaster := deployment.GetBroadcaster() channels := broadcaster.Subscribe() defer broadcaster.CancelSubscription(channels.ID) + defer cs.logger.V(1).Info("Canceled broadcaster subscription", "subscriptionID", channels.ID) for { // When a message is received over the ListenCh, it is assumed and required that the @@ -165,12 +173,18 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error // which releases the lock. select { case <-ctx.Done(): + // Ensure we signal the response channel when context is canceled select { case channels.ResponseCh <- struct{}{}: default: } return grpcStatus.Error(codes.Canceled, context.Cause(ctx).Error()) case <-cs.resetConnChan: + // Ensure we signal the response channel when connection is reset + select { + case channels.ResponseCh <- struct{}{}: + default: + } return grpcStatus.Error(codes.Unavailable, "TLS files updated") case msg := <-channels.ListenCh: var req *pb.ManagementPlaneRequest @@ -187,7 +201,12 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error if err := msgr.Send(ctx, req); err != nil { cs.logger.Error(err, "error sending request to agent") deployment.SetPodErrorStatus(conn.PodName, err) - channels.ResponseCh <- struct{}{} + // Use non-blocking send to prevent deadlock if broadcaster is waiting for + // responses from zombie connections that can't process the response + select { + case channels.ResponseCh <- struct{}{}: + default: + } return grpcStatus.Error(codes.Internal, err.Error()) } @@ -215,7 +234,11 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error } else { deployment.SetPodErrorStatus(conn.PodName, nil) } - channels.ResponseCh <- struct{}{} + // Ensure we signal the response channel + select { + case channels.ResponseCh <- struct{}{}: + default: + } } } }