Skip to content

Commit 7c2f311

Browse files
committed
Periodically cycle to reassert cluster configuration
This the first step to dropping the drift controller.
1 parent a6c0672 commit 7c2f311

File tree

2 files changed

+47
-24
lines changed

2 files changed

+47
-24
lines changed

operator/internal/controller/vectorized/cluster_controller.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func (r *ClusterReconciler) Reconcile(
275275
return ctrl.Result{}, err
276276
}
277277

278-
err = r.reconcileConfiguration(
278+
delay, err := r.reconcileConfiguration(
279279
ctx,
280280
&vectorizedCluster,
281281
cm,
@@ -327,7 +327,13 @@ func (r *ClusterReconciler) Reconcile(
327327
r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar)
328328
}
329329

330-
return ctrl.Result{}, nil
330+
// Finally: re-enqueue for another pass
331+
if delay == 0 {
332+
delay = time.Minute
333+
}
334+
return ctrl.Result{
335+
RequeueAfter: delay,
336+
}, nil
331337
}
332338

333339
// SetupWithManager sets up the controller with the Manager.

operator/internal/controller/vectorized/cluster_controller_configuration.go

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"fmt"
1515
"net/http"
1616
"strings"
17+
"time"
1718

1819
"github.com/cockroachdb/errors"
1920
"github.com/go-logr/logr"
@@ -40,52 +41,52 @@ func (r *ClusterReconciler) reconcileConfiguration(
4041
pki *certmanager.PkiReconciler,
4142
fqdn string,
4243
l logr.Logger,
43-
) error {
44+
) (time.Duration, error) {
4445
log := l.WithName("reconcileConfiguration")
4546
errorWithContext := newErrorWithContext(redpandaCluster.Namespace, redpandaCluster.Name)
4647
if !featuregates.CentralizedConfiguration(redpandaCluster.Spec.Version) {
4748
log.Info("Cluster is not using centralized configuration, skipping...")
48-
return nil
49+
return 0, nil
4950
}
5051

5152
if added, err := r.ensureConditionPresent(ctx, redpandaCluster, log); err != nil || added {
5253
// If condition is added or error returned, we wait for another reconcile loop
53-
return err
54+
return 0, err
5455
}
5556

56-
if redpandaCluster.Status.GetConditionStatus(vectorizedv1alpha1.ClusterConfiguredConditionType) == corev1.ConditionTrue {
57-
log.Info("Cluster configuration is synchronized")
58-
return nil
57+
if delay := r.ratelimitCondition(redpandaCluster, vectorizedv1alpha1.ClusterConfiguredConditionType); delay > 0 {
58+
log.Info("Waiting to reassert cluster configuration")
59+
return delay, nil
5960
}
6061

6162
config, err := configMapResource.CreateConfiguration(ctx)
6263
if err != nil {
63-
return errorWithContext(err, "error while creating the configuration")
64+
return 0, errorWithContext(err, "error while creating the configuration")
6465
}
6566

6667
adminAPI, err := r.AdminAPIClientFactory(ctx, r, redpandaCluster, fqdn, pki.AdminAPIConfigProvider(), r.Dialer)
6768
if err != nil {
68-
return errorWithContext(err, "error creating the admin API client")
69+
return 0, errorWithContext(err, "error creating the admin API client")
6970
}
7071

7172
schema, _, _, err := r.retrieveClusterState(ctx, redpandaCluster, adminAPI)
7273
if err != nil {
73-
return err
74+
return 0, err
7475
}
7576

7677
lastAppliedConfiguration, err := r.getOrInitLastAppliedConfiguration(ctx, configMapResource, config, redpandaCluster.Namespace, schema)
7778
if err != nil {
78-
return errorWithContext(err, "could not load the last applied configuration")
79+
return 0, errorWithContext(err, "could not load the last applied configuration")
7980
}
8081

8182
// Checking if the feature is active because in the initial stages of cluster creation, it takes time for the feature to be activated
8283
// and the API returns the same error (400) that is returned in case of malformed input, which causes a stop of the reconciliation
8384
var centralConfigActive bool
8485
if centralConfigActive, err = adminutils.IsFeatureActive(ctx, adminAPI, adminutils.CentralConfigFeatureName); err != nil {
85-
return errorWithContext(err, "could not determine if central config is active in the cluster")
86+
return 0, errorWithContext(err, "could not determine if central config is active in the cluster")
8687
} else if !centralConfigActive {
8788
log.Info("Waiting for the centralized configuration feature to be active in the cluster")
88-
return &resources.RequeueAfterError{
89+
return 0, &resources.RequeueAfterError{
8990
RequeueAfter: resources.RequeueDuration,
9091
Msg: "centralized configuration feature not active",
9192
}
@@ -94,7 +95,7 @@ func (r *ClusterReconciler) reconcileConfiguration(
9495
patchSuccess, err := r.applyPatchIfNeeded(ctx, redpandaCluster, adminAPI, config, schema, log)
9596
if err != nil || !patchSuccess {
9697
// patchSuccess=false indicates an error set on the condition that should not be propagated (but we terminate reconciliation anyway)
97-
return err
98+
return 0, err
9899
}
99100

100101
// TODO a failure and restart here (after successful patch, before setting the last applied configuration) may lead to inconsistency if the user
@@ -108,40 +109,55 @@ func (r *ClusterReconciler) reconcileConfiguration(
108109
}
109110
hash, hashChanged, err := r.checkCentralizedConfigurationHashChange(ctx, redpandaCluster, config, schema, lastAppliedConfiguration, statefulSetResource)
110111
if err != nil {
111-
return err
112+
return 0, err
112113
} else if hashChanged {
113114
// Definitely needs restart
114115
log.Info("Centralized configuration hash has changed")
115116
if err = statefulSetResource.SetCentralizedConfigurationHashInCluster(ctx, hash); err != nil {
116-
return errorWithContext(err, "could not update config hash on statefulset")
117+
return 0, errorWithContext(err, "could not update config hash on statefulset")
117118
}
118119
}
119120
}
120121

121122
// Now we can mark the new lastAppliedConfiguration for next update
122123
properties, err := config.ConcreteConfiguration(ctx, r.Client, redpandaCluster.Namespace, schema)
123124
if err != nil {
124-
return errorWithContext(err, "could not concretize configuration to store last applied configuration in the cluster")
125+
return 0, errorWithContext(err, "could not concretize configuration to store last applied configuration in the cluster")
125126
}
126127
if err = configMapResource.SetLastAppliedConfigurationInCluster(ctx, properties); err != nil {
127-
return errorWithContext(err, "could not store last applied configuration in the cluster")
128+
return 0, errorWithContext(err, "could not store last applied configuration in the cluster")
128129
}
129130

130131
// Synchronized status with cluster, including triggering a restart if needed
131132
conditionData, err := r.synchronizeStatusWithCluster(ctx, redpandaCluster, statefulSetResources, adminAPI, log)
132133
if err != nil {
133-
return err
134+
return 0, err
134135
}
135136

136137
// If condition is not met, we need to reschedule, waiting for the cluster to heal.
137138
if conditionData.Status != corev1.ConditionTrue {
138-
return &resources.RequeueAfterError{
139+
return 0, &resources.RequeueAfterError{
139140
RequeueAfter: resources.RequeueDuration,
140141
Msg: fmt.Sprintf("cluster configuration is not in sync (%s): %s", conditionData.Reason, conditionData.Message),
141142
}
142143
}
143144

144-
return nil
145+
return 0, nil
146+
}
147+
148+
// ratelimitCondition ensures that the reassertion of cluster configuration is done
149+
// once every minute or so, but no more rapidly than that.
150+
// (This is modelled on the v2 operator - we should look to merge these utilities.)
151+
// Rather than boolean blindness, if we should rate-limit then this will return a
152+
// non-zero minimum wait duration.
153+
func (r *ClusterReconciler) ratelimitCondition(rp *vectorizedv1alpha1.Cluster, conditionType vectorizedv1alpha1.ClusterConditionType) time.Duration {
154+
cond := rp.Status.GetCondition(conditionType)
155+
if cond == nil {
156+
return 0
157+
}
158+
recheckAfter := time.Minute - time.Since(cond.LastTransitionTime.Time)
159+
160+
return max(0, recheckAfter)
145161
}
146162

147163
// getOrInitLastAppliedConfiguration gets the last applied configuration to the cluster or creates it when missing.
@@ -409,8 +425,9 @@ func mapStatusToCondition(
409425
if condition == nil {
410426
// Everything is ok
411427
condition = &vectorizedv1alpha1.ClusterCondition{
412-
Type: vectorizedv1alpha1.ClusterConfiguredConditionType,
413-
Status: corev1.ConditionTrue,
428+
Type: vectorizedv1alpha1.ClusterConfiguredConditionType,
429+
Status: corev1.ConditionTrue,
430+
Message: fmt.Sprintf("Cluster configuration reasserted at %s", time.Now().UTC().Format(time.DateTime)),
414431
}
415432
}
416433
return *condition

0 commit comments

Comments
 (0)