diff --git a/internal/controller/nginx/agent/command.go b/internal/controller/nginx/agent/command.go index 046fb7f313..eb82e88a91 100644 --- a/internal/controller/nginx/agent/command.go +++ b/internal/controller/nginx/agent/command.go @@ -131,6 +131,9 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error return agentgrpc.ErrStatusInvalidConnection } defer cs.connTracker.RemoveConnection(gi.IPAddress) + defer cs.logger.V(1).Info("Cleaned up connection tracking", "ipAddress", gi.IPAddress) + + cs.logger.V(1).Info("Starting subscription for agent", "ipAddress", gi.IPAddress) // wait for the agent to report itself and nginx conn, deployment, err := cs.waitForConnection(ctx, gi) @@ -139,11 +142,15 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error return err } defer deployment.RemovePodStatus(conn.PodName) + defer cs.logger.V(1).Info("Cleaned up pod status", "podName", conn.PodName) cs.logger.Info(fmt.Sprintf("Successfully connected to nginx agent %s", conn.PodName)) msgr := messenger.New(in) - go msgr.Run(ctx) + // Create a cancellable context for the messenger to ensure proper cleanup + messengerCtx, cancelMessenger := context.WithCancel(ctx) + defer cancelMessenger() // Ensure messenger goroutines are canceled on exit + go msgr.Run(messengerCtx) // apply current config before starting event loop if err := cs.setInitialConfig(ctx, deployment, conn, msgr); err != nil { @@ -154,6 +161,7 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error broadcaster := deployment.GetBroadcaster() channels := broadcaster.Subscribe() defer broadcaster.CancelSubscription(channels.ID) + defer cs.logger.V(1).Info("Canceled broadcaster subscription", "subscriptionID", channels.ID) for { // When a message is received over the ListenCh, it is assumed and required that the @@ -165,12 +173,18 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error // which releases the lock. select { case <-ctx.Done(): + // Ensure we signal the response channel when context is canceled select { case channels.ResponseCh <- struct{}{}: default: } return grpcStatus.Error(codes.Canceled, context.Cause(ctx).Error()) case <-cs.resetConnChan: + // Ensure we signal the response channel when connection is reset + select { + case channels.ResponseCh <- struct{}{}: + default: + } return grpcStatus.Error(codes.Unavailable, "TLS files updated") case msg := <-channels.ListenCh: var req *pb.ManagementPlaneRequest @@ -187,7 +201,12 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error if err := msgr.Send(ctx, req); err != nil { cs.logger.Error(err, "error sending request to agent") deployment.SetPodErrorStatus(conn.PodName, err) - channels.ResponseCh <- struct{}{} + // Use non-blocking send to prevent deadlock if broadcaster is waiting for + // responses from zombie connections that can't process the response + select { + case channels.ResponseCh <- struct{}{}: + default: + } return grpcStatus.Error(codes.Internal, err.Error()) } @@ -215,7 +234,11 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error } else { deployment.SetPodErrorStatus(conn.PodName, nil) } - channels.ResponseCh <- struct{}{} + // Ensure we signal the response channel + select { + case channels.ResponseCh <- struct{}{}: + default: + } } } }