
Commit dbce689

Merge pull request #2063 from jotak/tolerations
NETOBSERV-2434: fix default tolerations, and a few other issues
2 parents 8b503cc + fed9281 commit dbce689

File tree

9 files changed: +236 -14 lines changed


api/flowcollector/v1beta2/flowcollector_validation_webhook.go
Lines changed: 18 additions & 0 deletions

@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"reflect"
 	"slices"
 	"strconv"
 	"strings"
@@ -214,12 +215,29 @@ func validatePortString(s string) (uint16, error) {
 }
 
 func (v *validator) validateFLP() {
+	v.validateScheduling()
 	v.validateFLPLogTypes()
 	v.validateFLPFilters()
 	v.validateFLPAlerts()
 	v.validateFLPMetricsForAlerts()
 }
 
+func (v *validator) validateScheduling() {
+	if v.fc.DeploymentModel == DeploymentModelDirect {
+		// In direct mode, agent and FLP scheduling should be consistent, to ensure the 1-1 relation
+		var agent, flp *SchedulingConfig
+		if v.fc.Agent.EBPF.Advanced != nil {
+			agent = v.fc.Agent.EBPF.Advanced.Scheduling
+		}
+		if v.fc.Processor.Advanced != nil {
+			flp = v.fc.Processor.Advanced.Scheduling
+		}
+		if !reflect.DeepEqual(agent, flp) {
+			v.warnings = append(v.warnings, "Mismatch detected between spec.agent.ebpf.advanced.scheduling and spec.processor.advanced.scheduling. In Direct mode, it can lead to inconsistent pod scheduling that would result in errors in the flow collection process.")
+		}
+	}
+}
+
 func (v *validator) validateFLPLogTypes() {
 	if v.fc.Processor.LogTypes != nil && *v.fc.Processor.LogTypes == LogTypeAll {
 		v.warnings = append(v.warnings, "Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint")
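
Note: the new check compares the two *SchedulingConfig pointers with reflect.DeepEqual, so two unset (nil) configs are considered equal and raise no warning, while an override on only one side, or differing overrides, triggers it. A minimal standalone sketch of that comparison behavior, using a simplified stand-in struct rather than the real API types:

package main

import (
	"fmt"
	"reflect"
)

// scheduling is a simplified stand-in for the SchedulingConfig fields compared here.
type scheduling struct {
	NodeSelector map[string]string
	Tolerations  []string
}

func main() {
	var agent, flp *scheduling
	fmt.Println(reflect.DeepEqual(agent, flp)) // true: both unset, no warning

	flp = &scheduling{Tolerations: []string{"key"}}
	fmt.Println(reflect.DeepEqual(agent, flp)) // false: only the processor overrides scheduling, warning emitted

	agent = &scheduling{Tolerations: []string{"key"}}
	fmt.Println(reflect.DeepEqual(agent, flp)) // true: both sides match again
}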

api/flowcollector/v1beta2/flowcollector_validation_webhook_test.go
Lines changed: 160 additions & 0 deletions

@@ -6,6 +6,7 @@ import (
 
 	"github.com/netobserv/network-observability-operator/internal/pkg/cluster"
 	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/utils/ptr"
@@ -728,6 +729,165 @@ func TestValidateFLP(t *testing.T) {
 	}
 }
 
+func TestValidateScheduling(t *testing.T) {
+	mismatchWarning := "Mismatch detected between spec.agent.ebpf.advanced.scheduling and spec.processor.advanced.scheduling. In Direct mode, it can lead to inconsistent pod scheduling that would result in errors in the flow collection process."
+	tests := []struct {
+		name             string
+		fc               *FlowCollector
+		expectedError    string
+		expectedWarnings admission.Warnings
+		ocpVersion       string
+	}{
+		{
+			name: "Valid default (Direct)",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					DeploymentModel: DeploymentModelDirect,
+				},
+			},
+		},
+		{
+			name: "Invalid Agent scheduling",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					DeploymentModel: DeploymentModelDirect,
+					Agent: FlowCollectorAgent{
+						EBPF: FlowCollectorEBPF{
+							Advanced: &AdvancedAgentConfig{
+								Scheduling: &SchedulingConfig{
+									Tolerations: []corev1.Toleration{{Key: "key"}},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedWarnings: admission.Warnings{mismatchWarning},
+		},
+		{
+			name: "Invalid FLP scheduling",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					DeploymentModel: DeploymentModelDirect,
+					Processor: FlowCollectorFLP{
+						Advanced: &AdvancedProcessorConfig{
+							Scheduling: &SchedulingConfig{
+								Tolerations: []corev1.Toleration{{Key: "key"}},
+							},
+						},
+					},
+				},
+			},
+			expectedWarnings: admission.Warnings{mismatchWarning},
+		},
+		{
+			name: "Invalid FLP and Agent scheduling",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					DeploymentModel: DeploymentModelDirect,
+					Agent: FlowCollectorAgent{
+						EBPF: FlowCollectorEBPF{
+							Advanced: &AdvancedAgentConfig{
+								Scheduling: &SchedulingConfig{
+									Tolerations: []corev1.Toleration{{Key: "key1"}},
+								},
+							},
+						},
+					},
+					Processor: FlowCollectorFLP{
+						Advanced: &AdvancedProcessorConfig{
+							Scheduling: &SchedulingConfig{
+								Tolerations: []corev1.Toleration{{Key: "key2"}},
+							},
+						},
+					},
+				},
+			},
+			expectedWarnings: admission.Warnings{mismatchWarning},
+		},
+		{
+			name: "Valid FLP and Agent scheduling",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					DeploymentModel: DeploymentModelDirect,
+					Agent: FlowCollectorAgent{
+						EBPF: FlowCollectorEBPF{
+							Advanced: &AdvancedAgentConfig{
+								Scheduling: &SchedulingConfig{
+									Tolerations: []corev1.Toleration{{Key: "same_key"}},
+								},
+							},
+						},
+					},
+					Processor: FlowCollectorFLP{
+						Advanced: &AdvancedProcessorConfig{
+							Scheduling: &SchedulingConfig{
+								Tolerations: []corev1.Toleration{{Key: "same_key"}},
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "Valid default (Kafka)",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					DeploymentModel: DeploymentModelKafka,
+				},
+			},
+		},
+		{
+			name: "No inconsistent scheduling with Kafka",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					Processor: FlowCollectorFLP{
+						Advanced: &AdvancedProcessorConfig{
+							Scheduling: &SchedulingConfig{
+								Tolerations: []corev1.Toleration{{Key: "key"}},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	CurrentClusterInfo = &cluster.Info{}
+	r := FlowCollector{}
+	for _, test := range tests {
+		CurrentClusterInfo.MockOpenShiftVersion(test.ocpVersion)
+		warnings, err := r.Validate(context.TODO(), test.fc)
+		if test.expectedError == "" {
+			assert.NoError(t, err, test.name)
+		} else {
+			assert.ErrorContains(t, err, test.expectedError, test.name)
+		}
+		assert.Equal(t, test.expectedWarnings, warnings, test.name)
+	}
+}
+
 func TestElligibleMetrics(t *testing.T) {
 	met, tot := GetElligibleMetricsForAlert(AlertPacketDropsByKernel, &AlertVariant{
 		GroupBy: GroupByNamespace,

internal/controller/consoleplugin/consoleplugin_test.go
Lines changed: 13 additions & 0 deletions

@@ -191,6 +191,19 @@ func TestContainerUpdateCheck(t *testing.T) {
 	assert.Contains(report.String(), "Volumes changed")
 	old = nEw
 
+	// new toleration
+	spec.ConsolePlugin.Advanced = &flowslatest.AdvancedPluginConfig{
+		Scheduling: &flowslatest.SchedulingConfig{
+			Tolerations: []corev1.Toleration{{Key: "dummy-key", Operator: corev1.TolerationOpExists}},
+		},
+	}
+	builder = getBuilder(&spec, &loki)
+	nEw = builder.deployment(constants.PluginName, "digest")
+	report = helper.NewChangeReport("")
+	assert.True(helper.PodChanged(&old.Spec.Template, &nEw.Spec.Template, constants.PluginName, &report))
+	assert.Contains(report.String(), "Toleration changed")
+	old = nEw
+
 	// test again no change
 	loki.LokiManualParams.TLS.CACert.Name = "cm-name-2"
 	builder = getBuilder(&spec, &loki)

internal/controller/ebpf/agent_controller.go
Lines changed: 1 addition & 1 deletion

@@ -460,7 +460,7 @@ func (c *AgentController) envConfig(ctx context.Context, coll *flowslatest.FlowC
 		}
 	} else {
 		config = append(config, corev1.EnvVar{Name: envExport, Value: exportGRPC})
-		advancedConfig := helper.GetAdvancedProcessorConfig(coll.Spec.Processor.Advanced)
+		advancedConfig := helper.GetAdvancedProcessorConfig(&coll.Spec)
 		// When flowlogs-pipeline is deployed as a daemonset, each agent must send
 		// data to the pod that is deployed in the same host
 		config = append(config, corev1.EnvVar{

internal/controller/flp/flp_common_objects.go
Lines changed: 3 additions & 3 deletions

@@ -35,7 +35,7 @@ const (
 
 func newGRPCPipeline(desired *flowslatest.FlowCollectorSpec) config.PipelineBuilderStage {
 	return config.NewGRPCPipeline("grpc", api.IngestGRPCProto{
-		Port: int(*helper.GetAdvancedProcessorConfig(desired.Processor.Advanced).Port),
+		Port: int(*helper.GetAdvancedProcessorConfig(desired).Port),
 	})
 }
 
@@ -80,7 +80,7 @@ func podTemplate(
 	hasHostPort, hostNetwork bool,
 	annotations map[string]string,
 ) corev1.PodTemplateSpec {
-	advancedConfig := helper.GetAdvancedProcessorConfig(desired.Processor.Advanced)
+	advancedConfig := helper.GetAdvancedProcessorConfig(desired)
 	var ports []corev1.ContainerPort
 	if hasHostPort {
 		ports = []corev1.ContainerPort{{
@@ -231,7 +231,7 @@ func metricsSettings(desired *flowslatest.FlowCollectorSpec, vol *volumes.Builde
 
 func getStaticJSONConfig(desired *flowslatest.FlowCollectorSpec, vol *volumes.Builder, promTLS *flowslatest.CertificateReference, pipeline *PipelineBuilder, dynCMName string) (string, error) {
 	metricsSettings := metricsSettings(desired, vol, promTLS)
-	advancedConfig := helper.GetAdvancedProcessorConfig(desired.Processor.Advanced)
+	advancedConfig := helper.GetAdvancedProcessorConfig(desired)
 	config := map[string]interface{}{
 		"log-level": desired.Processor.LogLevel,
 		"health": map[string]interface{}{

internal/controller/flp/flp_pipeline_builder.go
Lines changed: 1 addition & 1 deletion

@@ -553,7 +553,7 @@ func (b *PipelineBuilder) addConnectionTracking(lastStage config.PipelineBuilder
 	// Connection tracking stage (only if LogTypes is not FLOWS)
 	if b.desired.Processor.HasConntrack() {
 		outputRecordTypes := helper.GetRecordTypes(&b.desired.Processor)
-		advancedConfig := helper.GetAdvancedProcessorConfig(b.desired.Processor.Advanced)
+		advancedConfig := helper.GetAdvancedProcessorConfig(b.desired)
 		lastStage = lastStage.ConnTrack("extract_conntrack", api.ConnTrack{
 			KeyDefinition: api.KeyDefinition{
 				FieldGroups: []api.FieldGroup{

internal/controller/flp/flp_test.go
Lines changed: 20 additions & 0 deletions

@@ -794,3 +794,23 @@ func TestLabels(t *testing.T) {
 	assert.Equal("flowlogs-pipeline-transformer-monitor", smTrans.Name)
 	assert.Equal("flowlogs-pipeline-transformer", smTrans.Spec.Selector.MatchLabels["app"])
 }
+
+func TestToleration(t *testing.T) {
+	assert := assert.New(t)
+
+	cfg := getConfig()
+	cfgKafka := cfg
+	cfgKafka.DeploymentModel = flowslatest.DeploymentModelKafka
+	info := reconcilers.Common{Namespace: "ns", ClusterInfo: &cluster.Info{}}
+	builder, _ := newMonolithBuilder(info.NewInstance(image, status.Instance{}), &cfg, &metricslatest.FlowMetricList{}, nil)
+	tBuilder, _ := newTransfoBuilder(info.NewInstance(image, status.Instance{}), &cfgKafka, &metricslatest.FlowMetricList{}, nil)
+
+	// Deployment: no specific toleration
+	depl := tBuilder.deployment(annotate("digest"))
+	assert.Len(depl.Spec.Template.Spec.Tolerations, 0)
+
+	// DaemonSet: has toleration
+	ds := builder.daemonSet(annotate("digest"))
+	assert.Len(ds.Spec.Template.Spec.Tolerations, 1)
+	assert.Equal(corev1.Toleration{Operator: "Exists"}, ds.Spec.Template.Spec.Tolerations[0])
+}

internal/pkg/helper/comparators.go
Lines changed: 6 additions & 0 deletions

@@ -102,6 +102,12 @@ func assignationChanged(old, n *corev1.PodTemplateSpec, report *ChangeReport) bo
 		}
 		return true
 	}
+	if !deepEqual(n.Spec.Tolerations, old.Spec.Tolerations) {
+		if report != nil {
+			report.Add("Toleration changed")
+		}
+		return true
+	}
 	if !deepDerivative(n.Spec.Affinity, old.Spec.Affinity) {
 		if report != nil {
 			report.Add("Affinity changed")

internal/pkg/helper/flowcollector.go
Lines changed: 14 additions & 9 deletions

@@ -132,10 +132,10 @@ func GetAdvancedAgentConfig(specConfig *flowslatest.AdvancedAgentConfig) flowsla
 			cfg.Env = specConfig.Env
 		}
 		if specConfig.Scheduling != nil {
-			if len(specConfig.Scheduling.NodeSelector) > 0 {
+			if specConfig.Scheduling.NodeSelector != nil {
 				cfg.Scheduling.NodeSelector = specConfig.Scheduling.NodeSelector
 			}
-			if len(specConfig.Scheduling.Tolerations) > 0 {
+			if specConfig.Scheduling.Tolerations != nil {
 				cfg.Scheduling.Tolerations = specConfig.Scheduling.Tolerations
 			}
 			if specConfig.Scheduling.Affinity != nil {
@@ -150,7 +150,11 @@ func GetAdvancedAgentConfig(specConfig *flowslatest.AdvancedAgentConfig) flowsla
 	return cfg
 }
 
-func GetAdvancedProcessorConfig(specConfig *flowslatest.AdvancedProcessorConfig) flowslatest.AdvancedProcessorConfig {
+func GetAdvancedProcessorConfig(spec *flowslatest.FlowCollectorSpec) flowslatest.AdvancedProcessorConfig {
+	var defaultToleration []corev1.Toleration
+	if !UseKafka(spec) { // TODO: with coming-soon Service deploymentModel, this should be replaced with "UseHostNetwork" (conflict should pop up while rebasing).
+		defaultToleration = []corev1.Toleration{{Operator: corev1.TolerationOpExists}}
+	}
 	cfg := flowslatest.AdvancedProcessorConfig{
 		Env:  map[string]string{},
 		Port: ptr.To(GetFieldDefaultInt32(ProcessorAdvancedPath, "port")),
@@ -162,12 +166,13 @@ func GetAdvancedProcessorConfig(specConfig *flowslatest.AdvancedProcessorConfig)
 		ConversationTerminatingTimeout: ptr.To(GetFieldDefaultDuration(ProcessorAdvancedPath, "conversationTerminatingTimeout")),
 		Scheduling: &flowslatest.SchedulingConfig{
 			NodeSelector:      map[string]string{},
-			Tolerations:       []corev1.Toleration{{Operator: corev1.TolerationOpExists}},
+			Tolerations:       defaultToleration,
 			Affinity:          nil,
 			PriorityClassName: "",
 		},
 	}
 
+	specConfig := spec.Processor.Advanced
 	if specConfig != nil {
 		if len(specConfig.Env) > 0 {
 			cfg.Env = specConfig.Env
@@ -197,10 +202,10 @@ func GetAdvancedProcessorConfig(specConfig *flowslatest.AdvancedProcessorConfig)
 			cfg.ConversationTerminatingTimeout = specConfig.ConversationTerminatingTimeout
 		}
 		if specConfig.Scheduling != nil {
-			if len(specConfig.Scheduling.NodeSelector) > 0 {
+			if specConfig.Scheduling.NodeSelector != nil {
 				cfg.Scheduling.NodeSelector = specConfig.Scheduling.NodeSelector
 			}
-			if len(specConfig.Scheduling.Tolerations) > 0 {
+			if specConfig.Scheduling.Tolerations != nil {
 				cfg.Scheduling.Tolerations = specConfig.Scheduling.Tolerations
 			}
 			if specConfig.Scheduling.Affinity != nil {
@@ -249,7 +254,7 @@ func GetAdvancedPluginConfig(specConfig *flowslatest.AdvancedPluginConfig) flows
 		Port: ptr.To(GetFieldDefaultInt32(PluginAdvancedPath, "port")),
 		Scheduling: &flowslatest.SchedulingConfig{
 			NodeSelector:      map[string]string{},
-			Tolerations:       []corev1.Toleration{{Operator: corev1.TolerationOpExists}},
+			Tolerations:       nil,
 			Affinity:          nil,
 			PriorityClassName: "",
 		},
@@ -269,10 +274,10 @@ func GetAdvancedPluginConfig(specConfig *flowslatest.AdvancedPluginConfig) flows
 			cfg.Port = specConfig.Port
 		}
 		if specConfig.Scheduling != nil {
-			if len(specConfig.Scheduling.NodeSelector) > 0 {
+			if specConfig.Scheduling.NodeSelector != nil {
 				cfg.Scheduling.NodeSelector = specConfig.Scheduling.NodeSelector
 			}
-			if len(specConfig.Scheduling.Tolerations) > 0 {
+			if specConfig.Scheduling.Tolerations != nil {
 				cfg.Scheduling.Tolerations = specConfig.Scheduling.Tolerations
 			}
 			if specConfig.Scheduling.Affinity != nil {
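
Note: switching the merge checks from len(...) > 0 to != nil changes what an explicitly empty value in the spec means: nil keeps the computed default, while an empty-but-present list or map now overrides it. That matters now that the processor gets a default toleration (operator: Exists) only in Direct mode, since users can clear it explicitly. A minimal sketch of those merge semantics, using plain strings instead of corev1.Toleration for brevity:

package main

import "fmt"

// merge mimics the override rule after this change: nil means "not set, keep the
// default", while an explicitly empty (non-nil) slice replaces the default.
func merge(defaults, fromSpec []string) []string {
	if fromSpec != nil { // previously: if len(fromSpec) > 0
		return fromSpec
	}
	return defaults
}

func main() {
	defaults := []string{"operator: Exists"}
	fmt.Println(merge(defaults, nil))        // [operator: Exists] -> default kept
	fmt.Println(merge(defaults, []string{})) // []                 -> default explicitly cleared
}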
