Skip to content

Commit 4466e47

Browse files
authored
Controlling Rolling Updates with the Partition Controller (#862)
* add partition-update-interval option
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* add description of --partition-update-interval option
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* add new params
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* refactor create manager process
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* refactor test suite
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* delete finalizers before test execution
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* add test suite with updateInterval
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* refactor test suites
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* fix log if retry partition update
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* fix variable name
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* add nil check
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* refactor partition-update-interval
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* refactor mutex lock/unlock
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* refactor to use RateLimiter
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* add a comparison of creationtimestamp test
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* delete duplicate log
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* delete unnecessary condition
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* add retry count metrics
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* fix metrics name
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* Update docs/moco-controller.md
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* register new metrics
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* add metrics test
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* fix variable name
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* update option's description
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
* fix comment
  Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>

---------

Signed-off-by: Kento Takeuchi <kento-takeuchi@cybozu.co.jp>
1 parent fd856f0 commit 4466e47

File tree

9 files changed

+186
-115
lines changed

9 files changed

+186
-115
lines changed

cmd/moco-controller/cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ var config struct {
3939
interval time.Duration
4040
maxConcurrentReconciles int
4141
mySQLConfigMapHistoryLimit int
42+
partitionUpdateInterval time.Duration
4243
qps int
4344
zapOpts zap.Options
4445
}
@@ -111,6 +112,7 @@ func init() {
111112
fs.DurationVar(&config.interval, "check-interval", 1*time.Minute, "Interval of cluster maintenance")
112113
fs.IntVar(&config.maxConcurrentReconciles, "max-concurrent-reconciles", 8, "The maximum number of concurrent reconciles which can be run")
113114
fs.IntVar(&config.mySQLConfigMapHistoryLimit, "mysql-configmap-history-limit", 10, "The maximum number of MySQLConfigMap's history to be kept")
115+
fs.DurationVar(&config.partitionUpdateInterval, "partition-update-interval", 0*time.Millisecond, "The minimum update interval for partitions (e.g., 5s, 100ms)")
114116
// The default QPS is 20.
115117
// https://github.com/kubernetes-sigs/controller-runtime/blob/a26de2d610c3cf4b2a02688534aaf5a65749c743/pkg/client/config/config.go#L84-L85
116118
fs.IntVar(&config.qps, "apiserver-qps-throttle", 20, "The maximum QPS to the API server.")

cmd/moco-controller/cmd/run.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cybozu-go/moco/pkg/cert"
1212
"github.com/cybozu-go/moco/pkg/dbop"
1313
"github.com/cybozu-go/moco/pkg/metrics"
14+
"golang.org/x/time/rate"
1415
corev1 "k8s.io/api/core/v1"
1516
"k8s.io/apimachinery/pkg/runtime"
1617
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -124,6 +125,8 @@ func subMain(ns, addr string, port int) error {
124125
Client: mgr.GetClient(),
125126
Recorder: mgr.GetEventRecorderFor("moco-controller"),
126127
MaxConcurrentReconciles: config.maxConcurrentReconciles,
128+
UpdateInterval: config.partitionUpdateInterval,
129+
RateLimiter: rate.NewLimiter(rate.Every(config.partitionUpdateInterval), 1),
127130
}).SetupWithManager(mgr); err != nil {
128131
setupLog.Error(err, "unable to create controller", "controller", "Partition")
129132
return err

controllers/partition_controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
"time"
99

10+
"golang.org/x/time/rate"
1011
appsv1 "k8s.io/api/apps/v1"
1112
corev1 "k8s.io/api/core/v1"
1213
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -37,6 +38,8 @@ type StatefulSetPartitionReconciler struct {
3738
client.Client
3839
Recorder record.EventRecorder
3940
MaxConcurrentReconciles int
41+
UpdateInterval time.Duration
42+
RateLimiter *rate.Limiter
4043
}
4144

4245
//+kubebuilder:rbac:groups=moco.cybozu.com,resources=mysqlclusters,verbs=get;list;watch
@@ -51,6 +54,11 @@ type StatefulSetPartitionReconciler struct {
5154
func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
5255
log := crlog.FromContext(ctx)
5356

57+
if !r.RateLimiter.Allow() {
58+
metrics.PartitionUpdateRetriesTotalVec.WithLabelValues(req.Namespace).Inc()
59+
return reconcile.Result{RequeueAfter: r.UpdateInterval}, nil
60+
}
61+
5462
sts := &appsv1.StatefulSet{}
5563
err := r.Get(ctx, req.NamespacedName, sts)
5664
if err != nil {
@@ -93,7 +101,6 @@ func (r *StatefulSetPartitionReconciler) Reconcile(ctx context.Context, req reco
93101
return reconcile.Result{}, err
94102
}
95103

96-
log.Info("partition is updated")
97104
metrics.LastPartitionUpdatedVec.WithLabelValues(cluster.Name, cluster.Namespace).SetToCurrentTime()
98105

99106
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil

controllers/partition_controller_test.go

Lines changed: 153 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,22 @@ import (
77
"sort"
88
"time"
99

10+
mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
11+
"github.com/cybozu-go/moco/pkg/constants"
12+
"github.com/cybozu-go/moco/pkg/metrics"
1013
. "github.com/onsi/ginkgo/v2"
1114
. "github.com/onsi/gomega"
15+
prometheusutil "github.com/prometheus/client_golang/prometheus/testutil"
16+
"golang.org/x/time/rate"
1217
appsv1 "k8s.io/api/apps/v1"
1318
corev1 "k8s.io/api/core/v1"
1419
"k8s.io/apimachinery/pkg/api/meta"
1520
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1621
"k8s.io/client-go/util/retry"
1722
"k8s.io/utils/ptr"
18-
19-
mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
20-
"github.com/cybozu-go/moco/pkg/constants"
2123
ctrl "sigs.k8s.io/controller-runtime"
2224
"sigs.k8s.io/controller-runtime/pkg/client"
25+
"sigs.k8s.io/controller-runtime/pkg/config"
2326
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
2427
)
2528

@@ -156,131 +159,172 @@ func rolloutPods(ctx context.Context, sts *appsv1.StatefulSet, rev1 int, rev2 in
156159
}
157160
}
158161

162+
func setupNewManager(ctx context.Context, updateInterval time.Duration) context.CancelFunc {
163+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
164+
Scheme: scheme,
165+
LeaderElection: false,
166+
Metrics: metricsserver.Options{
167+
BindAddress: "0",
168+
},
169+
Controller: config.Controller{
170+
SkipNameValidation: ptr.To(true),
171+
},
172+
})
173+
Expect(err).ToNot(HaveOccurred())
174+
175+
r := &StatefulSetPartitionReconciler{
176+
Client: mgr.GetClient(),
177+
Recorder: mgr.GetEventRecorderFor("moco-controller"),
178+
UpdateInterval: updateInterval,
179+
RateLimiter: rate.NewLimiter(rate.Every(updateInterval), 1),
180+
}
181+
err = r.SetupWithManager(mgr)
182+
Expect(err).ToNot(HaveOccurred())
183+
184+
ctx, cancel := context.WithCancel(ctx)
185+
go func() {
186+
err := mgr.Start(ctx)
187+
if err != nil {
188+
panic(err)
189+
}
190+
}()
191+
return cancel
192+
}
193+
194+
func testUpdatePartition(ctx context.Context, updateInterval time.Duration) {
195+
cluster := testNewMySQLCluster("partition")
196+
err := k8sClient.Create(ctx, cluster)
197+
Expect(err).NotTo(HaveOccurred())
198+
meta.SetStatusCondition(&cluster.Status.Conditions,
199+
metav1.Condition{
200+
Type: mocov1beta2.ConditionHealthy,
201+
Status: metav1.ConditionTrue,
202+
Reason: "healthy",
203+
},
204+
)
205+
cluster.Status.ReconcileInfo.Generation = 1
206+
err = k8sClient.Status().Update(ctx, cluster)
207+
Expect(err).NotTo(HaveOccurred())
208+
209+
sts := testNewStatefulSet(cluster)
210+
err = k8sClient.Create(ctx, sts)
211+
Expect(err).NotTo(HaveOccurred())
212+
sts.Status = appsv1.StatefulSetStatus{
213+
ObservedGeneration: 1,
214+
CurrentRevision: "rev1",
215+
UpdateRevision: "rev1",
216+
Replicas: 3,
217+
UpdatedReplicas: 3,
218+
}
219+
err = k8sClient.Status().Update(ctx, sts)
220+
Expect(err).NotTo(HaveOccurred())
221+
222+
for _, pod := range testNewPods(sts) {
223+
err = k8sClient.Create(ctx, pod)
224+
Expect(err).NotTo(HaveOccurred())
225+
pod.Status = corev1.PodStatus{
226+
Phase: corev1.PodRunning,
227+
Conditions: []corev1.PodCondition{
228+
{
229+
Type: corev1.PodReady,
230+
Status: corev1.ConditionTrue,
231+
Reason: "PodReady",
232+
LastTransitionTime: metav1.Time{
233+
Time: time.Now().Add(-24 * time.Hour),
234+
},
235+
},
236+
},
237+
}
238+
err = k8sClient.Status().Update(ctx, pod)
239+
Expect(err).NotTo(HaveOccurred())
240+
}
241+
242+
Eventually(func() error {
243+
sts := &appsv1.StatefulSet{}
244+
key := client.ObjectKey{Namespace: "partition", Name: "moco-test"}
245+
if err := k8sClient.Get(ctx, key, sts); err != nil {
246+
return err
247+
}
248+
if sts.Spec.UpdateStrategy.RollingUpdate == nil {
249+
return errors.New("partition is nil")
250+
}
251+
252+
switch *sts.Spec.UpdateStrategy.RollingUpdate.Partition {
253+
case 3:
254+
rolloutPods(ctx, sts, 2, 1)
255+
case 2:
256+
rolloutPods(ctx, sts, 1, 2)
257+
case 1:
258+
rolloutPods(ctx, sts, 0, 3)
259+
case 0:
260+
return nil
261+
}
262+
263+
return fmt.Errorf("unexpected partition: %d", *sts.Spec.UpdateStrategy.RollingUpdate.Partition)
264+
}).Should(Succeed())
265+
266+
events := &corev1.EventList{}
267+
err = k8sClient.List(ctx, events, client.InNamespace("partition"))
268+
Expect(err).NotTo(HaveOccurred())
269+
sort.Slice(events.Items, func(i, j int) bool {
270+
return events.Items[i].CreationTimestamp.Before(&events.Items[j].CreationTimestamp)
271+
})
272+
Expect(events.Items).To(HaveLen(3))
273+
Expect(events.Items[0].Message).To(Equal("Updated partition from 3 to 2"))
274+
Expect(events.Items[1].Message).To(Equal("Updated partition from 2 to 1"))
275+
Expect(events.Items[2].Message).To(Equal("Updated partition from 1 to 0"))
276+
277+
if updateInterval > 0 {
278+
for i := 1; i < len(events.Items); i++ {
279+
interval := events.Items[i].CreationTimestamp.Sub(events.Items[i-1].CreationTimestamp.Time)
280+
Expect(interval).To(BeNumerically(">=", updateInterval))
281+
}
282+
retryCount := prometheusutil.CollectAndCount(metrics.PartitionUpdateRetriesTotalVec)
283+
Expect(retryCount).To(BeNumerically(">", 0))
284+
}
285+
}
286+
159287
var _ = Describe("StatefulSet reconciler", func() {
160288
ctx := context.Background()
161289
var stopFunc func()
162290

163291
BeforeEach(func() {
164-
err := k8sClient.DeleteAllOf(ctx, &mocov1beta2.MySQLCluster{}, client.InNamespace("partition"))
292+
cs := &mocov1beta2.MySQLClusterList{}
293+
err := k8sClient.List(ctx, cs, client.InNamespace("partition"))
294+
Expect(err).NotTo(HaveOccurred())
295+
for _, cluster := range cs.Items {
296+
cluster.Finalizers = nil
297+
err := k8sClient.Update(ctx, &cluster)
298+
Expect(err).NotTo(HaveOccurred())
299+
}
300+
301+
err = k8sClient.DeleteAllOf(ctx, &corev1.Event{}, client.InNamespace("partition"))
302+
Expect(err).NotTo(HaveOccurred())
303+
err = k8sClient.DeleteAllOf(ctx, &mocov1beta2.MySQLCluster{}, client.InNamespace("partition"))
165304
Expect(err).NotTo(HaveOccurred())
166305
err = k8sClient.DeleteAllOf(ctx, &appsv1.StatefulSet{}, client.InNamespace("partition"))
167306
Expect(err).NotTo(HaveOccurred())
168307
err = k8sClient.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace("partition"))
169308
Expect(err).NotTo(HaveOccurred())
170309

171-
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
172-
Scheme: scheme,
173-
LeaderElection: false,
174-
Metrics: metricsserver.Options{
175-
BindAddress: "0",
176-
},
177-
})
178-
Expect(err).ToNot(HaveOccurred())
179-
180-
r := &StatefulSetPartitionReconciler{
181-
Client: mgr.GetClient(),
182-
Recorder: mgr.GetEventRecorderFor("moco-controller"),
183-
}
184-
err = r.SetupWithManager(mgr)
185-
Expect(err).ToNot(HaveOccurred())
186-
187-
ctx, cancel := context.WithCancel(ctx)
188-
stopFunc = cancel
189-
go func() {
190-
err := mgr.Start(ctx)
191-
if err != nil {
192-
panic(err)
193-
}
194-
}()
195310
time.Sleep(100 * time.Millisecond)
196311
})
197312

