29 changes: 26 additions & 3 deletions internal/controller/nginx/agent/command.go
@@ -131,6 +131,9 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
return agentgrpc.ErrStatusInvalidConnection
}
defer cs.connTracker.RemoveConnection(gi.IPAddress)
defer cs.logger.V(1).Info("Cleaned up connection tracking", "ipAddress", gi.IPAddress)

cs.logger.V(1).Info("Starting subscription for agent", "ipAddress", gi.IPAddress)

// wait for the agent to report itself and nginx
conn, deployment, err := cs.waitForConnection(ctx, gi)
@@ -139,11 +142,15 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
return err
}
defer deployment.RemovePodStatus(conn.PodName)
defer cs.logger.V(1).Info("Cleaned up pod status", "podName", conn.PodName)

cs.logger.Info(fmt.Sprintf("Successfully connected to nginx agent %s", conn.PodName))

msgr := messenger.New(in)
go msgr.Run(ctx)
// Create a cancellable context for the messenger to ensure proper cleanup
messengerCtx, cancelMessenger := context.WithCancel(ctx)
defer cancelMessenger() // Ensure messenger goroutines are canceled on exit
go msgr.Run(messengerCtx)

// apply current config before starting event loop
if err := cs.setInitialConfig(ctx, deployment, conn, msgr); err != nil {
@@ -154,6 +161,7 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
broadcaster := deployment.GetBroadcaster()
channels := broadcaster.Subscribe()
defer broadcaster.CancelSubscription(channels.ID)
defer cs.logger.V(1).Info("Canceled broadcaster subscription", "subscriptionID", channels.ID)

for {
// When a message is received over the ListenCh, it is assumed and required that the
@@ -165,12 +173,18 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
// which releases the lock.
select {
case <-ctx.Done():
// Ensure we signal the response channel when context is canceled
select {
case channels.ResponseCh <- struct{}{}:
default:
}
return grpcStatus.Error(codes.Canceled, context.Cause(ctx).Error())
case <-cs.resetConnChan:
// Ensure we signal the response channel when connection is reset
select {
case channels.ResponseCh <- struct{}{}:
default:
}
return grpcStatus.Error(codes.Unavailable, "TLS files updated")
case msg := <-channels.ListenCh:
var req *pb.ManagementPlaneRequest
@@ -187,7 +201,12 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
if err := msgr.Send(ctx, req); err != nil {
cs.logger.Error(err, "error sending request to agent")
deployment.SetPodErrorStatus(conn.PodName, err)
channels.ResponseCh <- struct{}{}
// Use non-blocking send to prevent deadlock if broadcaster is waiting for
// responses from zombie connections that can't process the response
select {
case channels.ResponseCh <- struct{}{}:
default:
}
Comment on lines +204 to +209
Collaborator:

First thing I want to try to understand is why there's a zombie connection.

Second thing, if this was blocking, it means that the broadcaster isn't reading at all from the channel. The language in the comment confused me a bit.

So if the connection was dropped, but we're still tracking it somehow, how would we get into a state where we've sent a request but the broadcaster isn't waiting for the response?

Contributor Author:

My guess is that a zombie connection occurs when the gRPC stream from an agent is terminated (e.g., a network issue, an agent crash, or a control plane restart) but the cleanup logic (removing the connection/subscription) does not run for some reason, while the control plane's broadcaster still has the subscriber registered, so it continues to expect responses from this dead connection. I can see this in the logs: this particular connection is created but never cleaned up when a new one is spawned, and we keep seeing error messages from it long after it should be dead, showing it's still subscribed:

2025-07-25T17:26:25.256355078Z time=2025-07-25T17:26:25.256Z level=ERROR msg="Failed to receive message from subscribe stream" error="rpc error: code = Canceled desc = grpc: the client connection is closing" correlation_id=c8fbba24-6975-11f0-92bb-124e1d2c32aa

Normally, the broadcaster sends a request and waits for a response from each subscriber via its response channel. If the subscriber's goroutine is dead (e.g., due to a dropped connection), it will never read from the channel. The broadcaster's send to that channel will block if the channel is unbuffered or full, causing a deadlock. I think the sends initially succeed until the buffer fills up (since the subscriber isn't reading); once it's full, any further sends block. The non-blocking select prevents this by skipping the send if the channel can't accept more data.
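
To illustrate the pattern, here's a minimal, self-contained sketch of the non-blocking send the patch adds; the channel name and buffer size are assumptions for the example, not the actual broadcaster types:

```go
package main

import "fmt"

func main() {
	// Assumed stand-in for channels.ResponseCh; the buffer size of 1 is illustrative only.
	responseCh := make(chan struct{}, 1)

	signal := func() {
		select {
		case responseCh <- struct{}{}:
			// The buffer has room (or a reader is waiting): the response is delivered.
			fmt.Println("response sent")
		default:
			// No reader and the buffer is full (e.g., a zombie subscription):
			// drop the signal instead of blocking this goroutine forever.
			fmt.Println("response dropped")
		}
	}

	signal() // fills the one-slot buffer
	signal() // a plain send here would block; the default case avoids that
}
```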

Collaborator @sjberman, Aug 28, 2025:

I think the term "send" might be a bit conflated here, because we have

broadcaster -send-> subscriber
subscriber -respond-> broadcaster

and your select statement is on the response.

I do think there could definitely be a deadlock on these channels, which would explain why config isn't sent, and why restarting the agent fixes things, because that should close the previous connection and start a new one.
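
For the sake of discussion, here's a hypothetical sketch of that two-channel flow (not the actual NGF broadcaster implementation); it shows where the broadcaster side could hang if a subscriber goroutine is dead and never responds:

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative channel pair per subscriber; the names are assumptions for the example.
type subscription struct {
	listenCh   chan string   // broadcaster -send-> subscriber
	responseCh chan struct{} // subscriber -respond-> broadcaster
}

func main() {
	sub := subscription{
		listenCh:   make(chan string),
		responseCh: make(chan struct{}, 1),
	}

	// Healthy subscriber: handles the message, then responds.
	go func() {
		msg := <-sub.listenCh
		fmt.Println("subscriber handled:", msg)
		sub.responseCh <- struct{}{}
	}()

	// Broadcaster: send, then wait for the response. If the subscriber
	// goroutine were dead (a zombie connection) and its subscription never
	// cleaned up, this wait would block forever without a timeout or cleanup.
	sub.listenCh <- "config update"
	select {
	case <-sub.responseCh:
		fmt.Println("broadcaster received response")
	case <-time.After(time.Second):
		fmt.Println("broadcaster gave up waiting for response")
	}
}
```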

Collaborator:

I just want to make sure that, if that's the case, we unblock in the right place.

Collaborator:

Although restarting the control plane should also fix it, apparently it doesn't...

@stutommi, Aug 28, 2025:

Quick comment (sorry if this throws off more than it helps!), but for the problem mentioned in the other issue, restarting the control plane fixed our problems every time!

Referring to this - it seems I didn't bring that crucial info up there, sorry!

Collaborator:

Good to know, thanks!


return grpcStatus.Error(codes.Internal, err.Error())
}
@@ -215,7 +234,11 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
} else {
deployment.SetPodErrorStatus(conn.PodName, nil)
}
channels.ResponseCh <- struct{}{}
// Ensure we signal the response channel
select {
case channels.ResponseCh <- struct{}{}:
default:
}
}
}
}