Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions controllers/amphoracontroller_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,20 @@ func (r *OctaviaAmphoraControllerReconciler) reconcileNormal(ctx context.Context
}
// create DaemonSet - end

// Handle pod labeling for predictable IPs
ipKeyPrefix := "rsyslog_"
if instance.Spec.Role == "healthmanager" {
ipKeyPrefix = "hm_"
}
config := PodLabelingConfig{
ConfigMapName: octavia.HmConfigMap,
IPKeyPrefix: ipKeyPrefix,
ServiceName: instance.Name,
}
if err := HandlePodLabeling(ctx, helper, instance.Name, instance.Namespace, config); err != nil {
Log.Error(err, "Failed to handle pod labeling")
}

// We reached the end of the Reconcile, update the Ready condition based on
// the sub conditions
if instance.Status.Conditions.AllSubConditionIsTrue() {
Expand Down Expand Up @@ -863,6 +877,7 @@ func (r *OctaviaAmphoraControllerReconciler) SetupWithManager(mgr ctrl.Manager)
Owns(&corev1.Secret{}).
Owns(&corev1.ConfigMap{}).
Owns(&appsv1.DaemonSet{}).
Owns(&corev1.Pod{}).
// watch the secrets we don't own
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(svcSecretFn)).
Expand Down
63 changes: 63 additions & 0 deletions controllers/octavia_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ import (
"fmt"

topologyv1 "github.com/openstack-k8s-operators/infra-operator/apis/topology/v1beta1"
"github.com/openstack-k8s-operators/lib-common/modules/common"
"github.com/openstack-k8s-operators/lib-common/modules/common/condition"
"github.com/openstack-k8s-operators/lib-common/modules/common/helper"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type conditionUpdater interface {
Expand Down Expand Up @@ -73,3 +78,61 @@ func ensureTopology(
}
return topology, nil
}

// PodLabelingConfig contains configuration for pod labeling
type PodLabelingConfig struct {
ConfigMapName string
IPKeyPrefix string
ServiceName string
}

// HandlePodLabeling adds predictableip labels to all pods owned by the specified instance
func HandlePodLabeling(ctx context.Context, helper *helper.Helper, instanceName, namespace string, config PodLabelingConfig) error {
// Get the ConfigMap once
configMap := &corev1.ConfigMap{}
if err := helper.GetClient().Get(ctx, types.NamespacedName{Name: config.ConfigMapName, Namespace: namespace}, configMap); err != nil {
return fmt.Errorf("failed to get configmap %s: %w", config.ConfigMapName, err)
}

// List all pods owned by this instance
podList := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace(namespace),
client.MatchingLabels(map[string]string{
common.AppSelector: instanceName,
}),
}

if err := helper.GetClient().List(ctx, podList, listOpts...); err != nil {
return fmt.Errorf("failed to list pods: %w", err)
}

// Process each pod
for i := range podList.Items {
pod := &podList.Items[i]

// Skip if already labeled or no node assigned
if (pod.Labels != nil && pod.Labels["predictableip"] != "") || pod.Spec.NodeName == "" {
continue
}

// Get predictable IP from configmap
ipKey := fmt.Sprintf("%s%s", config.IPKeyPrefix, pod.Spec.NodeName)
predictableIP, exists := configMap.Data[ipKey]
if !exists {
continue // Skip pods without predictable IPs
}

// Add the label and update
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels["predictableip"] = predictableIP

if err := helper.GetClient().Update(ctx, pod); err != nil {
log.FromContext(ctx).Error(err, "Failed to update pod", "pod", pod.Name, "predictableIP", predictableIP)
}
}

return nil
}
11 changes: 11 additions & 0 deletions controllers/octaviarsyslog_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,16 @@ func (r *OctaviaRsyslogReconciler) reconcileNormal(ctx context.Context, instance
}
// create DaemonSet - end

// Handle pod labeling for predictable IPs
config := PodLabelingConfig{
ConfigMapName: octavia.HmConfigMap,
IPKeyPrefix: "rsyslog_",
ServiceName: instance.Name,
}
if err := HandlePodLabeling(ctx, helper, instance.Name, instance.Namespace, config); err != nil {
Log.Error(err, "Failed to handle pod labeling")
}

