diff --git a/.gitignore b/.gitignore
index 418465b5de..c8da11a0ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@
 jsonnet/vendor/
 /tmp
 
+# Test artifacts - temporary files generated during test runs
+test/**/e2e-test-monitor-*.json
+
 # These are empty target files, created on every docker build. Their sole
 # purpose is to track the last target execution time to evaluate, whether the
 # container needs to be rebuilt
diff --git a/pkg/alert/cluster_monitoring_controller.go b/pkg/alert/cluster_monitoring_controller.go
new file mode 100644
index 0000000000..f545696edb
--- /dev/null
+++ b/pkg/alert/cluster_monitoring_controller.go
@@ -0,0 +1,160 @@
+// Copyright 2025 The Cluster Monitoring Operator Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package alert
+
+import (
+	"context"
+	"fmt"
+
+	configv1alpha1 "github.com/openshift/api/config/v1alpha1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	"github.com/openshift/cluster-monitoring-operator/pkg/client"
+)
+
+const (
+	controllerName = "cluster-monitoring"
+)
+
+// ClusterMonitoringController is a minimal controller for ClusterMonitoring resources.
+type ClusterMonitoringController struct {
+	client   *client.Client
+	queue    workqueue.TypedRateLimitingInterface[string]
+	informer cache.SharedIndexInformer
+}
+
+// NewClusterMonitoringController returns a new minimal ClusterMonitoringController.
+func NewClusterMonitoringController(ctx context.Context, client *client.Client, version string) (*ClusterMonitoringController, error) {
+	informer := cache.NewSharedIndexInformer(
+		client.ClusterMonitoringListWatch(),
+		&configv1alpha1.ClusterMonitoring{},
+		resyncPeriod,
+		cache.Indexers{},
+	)
+
+	queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
+		workqueue.NewTypedItemExponentialFailureRateLimiter[string](queueBaseDelay, queueMaxDelay),
+		workqueue.TypedRateLimitingQueueConfig[string]{Name: controllerName},
+	)
+
+	controller := &ClusterMonitoringController{
+		client:   client,
+		queue:    queue,
+		informer: informer,
+	}
+
+	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    controller.handleAdd,
+		UpdateFunc: controller.handleUpdate,
+		DeleteFunc: controller.handleDelete,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return controller, nil
+}
+
+// Run starts the controller.
+func (c *ClusterMonitoringController) Run(ctx context.Context, workers int) {
+	klog.Info("Starting ClusterMonitoring controller")
+	defer c.queue.ShutDown()
+
+	go c.informer.Run(ctx.Done())
+
+	if !cache.WaitForNamedCacheSync("ClusterMonitoring controller", ctx.Done(), c.informer.HasSynced) {
+		klog.Error("Failed to sync ClusterMonitoring controller cache")
+		return
+	}
+
+	for i := 0; i < workers; i++ {
+		go c.worker(ctx)
+	}
+
+	klog.Info("ClusterMonitoring controller started")
+	<-ctx.Done()
+	klog.Info("ClusterMonitoring controller stopped")
+}
+
+func (c *ClusterMonitoringController) worker(ctx context.Context) {
+	for c.processNextWorkItem(ctx) {
+	}
+}
+
+func (c *ClusterMonitoringController) processNextWorkItem(ctx context.Context) bool {
+	key, quit := c.queue.Get()
+	if quit {
+		return false
+	}
+	defer c.queue.Done(key)
+
+	if err := c.sync(ctx, key); err != nil {
+		utilruntime.HandleError(fmt.Errorf("error syncing ClusterMonitoring (%s): %w", key, err))
+		c.queue.AddRateLimited(key)
+		return true
+	}
+
+	klog.V(4).Infof("ClusterMonitoring successfully synced: %s", key)
+	c.queue.Forget(key)
+	return true
+}
+
+func (c *ClusterMonitoringController) sync(ctx context.Context, key string) error {
+	klog.Infof("🎉 ClusterMonitoring controller processing: %s", key)
+
+	// For now, just log that we saw the resource.
+	// Later we'll add the actual reconciliation logic.
+
+	return nil
+}
+
+func (c *ClusterMonitoringController) handleAdd(obj interface{}) {
+	key, ok := c.keyFunc(obj)
+	if !ok {
+		return
+	}
+	klog.Infof("ClusterMonitoring added: %s", key)
+	c.queue.Add(key)
+}
+
+func (c *ClusterMonitoringController) handleUpdate(oldObj, newObj interface{}) {
+	key, ok := c.keyFunc(newObj)
+	if !ok {
+		return
+	}
+	klog.Infof("ClusterMonitoring updated: %s", key)
+	c.queue.Add(key)
+}
+
+func (c *ClusterMonitoringController) handleDelete(obj interface{}) {
+	key, ok := c.keyFunc(obj)
+	if !ok {
+		return
+	}
+	klog.Infof("ClusterMonitoring deleted: %s", key)
+	c.queue.Add(key)
+}
+
+func (c *ClusterMonitoringController) keyFunc(obj interface{}) (string, bool) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		klog.Errorf("Creating key for ClusterMonitoring object failed: %v", err)
+		return key, false
+	}
+	return key, true
+}
diff --git a/pkg/client/client.go b/pkg/client/client.go
index e9058276f3..6fd6809219 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -417,6 +417,15 @@ func (c *Client) ClusterOperatorListWatch(ctx context.Context, name string) *cac
 	}
 }
 
+func (c *Client) ClusterMonitoringListWatch() *cache.ListWatch {
+	return cache.NewListWatchFromClient(
+		c.oscclient.ConfigV1alpha1().RESTClient(),
+		"clustermonitorings",
+		"",
+		fields.Everything(),
+	)
+}
+
 func (c *Client) HasRouteCapability(ctx context.Context) (bool, error) {
 	_, err := c.oscclient.ConfigV1().ClusterOperators().Get(ctx, "ingress", metav1.GetOptions{})
 	if apierrors.IsNotFound(err) {
diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
index 91f8e6ecc7..6487819c2d 100644
--- a/pkg/operator/operator.go
+++ b/pkg/operator/operator.go
@@ -202,8 +202,9 @@ type Operator struct {
 
 	assets *manifests.Assets
 
-	ruleController    *alert.RuleController
-	relabelController *alert.RelabelConfigController
+	ruleController              *alert.RuleController
+	relabelController           *alert.RelabelConfigController
+	clusterMonitoringController *alert.ClusterMonitoringController
 }
 
 func New(
@@ -252,6 +253,11 @@ func New(
 		return nil, fmt.Errorf("failed to create alert relabel config controller: %w", err)
 	}
 
+	clusterMonitoringController, err := alert.NewClusterMonitoringController(ctx, c, version)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create cluster monitoring controller: %w", err)
+	}
+
 	o := &Operator{
 		images:           images,
 		telemetryMatches: telemetryMatches,
@@ -265,12 +271,13 @@ func New(
 			workqueue.NewTypedItemExponentialFailureRateLimiter[string](50*time.Millisecond, 3*time.Minute),
 			workqueue.TypedRateLimitingQueueConfig[string]{Name: "cluster-monitoring"},
 		),
-		informers:            make([]cache.SharedIndexInformer, 0),
-		assets:               a,
-		informerFactories:    make([]informers.SharedInformerFactory, 0),
-		controllersToRunFunc: make([]func(context.Context, int), 0),
-		ruleController:       ruleController,
-		relabelController:    relabelController,
+		informers:                   make([]cache.SharedIndexInformer, 0),
+		assets:                      a,
+		informerFactories:           make([]informers.SharedInformerFactory, 0),
+		controllersToRunFunc:        make([]func(context.Context, int), 0),
+		ruleController:              ruleController,
+		relabelController:           relabelController,
+		clusterMonitoringController: clusterMonitoringController,
 	}
 
 	informer := cache.NewSharedIndexInformer(
@@ -532,6 +539,7 @@ func New(
 		csrMetricsServerController.Run,
 		o.ruleController.Run,
 		o.relabelController.Run,
+		o.clusterMonitoringController.Run,
 	)
 
 	return o, nil
diff --git a/test/e2e/cluster_monitoring_test.go b/test/e2e/cluster_monitoring_test.go
new file mode 100644
index 0000000000..66bfa1c544
--- /dev/null
+++ b/test/e2e/cluster_monitoring_test.go
@@ -0,0 +1,149 @@
+// Copyright 2025 The Cluster Monitoring Operator Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	configv1alpha1 "github.com/openshift/api/config/v1alpha1"
+	"github.com/openshift/cluster-monitoring-operator/test/e2e/framework"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
+const (
+	clusterMonitoringName = "cluster"
+)
+
+func TestClusterMonitoring(t *testing.T) {
+	ctx := context.Background()
+	clusterMonitorings := f.OpenShiftConfigClient.ConfigV1alpha1().ClusterMonitorings()
+
+	// Check if the ClusterMonitoring CRD is available (feature gate enabled)
+	_, err := clusterMonitorings.List(ctx, metav1.ListOptions{Limit: 1})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			t.Skip("ClusterMonitoring CRD not available - ClusterMonitoringConfig feature gate may not be enabled")
+			return
+		}
+		t.Fatalf("unexpected error checking ClusterMonitoring CRD availability: %v", err)
+	}
+
+	// Clean up any existing test resource first
+	_ = clusterMonitorings.Delete(ctx, clusterMonitoringName, metav1.DeleteOptions{})
+
+	time.Sleep(2 * time.Second)
+
+	cm := &configv1alpha1.ClusterMonitoring{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: clusterMonitoringName,
+			Labels: map[string]string{
+				framework.E2eTestLabelName: framework.E2eTestLabelValue,
+			},
+		},
+		Spec: configv1alpha1.ClusterMonitoringSpec{
+			AlertmanagerConfig: configv1alpha1.AlertmanagerConfig{
+				DeploymentMode: configv1alpha1.AlertManagerDeployModeDefaultConfig,
+			},
+		},
+	}
+
+	t.Log("Creating ClusterMonitoring resource...")
+	createdCM, err := clusterMonitorings.Create(ctx, cm, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create ClusterMonitoring: %v", err)
+	}
+
+	defer func() {
+		t.Log("Cleaning up ClusterMonitoring resource...")
+		err := clusterMonitorings.Delete(ctx, clusterMonitoringName, metav1.DeleteOptions{})
+		if err != nil && !apierrors.IsNotFound(err) {
+			t.Errorf("Failed to delete ClusterMonitoring: %v", err)
+		}
+	}()
+
+	t.Logf("✅ Successfully created ClusterMonitoring resource: %s", createdCM.Name)
+
+	err = wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
+		retrievedCM, err := clusterMonitorings.Get(ctx, clusterMonitoringName, metav1.GetOptions{})
+		if err != nil {
+			t.Logf("Waiting for ClusterMonitoring to be available: %v", err)
+			return false, nil
+		}
+
+		if retrievedCM.Spec.AlertmanagerConfig.DeploymentMode != configv1alpha1.AlertManagerDeployModeDefaultConfig {
+			t.Logf("Waiting for correct AlertmanagerConfig.DeploymentMode, got: %s", retrievedCM.Spec.AlertmanagerConfig.DeploymentMode)
+			return false, nil
+		}
+
+		t.Logf("✅ ClusterMonitoring resource retrieved successfully with correct spec")
+		return true, nil
+	})
+
+	if err != nil {
+		t.Fatalf("ClusterMonitoring resource was not properly created or retrieved: %v", err)
+	}
+
+	t.Log("Testing ClusterMonitoring resource update...")
+	err = wait.PollImmediate(2*time.Second, 30*time.Second, func() (bool, error) {
+		currentCM, err := clusterMonitorings.Get(ctx, clusterMonitoringName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		currentCM.Spec.AlertmanagerConfig.DeploymentMode = configv1alpha1.AlertManagerDeployModeCustomConfig
+		currentCM.Spec.AlertmanagerConfig.CustomConfig = configv1alpha1.AlertmanagerCustomConfig{
+			LogLevel: configv1alpha1.LogLevelInfo,
+		}
+
+		_, err = clusterMonitorings.Update(ctx, currentCM, metav1.UpdateOptions{})
+		if err != nil {
+			t.Logf("Retrying update due to: %v", err)
+			return false, nil // Retry on conflict
+		}
+
+		return true, nil
+	})
+
+	if err != nil {
+		t.Fatalf("Failed to update ClusterMonitoring: %v", err)
+	}
+
+	updatedCM, err := clusterMonitorings.Get(ctx, clusterMonitoringName, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get updated ClusterMonitoring: %v", err)
+	}
+
+	if updatedCM.Spec.AlertmanagerConfig.DeploymentMode != configv1alpha1.AlertManagerDeployModeCustomConfig {
+		t.Errorf("Expected DeploymentMode to be CustomConfig, got: %s", updatedCM.Spec.AlertmanagerConfig.DeploymentMode)
+	}
+
+	if updatedCM.Spec.AlertmanagerConfig.CustomConfig.LogLevel != configv1alpha1.LogLevelInfo {
+		t.Errorf("Expected LogLevel to be Info, got: %s", updatedCM.Spec.AlertmanagerConfig.CustomConfig.LogLevel)
+	}
+
+	t.Log("✅ ClusterMonitoring resource updated successfully")
+
+	// TODO: Once the controller's reconciliation logic is implemented, also verify that:
+	// - the controller processes the ClusterMonitoring resource
+	// - the appropriate Alertmanager resources are created/updated/deleted
+	// - the controller logs show the resource was processed
+	// For now, this test only covers CRUD operations on the CRD.
+
+	t.Log("✅ ClusterMonitoring e2e test completed successfully")
+}
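
Note (outside the diff): the sync handler in this patch is intentionally a stub that only logs the key. As a minimal sketch of where the reconciliation might head, the helper below shows how a future sync() could resolve a workqueue key back to the cached ClusterMonitoring object via the informer store before acting on spec.alertmanagerConfig.deploymentMode. The helper name and the store-lookup approach are illustrative assumptions, not code from this change.

package alert

import (
	"fmt"

	configv1alpha1 "github.com/openshift/api/config/v1alpha1"
	"k8s.io/client-go/tools/cache"
)

// getClusterMonitoring is a hypothetical helper (not part of this patch) that a
// future sync() could use to turn a workqueue key into the cached object.
func getClusterMonitoring(informer cache.SharedIndexInformer, key string) (*configv1alpha1.ClusterMonitoring, error) {
	// Look the key up in the informer's local store rather than hitting the API server.
	obj, exists, err := informer.GetStore().GetByKey(key)
	if err != nil {
		return nil, fmt.Errorf("looking up ClusterMonitoring %q in the cache: %w", key, err)
	}
	if !exists {
		// The object was deleted; a real reconciler would clean up whatever it created for it.
		return nil, nil
	}
	cm, ok := obj.(*configv1alpha1.ClusterMonitoring)
	if !ok {
		return nil, fmt.Errorf("unexpected object type %T for key %q", obj, key)
	}
	return cm, nil
}

The e2e test above already exercises the API surface such a reconciler would read (DeploymentMode switching from DefaultConfig to CustomConfig), so the controller-side assertions listed in the TODO can build directly on it.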