Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ func (h *eventHandlerImpl) sendNginxConfig(ctx context.Context, logger logr.Logg
panic("expected deployment, got nil")
}

nginxImage, _ := provisioner.DetermineNginxImageName(
gw.EffectiveNginxProxy,
h.cfg.plus,
h.cfg.gatewayPodConfig.Version,
)
deployment.SetImageVersion(nginxImage)

cfg := dataplane.BuildConfiguration(ctx, logger, gr, gw, h.cfg.serviceResolver, h.cfg.plus)
depCtx, getErr := h.getDeploymentContext(ctx)
if getErr != nil {
Expand Down
66 changes: 54 additions & 12 deletions internal/controller/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (cs *commandService) CreateConnection(
podName := resource.GetContainerInfo().GetHostname()
cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", podName))

owner, err := cs.getPodOwner(podName)
owner, _, err := cs.getPodOwner(podName)
if err != nil {
response := &pb.CreateConnectionResponse{
Response: &pb.CommandResponse{
Expand Down Expand Up @@ -281,6 +281,17 @@ func (cs *commandService) setInitialConfig(
deployment.FileLock.Lock()
defer deployment.FileLock.Unlock()

_, pod, err := cs.getPodOwner(conn.PodName)
if err != nil {
cs.logAndSendErrorStatus(deployment, conn, err)

return grpcStatus.Error(codes.Internal, err.Error())
}
if err := cs.validatePodImageVersion(pod, deployment.imageVersion); err != nil {
cs.logAndSendErrorStatus(deployment, conn, err)
return grpcStatus.Errorf(codes.FailedPrecondition, "nginx image version validation failed: %s", err.Error())
}

fileOverviews, configVersion := deployment.GetFileOverviews()

cs.logger.Info("Sending initial configuration to agent", "pod", conn.PodName, "configVersion", configVersion)
Expand Down Expand Up @@ -443,7 +454,7 @@ func buildPlusAPIRequest(action *pb.NGINXPlusAction, instanceID string) *pb.Mana
}
}

func (cs *commandService) getPodOwner(podName string) (types.NamespacedName, error) {
func (cs *commandService) getPodOwner(podName string) (types.NamespacedName, *v1.Pod, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand All @@ -452,30 +463,31 @@ func (cs *commandService) getPodOwner(podName string) (types.NamespacedName, err
FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": podName}),
}
if err := cs.k8sReader.List(ctx, &pods, listOpts); err != nil {
return types.NamespacedName{}, fmt.Errorf("error listing pods: %w", err)
return types.NamespacedName{}, nil, fmt.Errorf("error listing pods: %w", err)
}

if len(pods.Items) == 0 {
return types.NamespacedName{}, fmt.Errorf("no pods found with name %q", podName)
return types.NamespacedName{}, nil, fmt.Errorf("no pods found with name %q", podName)
}

if len(pods.Items) > 1 {
return types.NamespacedName{}, fmt.Errorf("should only be one pod with name %q", podName)
return types.NamespacedName{}, nil, fmt.Errorf("should only be one pod with name %q", podName)
}
pod := pods.Items[0]
pod := &pods.Items[0]

podOwnerRefs := pod.GetOwnerReferences()
if len(podOwnerRefs) != 1 {
return types.NamespacedName{}, fmt.Errorf("expected one owner reference of the nginx Pod, got %d", len(podOwnerRefs))
tooManyOwnersError := "expected one owner reference of the nginx Pod, got %d"
return types.NamespacedName{}, nil, fmt.Errorf(tooManyOwnersError, len(podOwnerRefs))
}

if podOwnerRefs[0].Kind != "ReplicaSet" && podOwnerRefs[0].Kind != "DaemonSet" {
err := fmt.Errorf("expected pod owner reference to be ReplicaSet or DaemonSet, got %s", podOwnerRefs[0].Kind)
return types.NamespacedName{}, err
return types.NamespacedName{}, nil, err
}

if podOwnerRefs[0].Kind == "DaemonSet" {
return types.NamespacedName{Namespace: pod.Namespace, Name: podOwnerRefs[0].Name}, nil
return types.NamespacedName{Namespace: pod.Namespace, Name: podOwnerRefs[0].Name}, pod, nil
}

var replicaSet appsv1.ReplicaSet
Expand All @@ -497,16 +509,46 @@ func (cs *commandService) getPodOwner(podName string) (types.NamespacedName, err
return true, nil
},
); err != nil {
return types.NamespacedName{}, fmt.Errorf("failed to get nginx Pod's ReplicaSet: %w", replicaSetErr)
return types.NamespacedName{}, nil, fmt.Errorf("failed to get nginx Pod's ReplicaSet: %w", replicaSetErr)
}

replicaOwnerRefs := replicaSet.GetOwnerReferences()
if len(replicaOwnerRefs) != 1 {
err := fmt.Errorf("expected one owner reference of the nginx ReplicaSet, got %d", len(replicaOwnerRefs))
return types.NamespacedName{}, err
return types.NamespacedName{}, nil, err
}

return types.NamespacedName{Namespace: pod.Namespace, Name: replicaOwnerRefs[0].Name}, pod, nil
}

// validatePodImageVersion checks if the pod's nginx container image version matches the expected version
// from its deployment. Returns an error if versions don't match.
func (cs *commandService) validatePodImageVersion(
pod *v1.Pod,
expectedImage string,
) error {
var podNginxImage string

for _, container := range pod.Spec.Containers {
if container.Name == "nginx" {
podNginxImage = container.Image
break
}
}
if podNginxImage == "" {
return fmt.Errorf("nginx container not found in pod %q", pod.Name)
}

// Compare images
if podNginxImage != expectedImage {
return fmt.Errorf("nginx image version mismatch: pod has %q but expected %q", podNginxImage, expectedImage)
}

return types.NamespacedName{Namespace: pod.Namespace, Name: replicaOwnerRefs[0].Name}, nil
cs.logger.V(1).Info("Pod nginx image version validated successfully",
"podName", pod.Name,
"image", podNginxImage)

return nil
}

// UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata,
Expand Down
114 changes: 75 additions & 39 deletions internal/controller/nginx/agent/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,48 @@ func createGrpcContextWithCancel() (context.Context, context.CancelFunc) {
}), cancel
}

func getDefaultPodList() []runtime.Object {
pod := &v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-pod",
Namespace: "test",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "ReplicaSet",
Name: "nginx-replicaset",
},
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Image: "nginx:v1.0.0",
Name: "nginx",
},
},
},
},
},
}