// We reached the end of the Reconcile, update the Ready condition based on
// the sub conditions
if instance.Status.Conditions.AllSubConditionIsTrue() {
Expand Down Expand Up @@ -512,6 +522,7 @@ func (r *OctaviaRsyslogReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&corev1.Service{}).
Owns(&corev1.ConfigMap{}).
Owns(&appsv1.DaemonSet{}).
Owns(&corev1.Pod{}).
Watches(&topologyv1.Topology{},
handler.EnqueueRequestsFromMapFunc(r.findObjectsForSrc),
builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Expand Down
244 changes: 244 additions & 0 deletions tests/functional/pod_labeling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package functional_test

import (
"fmt"

"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2" //revive:disable:dot-imports
. "github.com/onsi/gomega" //revive:disable:dot-imports

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/openstack-k8s-operators/lib-common/modules/common"
"github.com/openstack-k8s-operators/lib-common/modules/common/helper"
octaviav1 "github.com/openstack-k8s-operators/octavia-operator/api/v1beta1"
"github.com/openstack-k8s-operators/octavia-operator/controllers"
"github.com/openstack-k8s-operators/octavia-operator/pkg/octavia"
)

var _ = Describe("Pod Labeling", func() {
var (
configMapName types.NamespacedName
healthManagerPod *corev1.Pod
rsyslogPod *corev1.Pod
existingLabeledPod *corev1.Pod
)

BeforeEach(func() {
configMapName = types.NamespacedName{
Name: "octavia-hmport-map",
Namespace: namespace,
}

// Create configmap with test data
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName.Name,
Namespace: configMapName.Namespace,
},
Data: map[string]string{
"hm_worker-1": "172.23.0.100",
"hm_worker-2": "172.23.0.101",
"rsyslog_worker-1": "172.23.0.200",
"rsyslog_worker-2": "172.23.0.201",
},
}
Expect(k8sClient.Create(ctx, configMap)).To(Succeed())
DeferCleanup(k8sClient.Delete, ctx, configMap)

// Create test pods
healthManagerPod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("healthmanager-pod-%s", uuid.New().String()[:8]),
Namespace: namespace,
Labels: map[string]string{
common.AppSelector: "octavia-healthmanager",
},
},
Spec: corev1.PodSpec{
NodeName: "worker-1",
Containers: []corev1.Container{{
Name: "test-container",
Image: "test-image",
}},
},
}
Expect(k8sClient.Create(ctx, healthManagerPod)).To(Succeed())
DeferCleanup(k8sClient.Delete, ctx, healthManagerPod)

rsyslogPod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("rsyslog-pod-%s", uuid.New().String()[:8]),
Namespace: namespace,
Labels: map[string]string{
common.AppSelector: "octavia-rsyslog",
},
},
Spec: corev1.PodSpec{
NodeName: "worker-2",
Containers: []corev1.Container{{
Name: "test-container",
Image: "test-image",
}},
},
}
Expect(k8sClient.Create(ctx, rsyslogPod)).To(Succeed())
DeferCleanup(k8sClient.Delete, ctx, rsyslogPod)

// Create pod with existing predictableip label
existingLabeledPod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("existing-labeled-pod-%s", uuid.New().String()[:8]),
Namespace: namespace,
Labels: map[string]string{
common.AppSelector: "octavia-rsyslog",
"predictableip": "existing-ip",
},
},
Spec: corev1.PodSpec{
NodeName: "worker-1",
Containers: []corev1.Container{{
Name: "test-container",
Image: "test-image",
}},
},
}
Expect(k8sClient.Create(ctx, existingLabeledPod)).To(Succeed())
DeferCleanup(k8sClient.Delete, ctx, existingLabeledPod)
})

Context("HandlePodLabeling function", func() {
It("should label healthmanager pods with hm_ IP addresses", func() {
// Create helper
dummyInstance := &octaviav1.OctaviaAmphoraController{
ObjectMeta: metav1.ObjectMeta{
Name: "test-healthmanager",
Namespace: namespace,
},
}
h, err := helper.NewHelper(
dummyInstance,
k8sClient,
nil, // No kclient needed for this test
k8sClient.Scheme(),
zap.New(zap.UseDevMode(true)), // Test logger
)
Expect(err).NotTo(HaveOccurred())

config := controllers.PodLabelingConfig{
ConfigMapName: octavia.HmConfigMap,
IPKeyPrefix: "hm_",
ServiceName: "octavia-healthmanager",
}

err = controllers.HandlePodLabeling(ctx, h, "octavia-healthmanager", namespace, config)
Expect(err).NotTo(HaveOccurred())

// Verify the pod got labeled correctly
Eventually(func(g Gomega) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, types.NamespacedName{
Name: healthManagerPod.Name,
Namespace: namespace,
}, pod)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Labels).To(HaveKeyWithValue("predictableip", "172.23.0.100"))
}, timeout, interval).Should(Succeed())
})

It("should label rsyslog pods with rsyslog_ IP addresses", func() {
// Create helper
dummyInstance := &octaviav1.OctaviaAmphoraController{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rsyslog",
Namespace: namespace,
},
}
h, err := helper.NewHelper(
dummyInstance,
k8sClient,
nil, // No kclient needed for this test
k8sClient.Scheme(),
zap.New(zap.UseDevMode(true)), // Test logger
)
Expect(err).NotTo(HaveOccurred())

config := controllers.PodLabelingConfig{
ConfigMapName: octavia.HmConfigMap,
IPKeyPrefix: "rsyslog_",
ServiceName: "octavia-rsyslog",
}

err = controllers.HandlePodLabeling(ctx, h, "octavia-rsyslog", namespace, config)
Expect(err).NotTo(HaveOccurred())

// Verify the pod got labeled correctly
Eventually(func(g Gomega) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, types.NamespacedName{
Name: rsyslogPod.Name,
Namespace: namespace,
}, pod)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Labels).To(HaveKeyWithValue("predictableip", "172.23.0.201"))
}, timeout, interval).Should(Succeed())
})

It("should skip pods that already have predictableip labels", func() {
// Create helper
dummyInstance := &octaviav1.OctaviaAmphoraController{
ObjectMeta: metav1.ObjectMeta{
Name: "test-existing",
Namespace: namespace,
},
}
h, err := helper.NewHelper(
dummyInstance,
k8sClient,
nil, // No kclient needed for this test
k8sClient.Scheme(),
zap.New(zap.UseDevMode(true)), // Test logger
)
Expect(err).NotTo(HaveOccurred())

config := controllers.PodLabelingConfig{
ConfigMapName: octavia.HmConfigMap,
IPKeyPrefix: "rsyslog_",
ServiceName: "octavia-rsyslog",
}

err = controllers.HandlePodLabeling(ctx, h, "octavia-rsyslog", namespace, config)
Expect(err).NotTo(HaveOccurred())

// Verify the existing label wasn't changed
Eventually(func(g Gomega) {
pod := &corev1.Pod{}
err := k8sClient.Get(ctx, types.NamespacedName{
Name: existingLabeledPod.Name,
Namespace: namespace,
}, pod)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(pod.Labels).To(HaveKeyWithValue("predictableip", "existing-ip"))
}, timeout, interval).Should(Succeed())
})
})
})