diff --git a/internal/controller/nginx/agent/agent_test.go b/internal/controller/nginx/agent/agent_test.go index 5351fbb944..b0646ee939 100644 --- a/internal/controller/nginx/agent/agent_test.go +++ b/internal/controller/nginx/agent/agent_test.go @@ -66,7 +66,8 @@ func TestUpdateConfig(t *testing.T) { updater.UpdateConfig(deployment, []File{file}) g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(1)) - g.Expect(deployment.GetFile(file.Meta.Name, file.Meta.Hash)).To(Equal(file.Contents)) + fileContents, _ := deployment.GetFile(file.Meta.Name, file.Meta.Hash) + g.Expect(fileContents).To(Equal(file.Contents)) if test.expErr { g.Expect(deployment.GetLatestConfigError()).To(Equal(testErr)) diff --git a/internal/controller/nginx/agent/command.go b/internal/controller/nginx/agent/command.go index 046fb7f313..cc89a47fd5 100644 --- a/internal/controller/nginx/agent/command.go +++ b/internal/controller/nginx/agent/command.go @@ -155,6 +155,8 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error channels := broadcaster.Subscribe() defer broadcaster.CancelSubscription(channels.ID) + var pendingBroadcastRequest *broadcast.NginxAgentMessage + for { // When a message is received over the ListenCh, it is assumed and required that the // deployment object is already LOCKED. This lock is acquired by the event handler before calling @@ -191,6 +193,10 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error return grpcStatus.Error(codes.Internal, err.Error()) } + + // Track this broadcast request to distinguish it from initial config operations. + // Only broadcast operations should signal ResponseCh for coordination. + pendingBroadcastRequest = &msg case err = <-msgr.Errors(): cs.logger.Error(err, "connection error", "pod", conn.PodName) deployment.SetPodErrorStatus(conn.PodName, err) @@ -198,6 +204,9 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error case channels.ResponseCh <- struct{}{}: default: } + if pendingBroadcastRequest != nil { + cs.logger.V(1).Info("Connection error during pending request, operation failed") + } if errors.Is(err, io.EOF) { return grpcStatus.Error(codes.Aborted, err.Error()) @@ -215,7 +224,15 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error } else { deployment.SetPodErrorStatus(conn.PodName, nil) } - channels.ResponseCh <- struct{}{} + + // Signal broadcast completion only for tracked broadcast operations. + // Initial config responses are ignored to prevent spurious success messages. + if pendingBroadcastRequest != nil { + pendingBroadcastRequest = nil + channels.ResponseCh <- struct{}{} + } else { + cs.logger.V(1).Info("Received response for non-broadcast request (likely initial config)", "pod", conn.PodName) + } } } } @@ -265,6 +282,9 @@ func (cs *commandService) setInitialConfig( defer deployment.FileLock.Unlock() fileOverviews, configVersion := deployment.GetFileOverviews() + + cs.logger.Info("Sending initial configuration to agent", "pod", conn.PodName, "configVersion", configVersion) + if err := msgr.Send(ctx, buildRequest(fileOverviews, conn.InstanceID, configVersion)); err != nil { cs.logAndSendErrorStatus(deployment, conn, err) @@ -348,9 +368,11 @@ func (cs *commandService) waitForInitialConfigApply( res := msg.GetCommandResponse() if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK { applyErr := fmt.Errorf("msg: %s; error: %s", res.GetMessage(), res.GetError()) + cs.logger.V(1).Info("Received initial config response with error", "error", applyErr) return applyErr, nil } + cs.logger.V(1).Info("Received successful initial config response") return applyErr, connectionErr } } diff --git a/internal/controller/nginx/agent/command_test.go b/internal/controller/nginx/agent/command_test.go index 7b8b763011..634cc0eddb 100644 --- a/internal/controller/nginx/agent/command_test.go +++ b/internal/controller/nginx/agent/command_test.go @@ -340,25 +340,16 @@ func TestSubscribe(t *testing.T) { mockServer := newMockSubscribeServer(ctx) - // put the requests on the listenCh for the Subscription loop to pick up + // Define the broadcast messages to be sent later loopFile := &pb.File{ FileMeta: &pb.FileMeta{ Name: "some-other.conf", Hash: "56789", }, } - listenCh <- broadcast.NginxAgentMessage{ - Type: broadcast.ConfigApplyRequest, - FileOverviews: []*pb.File{loopFile}, - } - loopAction := &pb.NGINXPlusAction{ Action: &pb.NGINXPlusAction_UpdateStreamServers{}, } - listenCh <- broadcast.NginxAgentMessage{ - Type: broadcast.APIRequest, - NGINXPlusAction: loopAction, - } // start the Subscriber errCh := make(chan error) @@ -366,6 +357,9 @@ func TestSubscribe(t *testing.T) { errCh <- cs.Subscribe(mockServer) }() + // PHASE 1: Initial config is sent by setInitialConfig() BEFORE the event loop starts + // These should NOT signal ResponseCh as they're not broadcast operations + // ensure that the initial config file was sent when the Subscription connected expFile := &pb.File{ FileMeta: &pb.FileMeta{ @@ -374,6 +368,7 @@ func TestSubscribe(t *testing.T) { }, } ensureFileWasSent(g, mockServer, expFile) + // Respond to initial config - this should NOT signal ResponseCh mockServer.recvChan <- &pb.DataPlaneResponse{ CommandResponse: &pb.CommandResponse{ Status: pb.CommandResponse_COMMAND_STATUS_OK, @@ -382,22 +377,40 @@ func TestSubscribe(t *testing.T) { // ensure that the initial API request was sent when the Subscription connected ensureAPIRequestWasSent(g, mockServer, initialAction) + // Respond to initial API request - this should NOT signal ResponseCh mockServer.recvChan <- &pb.DataPlaneResponse{ CommandResponse: &pb.CommandResponse{ Status: pb.CommandResponse_COMMAND_STATUS_OK, }, } + // Wait for status queue to be updated after initial config completes g.Eventually(func() string { obj := cs.statusQueue.Dequeue(ctx) return obj.Deployment.Name }).Should(Equal("nginx-deployment")) - // ensure the second file was sent in the loop + // PHASE 2: Now send broadcast operations to the event loop + // Put the broadcast requests on the listenCh for the Subscription loop to pick up + listenCh <- broadcast.NginxAgentMessage{ + Type: broadcast.ConfigApplyRequest, + FileOverviews: []*pb.File{loopFile}, + } + + // PHASE 2: Broadcast operations from the event loop + // These SHOULD signal ResponseCh as they are broadcast operations + + // ensure the broadcast file was sent in the loop ensureFileWasSent(g, mockServer, loopFile) verifyResponse(g, mockServer, responseCh) - // ensure the second action was sent in the loop + // Send second broadcast operation + listenCh <- broadcast.NginxAgentMessage{ + Type: broadcast.APIRequest, + NGINXPlusAction: loopAction, + } + + // ensure the broadcast action was sent in the loop ensureAPIRequestWasSent(g, mockServer, loopAction) verifyResponse(g, mockServer, responseCh) diff --git a/internal/controller/nginx/agent/deployment.go b/internal/controller/nginx/agent/deployment.go index 48cb69d5af..7e9aa3aba5 100644 --- a/internal/controller/nginx/agent/deployment.go +++ b/internal/controller/nginx/agent/deployment.go @@ -161,14 +161,18 @@ func (d *Deployment) GetNGINXPlusActions() []*pb.NGINXPlusAction { // GetFile gets the requested file for the deployment and returns its contents. // The deployment FileLock MUST already be locked before calling this function. -func (d *Deployment) GetFile(name, hash string) []byte { +func (d *Deployment) GetFile(name, hash string) ([]byte, string) { + var fileFoundHash string for _, file := range d.files { - if name == file.Meta.GetName() && hash == file.Meta.GetHash() { - return file.Contents + if name == file.Meta.GetName() { + fileFoundHash = file.Meta.GetHash() + if hash == file.Meta.GetHash() { + return file.Contents, file.Meta.GetHash() + } } } - return nil + return nil, fileFoundHash } // SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send. diff --git a/internal/controller/nginx/agent/deployment_test.go b/internal/controller/nginx/agent/deployment_test.go index b3f2744ee6..0560c31600 100644 --- a/internal/controller/nginx/agent/deployment_test.go +++ b/internal/controller/nginx/agent/deployment_test.go @@ -52,11 +52,13 @@ func TestSetAndGetFiles(t *testing.T) { g.Expect(msg.FileOverviews).To(HaveLen(9)) // 1 file + 8 ignored files g.Expect(fileOverviews).To(Equal(msg.FileOverviews)) - file := deployment.GetFile("test.conf", "12345") + file, _ := deployment.GetFile("test.conf", "12345") g.Expect(file).To(Equal([]byte("test content"))) - g.Expect(deployment.GetFile("invalid", "12345")).To(BeNil()) - g.Expect(deployment.GetFile("test.conf", "invalid")).To(BeNil()) + invalidFile, _ := deployment.GetFile("invalid", "12345") + g.Expect(invalidFile).To(BeNil()) + wrongHashFile, _ := deployment.GetFile("test.conf", "invalid") + g.Expect(wrongHashFile).To(BeNil()) // Set the same files again msg = deployment.SetFiles(files) diff --git a/internal/controller/nginx/agent/file.go b/internal/controller/nginx/agent/file.go index ec2a5b1013..fdcdc2ae8f 100644 --- a/internal/controller/nginx/agent/file.go +++ b/internal/controller/nginx/agent/file.go @@ -143,12 +143,22 @@ func (fs *fileService) getFileContents(req *pb.GetFileRequest, connKey string) ( } filename := req.GetFileMeta().GetName() - contents := deployment.GetFile(filename, req.GetFileMeta().GetHash()) + contents, fileFoundHash := deployment.GetFile(filename, req.GetFileMeta().GetHash()) if len(contents) == 0 { + fs.logger.V(1).Info("Error getting file for agent", "file", filename) + if fileFoundHash != "" { + fs.logger.V(1).Info( + "File found had wrong hash", + "hashWanted", + req.GetFileMeta().GetHash(), + "hashFound", + fileFoundHash, + ) + } return nil, status.Errorf(codes.NotFound, "file not found") } - fs.logger.V(1).Info("Getting file for agent", "file", filename) + fs.logger.V(1).Info("Getting file for agent", "file", filename, "fileHash", fileFoundHash) return contents, nil }