replicaSet := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-replicaset",
Namespace: "test",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Deployment",
Name: "nginx-deployment",
},
},
},
}

return []runtime.Object{pod, replicaSet}
}

func TestCreateConnection(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -165,40 +207,9 @@ func TestCreateConnection(t *testing.T) {
connTracker := agentgrpcfakes.FakeConnectionsTracker{}

var objs []runtime.Object
if test.errString == "" {
pod := &v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-pod",
Namespace: "test",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "ReplicaSet",
Name: "nginx-replicaset",
},
},
},
},
},
}

replicaSet := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-replicaset",
Namespace: "test",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Deployment",
Name: "nginx-deployment",
},
},
},
}

objs = []runtime.Object{pod, replicaSet}
if test.errString != "error getting pod owner" {
objs = getDefaultPodList()
}

fakeClient, err := createFakeK8sClient(objs...)
g.Expect(err).ToNot(HaveOccurred())

Expand Down Expand Up @@ -298,10 +309,13 @@ func TestSubscribe(t *testing.T) {
}
connTracker.GetConnectionReturns(conn)

fakeClient, err := createFakeK8sClient(getDefaultPodList()...)
g.Expect(err).ToNot(HaveOccurred())

store := NewDeploymentStore(&connTracker)
cs := newCommandService(
logr.Discard(),
fake.NewFakeClient(),
fakeClient,
store,
&connTracker,
status.NewQueue(),
Expand Down Expand Up @@ -329,6 +343,7 @@ func TestSubscribe(t *testing.T) {
},
}
deployment.SetFiles(files)
deployment.SetImageVersion("nginx:v1.0.0")

