Skip to content

Commit 24e0cb6

Browse files
authored
Improve connection reset handling during ServiceAccountToken rotation (#3905)
Problem: During ServiceAccountToken rotation, nginx-gateway-fabric was sometimes experiencing deadlocks due to the Subscribe method incorrectly intercepting initial configuration operations after a ServiceAccountToken rotation and incorrect signaling broadcast completion Solution: Implemented a pendingRequest tracking mechanism to distinguish between initial setup operations and broadcast operations.
1 parent 5c3fc1b commit 24e0cb6

File tree

6 files changed

+75
-23
lines changed

6 files changed

+75
-23
lines changed

internal/controller/nginx/agent/agent_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ func TestUpdateConfig(t *testing.T) {
6666
updater.UpdateConfig(deployment, []File{file})
6767

6868
g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(1))
69-
g.Expect(deployment.GetFile(file.Meta.Name, file.Meta.Hash)).To(Equal(file.Contents))
69+
fileContents, _ := deployment.GetFile(file.Meta.Name, file.Meta.Hash)
70+
g.Expect(fileContents).To(Equal(file.Contents))
7071

7172
if test.expErr {
7273
g.Expect(deployment.GetLatestConfigError()).To(Equal(testErr))

internal/controller/nginx/agent/command.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
155155
channels := broadcaster.Subscribe()
156156
defer broadcaster.CancelSubscription(channels.ID)
157157

158+
var pendingBroadcastRequest *broadcast.NginxAgentMessage
159+
158160
for {
159161
// When a message is received over the ListenCh, it is assumed and required that the
160162
// deployment object is already LOCKED. This lock is acquired by the event handler before calling
@@ -191,13 +193,20 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
191193

192194
return grpcStatus.Error(codes.Internal, err.Error())
193195
}
196+
197+
// Track this broadcast request to distinguish it from initial config operations.
198+
// Only broadcast operations should signal ResponseCh for coordination.
199+
pendingBroadcastRequest = &msg
194200
case err = <-msgr.Errors():
195201
cs.logger.Error(err, "connection error", "pod", conn.PodName)
196202
deployment.SetPodErrorStatus(conn.PodName, err)
197203
select {
198204
case channels.ResponseCh <- struct{}{}:
199205
default:
200206
}
207+
if pendingBroadcastRequest != nil {
208+
cs.logger.V(1).Info("Connection error during pending request, operation failed")
209+
}
201210

202211
if errors.Is(err, io.EOF) {
203212
return grpcStatus.Error(codes.Aborted, err.Error())
@@ -215,7 +224,15 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
215224
} else {
216225
deployment.SetPodErrorStatus(conn.PodName, nil)
217226
}
218-
channels.ResponseCh <- struct{}{}
227+
228+
// Signal broadcast completion only for tracked broadcast operations.
229+
// Initial config responses are ignored to prevent spurious success messages.
230+
if pendingBroadcastRequest != nil {
231+
pendingBroadcastRequest = nil
232+
channels.ResponseCh <- struct{}{}
233+
} else {
234+
cs.logger.V(1).Info("Received response for non-broadcast request (likely initial config)", "pod", conn.PodName)
235+
}
219236
}
220237
}
221238
}
@@ -265,6 +282,9 @@ func (cs *commandService) setInitialConfig(
265282
defer deployment.FileLock.Unlock()
266283

267284
fileOverviews, configVersion := deployment.GetFileOverviews()
285+
286+
cs.logger.Info("Sending initial configuration to agent", "pod", conn.PodName, "configVersion", configVersion)
287+
268288
if err := msgr.Send(ctx, buildRequest(fileOverviews, conn.InstanceID, configVersion)); err != nil {
269289
cs.logAndSendErrorStatus(deployment, conn, err)
270290

@@ -348,9 +368,11 @@ func (cs *commandService) waitForInitialConfigApply(
348368
res := msg.GetCommandResponse()
349369
if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK {
350370
applyErr := fmt.Errorf("msg: %s; error: %s", res.GetMessage(), res.GetError())
371+
cs.logger.V(1).Info("Received initial config response with error", "error", applyErr)
351372
return applyErr, nil
352373
}
353374

375+
cs.logger.V(1).Info("Received successful initial config response")
354376
return applyErr, connectionErr
355377
}
356378
}

internal/controller/nginx/agent/command_test.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -340,32 +340,26 @@ func TestSubscribe(t *testing.T) {
340340

341341
mockServer := newMockSubscribeServer(ctx)
342342

343-
// put the requests on the listenCh for the Subscription loop to pick up
343+
// Define the broadcast messages to be sent later
344344
loopFile := &pb.File{
345345
FileMeta: &pb.FileMeta{
346346
Name: "some-other.conf",
347347
Hash: "56789",
348348
},
349349
}
350-
listenCh <- broadcast.NginxAgentMessage{
351-
Type: broadcast.ConfigApplyRequest,
352-
FileOverviews: []*pb.File{loopFile},
353-
}
354-
355350
loopAction := &pb.NGINXPlusAction{
356351
Action: &pb.NGINXPlusAction_UpdateStreamServers{},
357352
}
358-
listenCh <- broadcast.NginxAgentMessage{
359-
Type: broadcast.APIRequest,
360-
NGINXPlusAction: loopAction,
361-
}
362353

363354
// start the Subscriber
364355
errCh := make(chan error)
365356
go func() {
366357
errCh <- cs.Subscribe(mockServer)
367358
}()
368359

360+
// PHASE 1: Initial config is sent by setInitialConfig() BEFORE the event loop starts
361+
// These should NOT signal ResponseCh as they're not broadcast operations
362+
369363
// ensure that the initial config file was sent when the Subscription connected
370364
expFile := &pb.File{
371365
FileMeta: &pb.FileMeta{
@@ -374,6 +368,7 @@ func TestSubscribe(t *testing.T) {
374368
},
375369
}
376370
ensureFileWasSent(g, mockServer, expFile)
371+
// Respond to initial config - this should NOT signal ResponseCh
377372
mockServer.recvChan <- &pb.DataPlaneResponse{
378373
CommandResponse: &pb.CommandResponse{
379374
Status: pb.CommandResponse_COMMAND_STATUS_OK,
@@ -382,22 +377,40 @@ func TestSubscribe(t *testing.T) {
382377

383378
// ensure that the initial API request was sent when the Subscription connected
384379
ensureAPIRequestWasSent(g, mockServer, initialAction)
380+
// Respond to initial API request - this should NOT signal ResponseCh
385381
mockServer.recvChan <- &pb.DataPlaneResponse{
386382
CommandResponse: &pb.CommandResponse{
387383
Status: pb.CommandResponse_COMMAND_STATUS_OK,
388384
},
389385
}
390386

387+
// Wait for status queue to be updated after initial config completes
391388
g.Eventually(func() string {
392389
obj := cs.statusQueue.Dequeue(ctx)
393390
return obj.Deployment.Name
394391
}).Should(Equal("nginx-deployment"))
395392

396-
// ensure the second file was sent in the loop
393+
// PHASE 2: Now send broadcast operations to the event loop
394+
// Put the broadcast requests on the listenCh for the Subscription loop to pick up
395+
listenCh <- broadcast.NginxAgentMessage{
396+
Type: broadcast.ConfigApplyRequest,
397+
FileOverviews: []*pb.File{loopFile},
398+
}
399+
400+
// PHASE 2: Broadcast operations from the event loop
401+
// These SHOULD signal ResponseCh as they are broadcast operations
402+
403+
// ensure the broadcast file was sent in the loop
397404
ensureFileWasSent(g, mockServer, loopFile)
398405
verifyResponse(g, mockServer, responseCh)
399406

400-
// ensure the second action was sent in the loop
407+
// Send second broadcast operation
408+
listenCh <- broadcast.NginxAgentMessage{
409+
Type: broadcast.APIRequest,
410+
NGINXPlusAction: loopAction,
411+
}
412+
413+
// ensure the broadcast action was sent in the loop
401414
ensureAPIRequestWasSent(g, mockServer, loopAction)
402415
verifyResponse(g, mockServer, responseCh)
403416

internal/controller/nginx/agent/deployment.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,18 @@ func (d *Deployment) GetNGINXPlusActions() []*pb.NGINXPlusAction {
161161

162162
// GetFile gets the requested file for the deployment and returns its contents.
163163
// The deployment FileLock MUST already be locked before calling this function.
164-
func (d *Deployment) GetFile(name, hash string) []byte {
164+
func (d *Deployment) GetFile(name, hash string) ([]byte, string) {
165+
var fileFoundHash string
165166
for _, file := range d.files {
166-
if name == file.Meta.GetName() && hash == file.Meta.GetHash() {
167-
return file.Contents
167+
if name == file.Meta.GetName() {
168+
fileFoundHash = file.Meta.GetHash()
169+
if hash == file.Meta.GetHash() {
170+
return file.Contents, file.Meta.GetHash()
171+
}
168172
}
169173
}
170174

171-
return nil
175+
return nil, fileFoundHash
172176
}
173177

174178
// SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send.

internal/controller/nginx/agent/deployment_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ func TestSetAndGetFiles(t *testing.T) {
5252
g.Expect(msg.FileOverviews).To(HaveLen(9)) // 1 file + 8 ignored files
5353
g.Expect(fileOverviews).To(Equal(msg.FileOverviews))
5454

55-
file := deployment.GetFile("test.conf", "12345")
55+
file, _ := deployment.GetFile("test.conf", "12345")
5656
g.Expect(file).To(Equal([]byte("test content")))
5757

58-
g.Expect(deployment.GetFile("invalid", "12345")).To(BeNil())
59-
g.Expect(deployment.GetFile("test.conf", "invalid")).To(BeNil())
58+
invalidFile, _ := deployment.GetFile("invalid", "12345")
59+
g.Expect(invalidFile).To(BeNil())
60+
wrongHashFile, _ := deployment.GetFile("test.conf", "invalid")
61+
g.Expect(wrongHashFile).To(BeNil())
6062

6163
// Set the same files again
6264
msg = deployment.SetFiles(files)

internal/controller/nginx/agent/file.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,22 @@ func (fs *fileService) getFileContents(req *pb.GetFileRequest, connKey string) (
143143
}
144144

145145
filename := req.GetFileMeta().GetName()
146-
contents := deployment.GetFile(filename, req.GetFileMeta().GetHash())
146+
contents, fileFoundHash := deployment.GetFile(filename, req.GetFileMeta().GetHash())
147147
if len(contents) == 0 {
148+
fs.logger.V(1).Info("Error getting file for agent", "file", filename)
149+
if fileFoundHash != "" {
150+
fs.logger.V(1).Info(
151+
"File found had wrong hash",
152+
"hashWanted",
153+
req.GetFileMeta().GetHash(),
154+
"hashFound",
155+
fileFoundHash,
156+
)
157+
}
148158
return nil, status.Errorf(codes.NotFound, "file not found")
149159
}
150160

151-
fs.logger.V(1).Info("Getting file for agent", "file", filename)
161+
fs.logger.V(1).Info("Getting file for agent", "file", filename, "fileHash", fileFoundHash)
152162

153163
return contents, nil
154164
}

0 commit comments

Comments
 (0)