Skip to content

Commit 462e533

Browse files
committed
feat: reconciler integration test
1 parent 54d2534 commit 462e533

File tree

9 files changed

+383
-75
lines changed

9 files changed

+383
-75
lines changed

.github/workflows/ci.yaml

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ jobs:
5656
run: make lint
5757
- name: Integration test
5858
run: make integration-test coverage
59-
- name: Archive coverage report
60-
uses: actions/upload-artifact@v4
59+
- name: Upload coverage reports to Codecov
60+
uses: codecov/codecov-action@v5
6161
with:
62-
name: coverage
63-
path: ${{ env.CODECOV_FILE }}
64-
62+
token: ${{ secrets.CODECOV_TOKEN }}
63+
files: ${{ env.CODECOV_FILE }}
6564
e2e:
6665
name: E2E Test
67-
if: github.actor!= 'dependabot-preview[bot]'
66+
if: false
67+
# if: github.actor!= 'dependabot-preview[bot]'
6868
needs:
6969
- verify
7070
- image-build
@@ -111,7 +111,8 @@ jobs:
111111

112112
coverage:
113113
name: Coverage Report
114-
if: github.actor!= 'dependabot-preview[bot]'
114+
if: false
115+
# if: github.actor!= 'dependabot-preview[bot]'
115116
needs:
116117
- verify
117118
- e2e

e2e/cluster_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ func TestCluster(t *testing.T) {
2626
})
2727

2828
t.Run("scale", func(t *testing.T) {
29-
scale(t, kcl, cluster, 5)
30-
poll(t, kcl, cluster, 2*time.Minute, available)
29+
Scale(t, kcl, cluster, 5)
30+
Poll(t, kcl, cluster, 2*time.Minute, Available)
3131

3232
t.Logf("cluster %q scaled up to 5 members", cluster.Name)
3333

34-
scale(t, kcl, cluster, 3)
35-
poll(t, kcl, cluster, time.Minute, available)
34+
Scale(t, kcl, cluster, 3)
35+
Poll(t, kcl, cluster, time.Minute, Available)
3636

3737
t.Logf("cluster %q scaled down to 3 members", cluster.Name)
3838
})
@@ -60,8 +60,8 @@ func TestCluster(t *testing.T) {
6060
}
6161
t.Log("evict pod", key)
6262