198313
AfterEach(func() {
199-
stopFunc()
314+
if stopFunc != nil {
315+
stopFunc()
316+
}
200317
time.Sleep(100 * time.Millisecond)
201318
})
202319

203-
It("should partition to 0", func() {
204-
cluster := testNewMySQLCluster("partition")
205-
err := k8sClient.Create(ctx, cluster)
206-
Expect(err).NotTo(HaveOccurred())
207-
meta.SetStatusCondition(&cluster.Status.Conditions,
208-
metav1.Condition{
209-
Type: mocov1beta2.ConditionHealthy,
210-
Status: metav1.ConditionTrue,
211-
Reason: "healthy",
320+
Context("with different update intervals", func() {
321+
DescribeTable("should partition to 0",
322+
func(updateInterval time.Duration) {
323+
stopFunc = setupNewManager(ctx, updateInterval)
324+
testUpdatePartition(ctx, updateInterval)
212325
},
326+
Entry("without interval", 0*time.Millisecond),
327+
Entry("with 1000ms interval", 1000*time.Millisecond),
213328
)
214-
cluster.Status.ReconcileInfo.Generation = 1
215-
err = k8sClient.Status().Update(ctx, cluster)
216-
Expect(err).NotTo(HaveOccurred())
217-
218-
sts := testNewStatefulSet(cluster)
219-
err = k8sClient.Create(ctx, sts)
220-
Expect(err).NotTo(HaveOccurred())
221-
sts.Status = appsv1.StatefulSetStatus{
222-
ObservedGeneration: 1,
223-
CurrentRevision: "rev1",
224-
UpdateRevision: "rev1",
225-
Replicas: 3,
226-
UpdatedReplicas: 3,
227-
}
228-
err = k8sClient.Status().Update(ctx, sts)
229-
Expect(err).NotTo(HaveOccurred())
230-
231-
for _, pod := range testNewPods(sts) {
232-
err = k8sClient.Create(ctx, pod)
233-
Expect(err).NotTo(HaveOccurred())
234-
pod.Status = corev1.PodStatus{
235-
Phase: corev1.PodRunning,
236-
Conditions: []corev1.PodCondition{
237-
{
238-
Type: corev1.PodReady,
239-
Status: corev1.ConditionTrue,
240-
Reason: "PodReady",
241-
LastTransitionTime: metav1.Time{
242-
Time: time.Now().Add(-24 * time.Hour),
243-
},
244-
},
245-
},
246-
}
247-
err = k8sClient.Status().Update(ctx, pod)
248-
Expect(err).NotTo(HaveOccurred())
249-
}
250-
251-
Eventually(func() error {
252-
sts := &appsv1.StatefulSet{}
253-
key := client.ObjectKey{Namespace: "partition", Name: "moco-test"}
254-
if err := k8sClient.Get(ctx, key, sts); err != nil {
255-
return err
256-
}
257-
if sts.Spec.UpdateStrategy.RollingUpdate == nil {
258-
return errors.New("partition is nil")
259-
}
260-
261-
switch *sts.Spec.UpdateStrategy.RollingUpdate.Partition {
262-
case 3:
263-
rolloutPods(ctx, sts, 2, 1)
264-
case 2:
265-
rolloutPods(ctx, sts, 1, 2)
266-
case 1:
267-
rolloutPods(ctx, sts, 0, 3)
268-
case 0:
269-
return nil
270-
}
271-
272-
return fmt.Errorf("unexpected partition: %d", *sts.Spec.UpdateStrategy.RollingUpdate.Partition)
273-
}).Should(Succeed())
274-
275-
events := &corev1.EventList{}
276-
err = k8sClient.List(ctx, events, client.InNamespace("partition"))
277-
Expect(err).NotTo(HaveOccurred())
278-
sort.Slice(events.Items, func(i, j int) bool {
279-
return events.Items[i].CreationTimestamp.Before(&events.Items[j].CreationTimestamp)
280-
})
281-
Expect(events.Items).To(HaveLen(3))
282-
Expect(events.Items[0].Message).To(Equal("Updated partition from 3 to 2"))
283-
Expect(events.Items[1].Message).To(Equal("Updated partition from 2 to 1"))
284-
Expect(events.Items[2].Message).To(Equal("Updated partition from 1 to 0"))
285329
})
286330
})

docs/metrics.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Aside from [the standard Go runtime and process metrics][standard], it exposes m
1717
All these metrics are prefixed with `moco_cluster_` and have `name` and `namespace` labels.
1818

1919
| Name | Description | Type |
20-
| ----------------------------------- | ---------------------------------------------------------------------- | --------- |
20+
|-------------------------------------|------------------------------------------------------------------------|-----------|
2121
| `checks_total` | The number of times MOCO checked the cluster | Counter |
2222
| `errors_total` | The number of times MOCO encountered errors when managing the cluster | Counter |
2323
| `available` | 1 if the cluster is available, 0 otherwise | Gauge |
@@ -29,6 +29,7 @@ All these metrics are prefixed with `moco_cluster_` and have `name` and `namespa
2929
| `current_replicas` | The number of current replicas | Gauge |
3030
| `updated_replicas` | The number of updated replicas | Gauge |
3131
| `last_partition_updated` | The timestamp of the last successful partition update | Gauge |
32+
| `partition_update_retries_total` | The number of retries for partition updates | Counter |
3233
| `clustering_stopped` | 1 if the cluster is clustering stopped, 0 otherwise | Gauge |
3334
| `reconciliation_stopped` | 1 if the cluster is reconciliation stopped, 0 otherwise | Gauge |
3435
| `errant_replicas` | The number of mysqld instances that have [errant transactions][errant] | Gauge |

docs/moco-controller.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Flags:
4040
--skip_headers If true, avoid header prefixes in the log messages
4141
--skip_log_headers If true, avoid headers when opening log files (no effect when -logtostderr=true)
4242
--stderrthreshold severity logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=true) (default 2)
43+
--partition-update-interval duration The minimum update interval for partitions (e.g., 5s, 100ms) (default: 0 ms)
4344
-v, --v Level number for the log level verbosity
4445
--version version for moco-controller
4546
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging

0 commit comments

Comments
 (0)