initialAction := &pb.NGINXPlusAction{
Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{},
Expand Down Expand Up @@ -439,11 +454,14 @@ func TestSubscribe_Reset(t *testing.T) {
}
connTracker.GetConnectionReturns(conn)

fakeClient, err := createFakeK8sClient(getDefaultPodList()...)
g.Expect(err).ToNot(HaveOccurred())

store := NewDeploymentStore(&connTracker)
resetChan := make(chan struct{})
cs := newCommandService(
logr.Discard(),
fake.NewFakeClient(),
fakeClient,
store,
&connTracker,
status.NewQueue(),
Expand Down Expand Up @@ -471,6 +489,7 @@ func TestSubscribe_Reset(t *testing.T) {
},
}
deployment.SetFiles(files)
deployment.SetImageVersion("nginx:v1.0.0")

ctx, cancel := createGrpcContextWithCancel()
defer cancel()
Expand Down Expand Up @@ -590,9 +609,10 @@ func TestSetInitialConfig_Errors(t *testing.T) {
t.Parallel()

tests := []struct {
name string
setup func(msgr *messengerfakes.FakeMessenger, deployment *Deployment)
name string
errString string
podList []runtime.Object
}{
{
name: "error sending initial config",
Expand Down Expand Up @@ -652,6 +672,13 @@ func TestSetInitialConfig_Errors(t *testing.T) {
},
errString: "api apply error",
},
{
name: "error validating nginx version",
setup: func(_ *messengerfakes.FakeMessenger, deployment *Deployment) {
deployment.SetImageVersion("nginx:v2.0.0")
},
errString: "nginx image version mismatch: pod has \"nginx:v1.0.0\" but expected \"nginx:v2.0.0\"",
},
}

for _, test := range tests {
Expand All @@ -662,9 +689,17 @@ func TestSetInitialConfig_Errors(t *testing.T) {
connTracker := agentgrpcfakes.FakeConnectionsTracker{}
msgr := &messengerfakes.FakeMessenger{}

podList := test.podList
if len(podList) == 0 {
podList = getDefaultPodList()
}

fakeClient, err := createFakeK8sClient(podList...)
g.Expect(err).ToNot(HaveOccurred())

cs := newCommandService(
logr.Discard(),
fake.NewFakeClient(),
fakeClient,
NewDeploymentStore(&connTracker),
&connTracker,
status.NewQueue(),
Expand All @@ -678,12 +713,13 @@ func TestSetInitialConfig_Errors(t *testing.T) {
}

deployment := newDeployment(&broadcastfakes.FakeBroadcaster{})
deployment.SetImageVersion("nginx:v1.0.0")

if test.setup != nil {
test.setup(msgr, deployment)
}

err := cs.setInitialConfig(context.Background(), deployment, conn, msgr)
err = cs.setInitialConfig(context.Background(), deployment, conn, msgr)

g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(test.errString))
Expand Down Expand Up @@ -882,7 +918,7 @@ func TestGetPodOwner(t *testing.T) {
nil,
)

owner, err := cs.getPodOwner(test.podName)
owner, _, err := cs.getPodOwner(test.podName)

if test.errString != "" {
g.Expect(err).To(HaveOccurred())
Expand Down
10 changes: 10 additions & 0 deletions internal/controller/nginx/agent/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Deployment struct {

broadcaster broadcast.Broadcaster

imageVersion string

configVersion string
// error that is set if a ConfigApply call failed for a Pod. This is needed
// because if subsequent upstream API calls are made within the same update event,
Expand Down Expand Up @@ -73,6 +75,14 @@ func (d *Deployment) GetBroadcaster() broadcast.Broadcaster {
return d.broadcaster
}

// SetImageVersion sets the deployment's image version.
func (d *Deployment) SetImageVersion(imageVersion string) {
d.FileLock.Lock()
defer d.FileLock.Unlock()

d.imageVersion = imageVersion
}

// SetLatestConfigError sets the latest config apply error for the deployment.
func (d *Deployment) SetLatestConfigError(err error) {
d.errLock.Lock()
Expand Down
Loading