63-
poll(t, kcl, cluster, 2*time.Minute, func(cluster *apiv1.EtcdCluster) bool {
64-
if !available(cluster) {
63+
Poll(t, kcl, cluster, 2*time.Minute, func(cluster *apiv1.EtcdCluster) bool {
64+
if !Available(cluster) {
6565
return false
6666
}
6767

@@ -85,7 +85,7 @@ func TestCluster(t *testing.T) {
8585
}
8686

8787
version = strings.TrimPrefix(version, "v")
88-
poll(t, kcl, cluster, 5*time.Minute, func(cluster *apiv1.EtcdCluster) bool {
88+
Poll(t, kcl, cluster, 5*time.Minute, func(cluster *apiv1.EtcdCluster) bool {
8989
if !conditions.StatusTrue(cluster.Status.Conditions, apiv1.ClusterAvailable) {
9090
return false
9191
}

e2e/kube_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
autoscalingv1 "k8s.io/api/autoscaling/v1"
1717
batchv1 "k8s.io/api/batch/v1"
1818
corev1 "k8s.io/api/core/v1"
19+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1920
"k8s.io/apimachinery/pkg/api/resource"
2021
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2122
"k8s.io/apimachinery/pkg/runtime"
@@ -147,14 +148,14 @@ func createCluster(t testing.TB, kcl client.Client, timeout time.Duration, spec
147148
key := client.ObjectKeyFromObject(cluster)
148149
t.Logf("cluster %q created", key)
149150

150-
poll(t, kcl, cluster, timeout, available)
151+
Poll(t, kcl, cluster, timeout, Available)
151152

152153
t.Logf("cluster %q available", key)
153154

154155
return cluster
155156
}
156157

157-
func poll[T client.Object](t testing.TB, kcl client.Client, obj T, timeout time.Duration, f func(obj T) bool) {
158+
func Poll[T client.Object](t testing.TB, kcl client.Client, obj T, timeout time.Duration, f func(obj T) bool) {
158159
t.Helper()
159160

160161
ctx, cancel := context.WithTimeout(t.Context(), timeout)
@@ -173,11 +174,11 @@ func poll[T client.Object](t testing.TB, kcl client.Client, obj T, timeout time.
173174
}
174175
}
175176

176-
func available(cluster *apiv1.EtcdCluster) bool {
177+
func Available(cluster *apiv1.EtcdCluster) bool {
177178
return cluster.Status.AvailableReplicas == cluster.Spec.Replicas
178179
}
179180

180-
func scale(t testing.TB, kcl client.Client, obj client.Object, replicas int32) {
181+
func Scale(t testing.TB, kcl client.Client, obj client.Object, replicas int32) {
181182
scale := &autoscalingv1.Scale{}
182183
err := kcl.SubResource("scale").Get(t.Context(), obj, scale)
183184
if err != nil {
@@ -226,7 +227,11 @@ func triggerCronJob(t testing.TB, kcl client.Client, key client.ObjectKey, timeo
226227
t.Logf("created job %q", client.ObjectKeyFromObject(job))
227228

228229
err = wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
229-
if err := kcl.Get(ctx, client.ObjectKeyFromObject(job), job); err != nil {
230+
err = kcl.Get(ctx, client.ObjectKeyFromObject(job), job)
231+
switch {
232+
case apierrors.IsTooManyRequests(err):
233+
return false, err
234+
case err != nil:
230235
return true, fmt.Errorf("get backup job: %w", err)
231236
}
232237

pkg/metrics/observer.go

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"fmt"
66
"sync/atomic"
77

8+
"sigs.k8s.io/controller-runtime/pkg/client"
9+
810
"go.opentelemetry.io/otel/attribute"
911
"go.opentelemetry.io/otel/metric"
1012

@@ -20,16 +22,14 @@ type Observer struct {
2022
backupLastScheduleTime, backupLastSuccessfulTime metric.Int64ObservableGauge
2123
}
2224

23-
func Register(meter metric.Meter, cluster *apiv1.EtcdCluster) (*Observer, error) {
24-
observer := &Observer{
25+
func Register(meter metric.Meter, key client.ObjectKey) (*Observer, error) {
26+
o := &Observer{
2527
attributes: attribute.NewSet(
26-
attribute.String("fleet.etcd.cluster.namespace", cluster.Namespace),
27-
attribute.String("fleet.etcd.cluster.name", cluster.Name),
28+
attribute.String("fleet.etcd.cluster.namespace", key.Namespace),
29+
attribute.String("fleet.etcd.cluster.name", key.Name),
2830
),
2931
}
3032

31-
observer.cluster.Store(cluster)
32-
3333
gauges := []struct {
3434
name string
3535
description string
@@ -38,42 +38,42 @@ func Register(meter metric.Meter, cluster *apiv1.EtcdCluster) (*Observer, error)
3838
{
3939
name: "fleet.etcd.cluster.desired_replicas",
4040
description: "Number of desired replicas",
41-
dest: &observer.desiredReplicas,
41+
dest: &o.desiredReplicas,
4242
},
4343
{
4444
name: "fleet.etcd.cluster.replicas",
4545
description: "Number of replicas",
46-
dest: &observer.replicas,
46+
dest: &o.replicas,
4747
},
4848
{
4949
name: "fleet.etcd.cluster.ready_replicas",
5050
description: "Number of ready replicas",
51-
dest: &observer.readyReplicas,
51+
dest: &o.readyReplicas,
5252
},
5353
{
5454
name: "fleet.etcd.cluster.updated_replicas",
5555
description: "Number of updated replicas",
56-
dest: &observer.updatedReplicas,
56+
dest: &o.updatedReplicas,
5757
},
5858
{
5959
name: "fleet.etcd.cluster.available_replicas",
6060
description: "Number of available replicas",
61-
dest: &observer.availableReplicas,
61+
dest: &o.availableReplicas,
6262
},
6363
{
6464
name: "fleet.etcd.cluster.learner_replicas",
6565
description: "Number of learner replicas",
66-
dest: &observer.learnerReplicas,
66+
dest: &o.learnerReplicas,
6767
},
6868
{
6969
name: "fleet.etcd.cluster.backup.last_schedule_time",
7070
description: "Last backup schedule time",
71-
dest: &observer.backupLastScheduleTime,
71+
dest: &o.backupLastScheduleTime,
7272
},
7373
{
7474
name: "fleet.etcd.cluster.backup.last_successful_time",
7575
description: "Last backup successful time",
76-
dest: &observer.backupLastSuccessfulTime,
76+
dest: &o.backupLastSuccessfulTime,
7777
},
7878
}
7979

@@ -88,40 +88,42 @@ func Register(meter metric.Meter, cluster *apiv1.EtcdCluster) (*Observer, error)
8888
observables = append(observables, observable)
8989
}
9090

91-
registration, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
92-
cluster := observer.cluster.Load()
93-
if cluster == nil {
94-
return nil
95-
}
91+
registration, err := meter.RegisterCallback(o.Observe, observables...)
92+
if err != nil {
93+
return nil, fmt.Errorf("register callback: %w", err)
94+
}
9695

97-
opts := []metric.ObserveOption{
98-
metric.WithAttributeSet(observer.attributes),
99-
}
96+
o.registration = registration
10097

101-
o.ObserveInt64(observer.desiredReplicas, int64(cluster.Spec.Replicas), opts...)
102-
o.ObserveInt64(observer.replicas, int64(cluster.Status.Replicas), opts...)
103-
o.ObserveInt64(observer.readyReplicas, int64(cluster.Status.ReadyReplicas), opts...)
104-
o.ObserveInt64(observer.updatedReplicas, int64(cluster.Status.UpdatedReplicas), opts...)
105-
o.ObserveInt64(observer.availableReplicas, int64(cluster.Status.AvailableReplicas), opts...)
106-
o.ObserveInt64(observer.learnerReplicas, int64(cluster.Status.LearnerReplicas), opts...)
98+
return o, nil
99+
}
107100

108-
if cluster.Status.Backup != nil && cluster.Status.Backup.LastScheduleTime != nil {
109-
o.ObserveInt64(observer.backupLastScheduleTime, cluster.Status.Backup.LastScheduleTime.Unix(), opts...)
110-
}
101+
func (o *Observer) Observe(ctx context.Context, observer metric.Observer) error {
102+
cluster := o.cluster.Load()
103+
if cluster == nil {
104+
return nil
105+
}
111106

112-
if cluster.Status.Backup != nil && cluster.Status.Backup.LastSuccessfulTime != nil {
113-
o.ObserveInt64(observer.backupLastSuccessfulTime, cluster.Status.Backup.LastSuccessfulTime.Unix(), opts...)
114-
}
107+
opts := []metric.ObserveOption{
108+
metric.WithAttributeSet(o.attributes),
109+
}
115110

116-
return nil
117-
}, observables...)
118-
if err != nil {
119-
return nil, fmt.Errorf("register callback: %w", err)
111+
observer.ObserveInt64(o.desiredReplicas, int64(cluster.Spec.Replicas), opts...)
112+
observer.ObserveInt64(o.replicas, int64(cluster.Status.Replicas), opts...)
113+
observer.ObserveInt64(o.readyReplicas, int64(cluster.Status.ReadyReplicas), opts...)
114+
observer.ObserveInt64(o.updatedReplicas, int64(cluster.Status.UpdatedReplicas), opts...)
115+
observer.ObserveInt64(o.availableReplicas, int64(cluster.Status.AvailableReplicas), opts...)
116+
observer.ObserveInt64(o.learnerReplicas, int64(cluster.Status.LearnerReplicas), opts...)
117+
118+
if cluster.Status.Backup != nil && cluster.Status.Backup.LastScheduleTime != nil {
119+
observer.ObserveInt64(o.backupLastScheduleTime, cluster.Status.Backup.LastScheduleTime.Unix(), opts...)
120120
}
121121

122-
observer.registration = registration
122+
if cluster.Status.Backup != nil && cluster.Status.Backup.LastSuccessfulTime != nil {
123+
observer.ObserveInt64(o.backupLastSuccessfulTime, cluster.Status.Backup.LastSuccessfulTime.Unix(), opts...)
124+
}
123125

124-
return observer, nil
126+
return nil
125127
}
126128

127129
func (o *Observer) Update(cluster *apiv1.EtcdCluster) {

pkg/metrics/observer_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1212
"k8s.io/utils/ptr"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
1314
"sigs.k8s.io/yaml"
1415

1516
"go.opentelemetry.io/otel/sdk/metric"
@@ -60,7 +61,9 @@ func TestObserver(t *testing.T) {
6061
},
6162
},
6263
}
63-
observer, err := Register(meter, cluster)
64+
65+
key := client.ObjectKeyFromObject(cluster)
66+
observer, err := Register(meter, key)
6467
if err != nil {
6568
t.Fatal("register:", err)
6669
}
@@ -72,6 +75,7 @@ func TestObserver(t *testing.T) {
7275
})
7376

7477
metrics := &metricdata.ResourceMetrics{}
78+
observer.Update(cluster)
7579
err = reader.Collect(t.Context(), metrics)
7680
if err != nil {
7781
t.Fatal("collect:", err)

pkg/metrics/reconciler.go

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package metrics
22

33
import (
44
"context"
5+
"sync"
56

67
"go.opentelemetry.io/otel/metric"
78
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -17,8 +18,7 @@ type Reconciler struct {
1718
kcl client.Client
1819
meter metric.Meter
1920

20-
// no lock required - workqueue is stingy preventing concurrent reconciles of same resource
21-
// https://pkg.go.dev/k8s.io/client-go/util/workqueue
21+
mtx sync.Mutex
2222
observers map[client.ObjectKey]*Observer
2323
}
2424

@@ -31,6 +31,7 @@ func SetupWithManager(mgr manager.Manager, meterProvider metric.MeterProvider) e
3131
}
3232

3333
return builder.ControllerManagedBy(mgr).
34+
Named("metrics").
3435
For(&apiv1.EtcdCluster{}).
3536
WithOptions(controller.Options{
3637
MaxConcurrentReconciles: 4,
@@ -39,28 +40,58 @@ func SetupWithManager(mgr manager.Manager, meterProvider metric.MeterProvider) e
3940
}
4041

4142
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
42-
observer := r.observers[req.NamespacedName]
43-
43+
key := req.NamespacedName
4444
cluster := &apiv1.EtcdCluster{}
45-
err := r.kcl.Get(ctx, req.NamespacedName, cluster)
45+
err := r.kcl.Get(ctx, key, cluster)
4646
switch {
4747
// passthrough error
4848
case client.IgnoreNotFound(err) != nil:
4949
return reconcile.Result{}, err
50-
// unregister if cluster not found or deleted
51-
case (err != nil || !cluster.DeletionTimestamp.IsZero()) && observer != nil:
52-
err = observer.Unregister()
50+
// delete observer if cluster not found or deleted
51+
case err != nil || !cluster.DeletionTimestamp.IsZero():
52+
err = r.Delete(key)
53+
return reconcile.Result{}, err
54+
}
55+
56+
observer, err := r.GetOrCreate(key)
57+
if err != nil {
5358
return reconcile.Result{}, err
54-
// register observer
55-
case observer == nil:
56-
observer, err = Register(r.meter, cluster)
57-
if err != nil {
58-
return reconcile.Result{}, err
59-
}
60-
r.observers[req.NamespacedName] = observer
6159
}
6260

6361
observer.Update(cluster)
6462

6563
return reconcile.Result{}, nil
6664
}
65+
66+
func (r *Reconciler) GetOrCreate(key client.ObjectKey) (*Observer, error) {
67+
r.mtx.Lock()
68+
defer r.mtx.Unlock()
69+
70+
observer, ok := r.observers[key]
71+
if ok {
72+
return observer, nil
73+
}
74+
75+
observer, err := Register(r.meter, key)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
r.observers[key] = observer
81+
82+
return observer, nil
83+
}
84+
85+
func (r *Reconciler) Delete(key client.ObjectKey) error {
86+
r.mtx.Lock()
87+
defer r.mtx.Unlock()
88+
89+
observer, ok := r.observers[key]
90+
if !ok {
91+
return nil
92+
}
93+
94+
delete(r.observers, key)
95+
96+
return observer.Unregister()
97+
}

0 commit comments

Comments
 (0)