Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/controller/nginx/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func TestUpdateConfig(t *testing.T) {
updater.UpdateConfig(deployment, []File{file})

g.Expect(fakeBroadcaster.SendCallCount()).To(Equal(1))
g.Expect(deployment.GetFile(file.Meta.Name, file.Meta.Hash)).To(Equal(file.Contents))
fileContents, _ := deployment.GetFile(file.Meta.Name, file.Meta.Hash)
g.Expect(fileContents).To(Equal(file.Contents))

if test.expErr {
g.Expect(deployment.GetLatestConfigError()).To(Equal(testErr))
Expand Down
24 changes: 23 additions & 1 deletion internal/controller/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
channels := broadcaster.Subscribe()
defer broadcaster.CancelSubscription(channels.ID)

var pendingBroadcastRequest *broadcast.NginxAgentMessage

for {
// When a message is received over the ListenCh, it is assumed and required that the
// deployment object is already LOCKED. This lock is acquired by the event handler before calling
Expand Down Expand Up @@ -191,13 +193,20 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error

return grpcStatus.Error(codes.Internal, err.Error())
}

// Track this broadcast request to distinguish it from initial config operations.
// Only broadcast operations should signal ResponseCh for coordination.
pendingBroadcastRequest = &msg
case err = <-msgr.Errors():
cs.logger.Error(err, "connection error", "pod", conn.PodName)
deployment.SetPodErrorStatus(conn.PodName, err)
select {
case channels.ResponseCh <- struct{}{}:
default:
}
if pendingBroadcastRequest != nil {
cs.logger.V(1).Info("Connection error during pending request, operation failed")
}

if errors.Is(err, io.EOF) {
return grpcStatus.Error(codes.Aborted, err.Error())
Expand All @@ -215,7 +224,15 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
} else {
deployment.SetPodErrorStatus(conn.PodName, nil)
}
channels.ResponseCh <- struct{}{}

// Signal broadcast completion only for tracked broadcast operations.
// Initial config responses are ignored to prevent spurious success messages.
if pendingBroadcastRequest != nil {
pendingBroadcastRequest = nil
channels.ResponseCh <- struct{}{}
} else {
cs.logger.V(1).Info("Received response for non-broadcast request (likely initial config)", "pod", conn.PodName)
}
}
}
}
Expand Down Expand Up @@ -265,6 +282,9 @@ func (cs *commandService) setInitialConfig(
defer deployment.FileLock.Unlock()

fileOverviews, configVersion := deployment.GetFileOverviews()

cs.logger.Info("Sending initial configuration to agent", "pod", conn.PodName, "configVersion", configVersion)

if err := msgr.Send(ctx, buildRequest(fileOverviews, conn.InstanceID, configVersion)); err != nil {
cs.logAndSendErrorStatus(deployment, conn, err)

Expand Down Expand Up @@ -348,9 +368,11 @@ func (cs *commandService) waitForInitialConfigApply(
res := msg.GetCommandResponse()
if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK {
applyErr := fmt.Errorf("msg: %s; error: %s", res.GetMessage(), res.GetError())
cs.logger.V(1).Info("Received initial config response with error", "error", applyErr)
return applyErr, nil
}

cs.logger.V(1).Info("Received successful initial config response")
return applyErr, connectionErr
}
}
Expand Down
37 changes: 25 additions & 12 deletions internal/controller/nginx/agent/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,32 +340,26 @@ func TestSubscribe(t *testing.T) {

mockServer := newMockSubscribeServer(ctx)

// put the requests on the listenCh for the Subscription loop to pick up
// Define the broadcast messages to be sent later
loopFile := &pb.File{
FileMeta: &pb.FileMeta{
Name: "some-other.conf",
Hash: "56789",
},
}
listenCh <- broadcast.NginxAgentMessage{
Type: broadcast.ConfigApplyRequest,
FileOverviews: []*pb.File{loopFile},
}

loopAction := &pb.NGINXPlusAction{
Action: &pb.NGINXPlusAction_UpdateStreamServers{},
}
listenCh <- broadcast.NginxAgentMessage{
Type: broadcast.APIRequest,
NGINXPlusAction: loopAction,
}

// start the Subscriber
errCh := make(chan error)
go func() {
errCh <- cs.Subscribe(mockServer)
}()

// PHASE 1: Initial config is sent by setInitialConfig() BEFORE the event loop starts
// These should NOT signal ResponseCh as they're not broadcast operations

// ensure that the initial config file was sent when the Subscription connected
expFile := &pb.File{
FileMeta: &pb.FileMeta{
Expand All @@ -374,6 +368,7 @@ func TestSubscribe(t *testing.T) {
},
}
ensureFileWasSent(g, mockServer, expFile)
// Respond to initial config - this should NOT signal ResponseCh
mockServer.recvChan <- &pb.DataPlaneResponse{
CommandResponse: &pb.CommandResponse{
Status: pb.CommandResponse_COMMAND_STATUS_OK,
Expand All @@ -382,22 +377,40 @@ func TestSubscribe(t *testing.T) {

// ensure that the initial API request was sent when the Subscription connected
ensureAPIRequestWasSent(g, mockServer, initialAction)
// Respond to initial API request - this should NOT signal ResponseCh
mockServer.recvChan <- &pb.DataPlaneResponse{
CommandResponse: &pb.CommandResponse{
Status: pb.CommandResponse_COMMAND_STATUS_OK,
},
}

// Wait for status queue to be updated after initial config completes
g.Eventually(func() string {
obj := cs.statusQueue.Dequeue(ctx)
return obj.Deployment.Name
}).Should(Equal("nginx-deployment"))

// ensure the second file was sent in the loop
// PHASE 2: Now send broadcast operations to the event loop
// Put the broadcast requests on the listenCh for the Subscription loop to pick up
listenCh <- broadcast.NginxAgentMessage{
Type: broadcast.ConfigApplyRequest,
FileOverviews: []*pb.File{loopFile},
}

// PHASE 2: Broadcast operations from the event loop
// These SHOULD signal ResponseCh as they are broadcast operations

// ensure the broadcast file was sent in the loop
ensureFileWasSent(g, mockServer, loopFile)
verifyResponse(g, mockServer, responseCh)

// ensure the second action was sent in the loop
// Send second broadcast operation
listenCh <- broadcast.NginxAgentMessage{
Type: broadcast.APIRequest,
NGINXPlusAction: loopAction,
}

// ensure the broadcast action was sent in the loop
ensureAPIRequestWasSent(g, mockServer, loopAction)
verifyResponse(g, mockServer, responseCh)

Expand Down
12 changes: 8 additions & 4 deletions internal/controller/nginx/agent/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,18 @@ func (d *Deployment) GetNGINXPlusActions() []*pb.NGINXPlusAction {

// GetFile gets the requested file for the deployment and returns its contents.
// The deployment FileLock MUST already be locked before calling this function.
func (d *Deployment) GetFile(name, hash string) []byte {
func (d *Deployment) GetFile(name, hash string) ([]byte, string) {
var fileFoundHash string
for _, file := range d.files {
if name == file.Meta.GetName() && hash == file.Meta.GetHash() {
return file.Contents
if name == file.Meta.GetName() {
fileFoundHash = file.Meta.GetHash()
if hash == file.Meta.GetHash() {
return file.Contents, file.Meta.GetHash()
}
}
}

return nil
return nil, fileFoundHash
}

// SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send.
Expand Down
8 changes: 5 additions & 3 deletions internal/controller/nginx/agent/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ func TestSetAndGetFiles(t *testing.T) {
g.Expect(msg.FileOverviews).To(HaveLen(9)) // 1 file + 8 ignored files
g.Expect(fileOverviews).To(Equal(msg.FileOverviews))

file := deployment.GetFile("test.conf", "12345")
file, _ := deployment.GetFile("test.conf", "12345")
g.Expect(file).To(Equal([]byte("test content")))

g.Expect(deployment.GetFile("invalid", "12345")).To(BeNil())
g.Expect(deployment.GetFile("test.conf", "invalid")).To(BeNil())
invalidFile, _ := deployment.GetFile("invalid", "12345")
g.Expect(invalidFile).To(BeNil())
wrongHashFile, _ := deployment.GetFile("test.conf", "invalid")
g.Expect(wrongHashFile).To(BeNil())

// Set the same files again
msg = deployment.SetFiles(files)
Expand Down
14 changes: 12 additions & 2 deletions internal/controller/nginx/agent/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,22 @@ func (fs *fileService) getFileContents(req *pb.GetFileRequest, connKey string) (
}

filename := req.GetFileMeta().GetName()
contents := deployment.GetFile(filename, req.GetFileMeta().GetHash())
contents, fileFoundHash := deployment.GetFile(filename, req.GetFileMeta().GetHash())
if len(contents) == 0 {
fs.logger.V(1).Info("Error getting file for agent", "file", filename)
if fileFoundHash != "" {
fs.logger.V(1).Info(
"File found had wrong hash",
"hashWanted",
req.GetFileMeta().GetHash(),
"hashFound",
fileFoundHash,
)
}
return nil, status.Errorf(codes.NotFound, "file not found")
}

fs.logger.V(1).Info("Getting file for agent", "file", filename)
fs.logger.V(1).Info("Getting file for agent", "file", filename, "fileHash", fileFoundHash)

return contents, nil
}
Expand Down