Skip to content
This repository was archived by the owner on Dec 2, 2025. It is now read-only.

Commit 9b57691

Browse files
authored
feat: apply kstatus feature from ocm-controller (#76)
1 parent 5b7bd85 commit 9b57691

File tree

6 files changed

+281
-150
lines changed

6 files changed

+281
-150
lines changed

api/v1alpha1/componentsubscription_types.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package v1alpha1
66

77
import (
8+
"fmt"
89
"time"
910

1011
"github.com/fluxcd/pkg/apis/meta"
@@ -104,6 +105,18 @@ type ComponentSubscriptionStatus struct {
104105
Conditions []metav1.Condition `json:"conditions,omitempty"`
105106
}
106107

108+
func (in *ComponentSubscription) GetVID() map[string]string {
109+
vid := fmt.Sprintf("%s:%s", in.Status.LastAttemptedVersion, in.Status.LastAppliedVersion)
110+
metadata := make(map[string]string)
111+
metadata[GroupVersion.Group+"/component_subscription"] = vid
112+
113+
return metadata
114+
}
115+
116+
func (in *ComponentSubscription) SetObservedGeneration(v int64) {
117+
in.Status.ObservedGeneration = v
118+
}
119+
107120
// GetConditions returns the conditions of the ComponentVersion.
108121
func (in *ComponentSubscription) GetConditions() []metav1.Condition {
109122
return in.Status.Conditions

controllers/componentsubscription_controller.go

Lines changed: 146 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,128 @@ import (
1111
"time"
1212

1313
"github.com/Masterminds/semver/v3"
14+
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
1415
"github.com/fluxcd/pkg/apis/meta"
1516
"github.com/fluxcd/pkg/runtime/conditions"
1617
"github.com/fluxcd/pkg/runtime/patch"
18+
rreconcile "github.com/fluxcd/pkg/runtime/reconcile"
19+
"github.com/open-component-model/ocm-controller/pkg/event"
20+
"github.com/open-component-model/ocm-controller/pkg/status"
21+
"github.com/open-component-model/replication-controller/api/v1alpha1"
22+
"github.com/open-component-model/replication-controller/pkg/ocm"
23+
corev1 "k8s.io/api/core/v1"
1724
apierrors "k8s.io/apimachinery/pkg/api/errors"
25+
"k8s.io/apimachinery/pkg/fields"
1826
"k8s.io/apimachinery/pkg/runtime"
19-
"k8s.io/klog/v2"
27+
"k8s.io/apimachinery/pkg/types"
28+
"k8s.io/client-go/tools/record"
2029
ctrl "sigs.k8s.io/controller-runtime"
2130
"sigs.k8s.io/controller-runtime/pkg/builder"
2231
"sigs.k8s.io/controller-runtime/pkg/client"
23-
"sigs.k8s.io/controller-runtime/pkg/log"
32+
"sigs.k8s.io/controller-runtime/pkg/handler"
2433
"sigs.k8s.io/controller-runtime/pkg/predicate"
25-
26-
"github.com/open-component-model/replication-controller/api/v1alpha1"
27-
"github.com/open-component-model/replication-controller/pkg/ocm"
34+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
35+
"sigs.k8s.io/controller-runtime/pkg/source"
2836
)
2937

3038
// ComponentSubscriptionReconciler reconciles a ComponentSubscription object
3139
type ComponentSubscriptionReconciler struct {
3240
client.Client
3341
Scheme *runtime.Scheme
3442

35-
OCMClient ocm.Contract
43+
OCMClient ocm.Contract
44+
EventRecorder record.EventRecorder
45+
}
46+
47+
// SetupWithManager sets up the controller with the Manager.
48+
func (r *ComponentSubscriptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
49+
const (
50+
sourceKey = ".metadata.source.secretRef"
51+
destinationKey = ".metadata.destination.secretRef"
52+
)
53+
54+
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v1alpha1.ComponentSubscription{}, sourceKey, func(rawObj client.Object) []string {
55+
obj, ok := rawObj.(*v1alpha1.ComponentSubscription)
56+
if !ok {
57+
return []string{}
58+
}
59+
if obj.Spec.Source.SecretRef == nil {
60+
return []string{}
61+
}
62+
63+
ns := obj.GetNamespace()
64+
return []string{fmt.Sprintf("%s/%s", ns, obj.Spec.Source.SecretRef.Name)}
65+
}); err != nil {
66+
return fmt.Errorf("failed setting index fields: %w", err)
67+
}
68+
69+
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v1alpha1.ComponentSubscription{}, sourceKey, func(rawObj client.Object) []string {
70+
obj, ok := rawObj.(*v1alpha1.ComponentSubscription)
71+
if !ok {
72+
return []string{}
73+
}
74+
if obj.Spec.Destination.SecretRef == nil {
75+
return []string{}
76+
}
77+
78+
ns := obj.GetNamespace()
79+
return []string{fmt.Sprintf("%s/%s", ns, obj.Spec.Destination.SecretRef.Name)}
80+
}); err != nil {
81+
return fmt.Errorf("failed setting index fields: %w", err)
82+
}
83+
84+
return ctrl.NewControllerManagedBy(mgr).
85+
For(&v1alpha1.ComponentSubscription{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
86+
Watches(
87+
&source.Kind{Type: &corev1.Secret{}},
88+
handler.EnqueueRequestsFromMapFunc(r.findObjects(sourceKey, destinationKey))).
89+
Complete(r)
90+
}
91+
92+
// findObjects finds component versions that have a key for the secret that triggered this watch event.
93+
func (r *ComponentSubscriptionReconciler) findObjects(sourceKey string, destinationKey string) handler.MapFunc {
94+
return func(obj client.Object) []reconcile.Request {
95+
sourceList := &v1alpha1.ComponentSubscriptionList{}
96+
if err := r.List(context.Background(), sourceList, &client.ListOptions{
97+
FieldSelector: fields.OneTermEqualSelector(sourceKey, client.ObjectKeyFromObject(obj).String()),
98+
}); err != nil {
99+
return []reconcile.Request{}
100+
}
101+
102+
destinationList := &v1alpha1.ComponentSubscriptionList{}
103+
if err := r.List(context.Background(), destinationList, &client.ListOptions{
104+
FieldSelector: fields.OneTermEqualSelector(destinationKey, client.ObjectKeyFromObject(obj).String()),
105+
}); err != nil {
106+
return []reconcile.Request{}
107+
}
108+
109+
// deduplicate the two secret lists
110+
requestMap := make(map[reconcile.Request]struct{})
111+
for _, item := range sourceList.Items {
112+
requestMap[reconcile.Request{
113+
NamespacedName: types.NamespacedName{
114+
Name: item.GetName(),
115+
Namespace: item.GetNamespace(),
116+
},
117+
}] = struct{}{}
118+
}
119+
120+
for _, item := range destinationList.Items {
121+
requestMap[reconcile.Request{
122+
NamespacedName: types.NamespacedName{
123+
Name: item.GetName(),
124+
Namespace: item.GetNamespace(),
125+
},
126+
}] = struct{}{}
127+
}
128+
129+
requests := make([]reconcile.Request, len(requestMap))
130+
for k := range requestMap {
131+
requests = append(requests, k)
132+
}
133+
134+
return requests
135+
}
36136
}
37137

38138
//+kubebuilder:rbac:groups=delivery.ocm.software,resources=componentsubscriptions,verbs=get;list;watch;create;update;patch;delete
@@ -44,7 +144,6 @@ type ComponentSubscriptionReconciler struct {
44144
// Reconcile is part of the main kubernetes reconciliation loop which aims to
45145
// move the current state of the cluster closer to the desired state.
46146
func (r *ComponentSubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
47-
logger := log.FromContext(ctx)
48147
obj := &v1alpha1.ComponentSubscription{}
49148
if err = r.Get(ctx, req.NamespacedName, obj); err != nil {
50149
if apierrors.IsNotFound(err) {
@@ -54,12 +153,7 @@ func (r *ComponentSubscriptionReconciler) Reconcile(ctx context.Context, req ctr
54153
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
55154
}
56155

57-
logger = logger.WithValues("subscription", klog.KObj(obj))
58-
logger.V(4).Info("starting reconcile loop")
59-
60156
if obj.DeletionTimestamp != nil {
61-
logger.Info("subscription is being deleted...")
62-
63157
return
64158
}
65159

@@ -69,47 +163,64 @@ func (r *ComponentSubscriptionReconciler) Reconcile(ctx context.Context, req ctr
69163

70164
// Always attempt to patch the object and status after each reconciliation.
71165
defer func() {
72-
if perr := patchHelper.Patch(ctx, obj); perr != nil {
73-
err = errors.Join(err, perr)
166+
// Patching has not been set up, or the controller errored earlier.
167+
if patchHelper == nil {
168+
return
169+
}
170+
171+
if derr := status.UpdateStatus(ctx, patchHelper, obj, r.EventRecorder, obj.GetRequeueAfter()); derr != nil {
172+
err = errors.Join(err, derr)
74173
}
75174
}()
76175

176+
// Starts the progression by setting ReconcilingCondition.
177+
// This will be checked in defer.
178+
// Should only be deleted on a success.
179+
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason, "reconciliation in progress for resource: %s", obj.Name)
180+
77181
return r.reconcile(ctx, obj)
78182
}
79183

80-
func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1alpha1.ComponentSubscription) (ctrl.Result, error) {
81-
logger := log.FromContext(ctx)
184+
func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1alpha1.ComponentSubscription) (_ ctrl.Result, err error) {
185+
if obj.Generation != obj.Status.ObservedGeneration {
186+
rreconcile.ProgressiveStatus(
187+
false,
188+
obj,
189+
meta.ProgressingReason,
190+
"processing object: new generation %d -> %d",
191+
obj.Status.ObservedGeneration,
192+
obj.Generation,
193+
)
194+
}
82195

83196
octx, err := r.OCMClient.CreateAuthenticatedOCMContext(ctx, obj)
84197
if err != nil {
85198
err := fmt.Errorf("failed to authenticate OCM context: %w", err)
86-
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.AuthenticationFailedReason, err.Error())
199+
status.MarkAsStalled(r.EventRecorder, obj, v1alpha1.AuthenticationFailedReason, err.Error())
87200

88-
return ctrl.Result{}, err
201+
return ctrl.Result{}, nil
89202
}
90203

91204
version, err := r.OCMClient.GetLatestSourceComponentVersion(ctx, octx, obj)
92205
if err != nil {
93206
err := fmt.Errorf("failed to get latest component version: %w", err)
94-
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.PullingLatestVersionFailedReason, err.Error())
207+
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.PullingLatestVersionFailedReason, err.Error())
95208

96209
// we don't want to fail but keep searching until it's there. But we do mark the subscription as failed.
97210
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
98211
}
99-
logger.V(4).Info("got newest version from component", "version", version)
100212

101213
// Because of the predicate, this subscription will be reconciled again once there is an update to its status field.
102214
if version == obj.Status.LastAppliedVersion {
103-
logger.Info("latest version and last applied version are a match and not empty")
104-
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
215+
r.markAsDone(obj)
105216

106217
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
107218
}
108219

109220
latestSourceComponentVersion, err := semver.NewVersion(version)
110221
if err != nil {
111222
err := fmt.Errorf("failed to parse source component version: %w", err)
112-
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.SemverConversionFailedReason, err.Error())
223+
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.SemverConversionFailedReason, err.Error())
113224

114225
return ctrl.Result{}, err
115226
}
@@ -122,16 +233,13 @@ func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1
122233
lastAppliedVersion, err := semver.NewVersion(lastAppliedOriginal)
123234
if err != nil {
124235
err := fmt.Errorf("failed to parse latest version: %w", err)
125-
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.SemverConversionFailedReason, err.Error())
236+
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.SemverConversionFailedReason, err.Error())
126237

127238
return ctrl.Result{}, err
128239
}
129240

130-
logger.V(4).Info("latest applied version is", "version", lastAppliedVersion.Original())
131-
132241
if latestSourceComponentVersion.LessThan(lastAppliedVersion) || latestSourceComponentVersion.Equal(lastAppliedVersion) {
133-
logger.Info("no new version found", "version", latestSourceComponentVersion.Original(), "latest", lastAppliedVersion.Original())
134-
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
242+
r.markAsDone(obj)
135243

136244
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
137245
}
@@ -142,48 +250,42 @@ func (r *ComponentSubscriptionReconciler) reconcile(ctx context.Context, obj *v1
142250
sourceComponentVersion, err := r.OCMClient.GetComponentVersion(ctx, octx, obj, latestSourceComponentVersion.Original())
143251
if err != nil {
144252
err := fmt.Errorf("failed to get latest component version: %w", err)
145-
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.GetComponentDescriptorFailedReason, err.Error())
253+
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.GetComponentDescriptorFailedReason, err.Error())
146254

147255
return ctrl.Result{}, err
148256
}
149257

150258
defer func() {
151-
if err := sourceComponentVersion.Close(); err != nil {
152-
logger.Error(err, "failed to close source component version, context might be leaking...")
259+
if cerr := sourceComponentVersion.Close(); cerr != nil {
260+
err = errors.Join(err, cerr)
153261
}
154262
}()
155263

156-
logger.V(4).Info("pulling", "component-name", sourceComponentVersion.GetName())
157-
158264
if obj.Spec.Destination != nil {
265+
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason, "transferring component to target repository: %s", obj.Spec.Destination.URL)
266+
159267
if err := r.OCMClient.TransferComponent(ctx, octx, obj, sourceComponentVersion, latestSourceComponentVersion.Original()); err != nil {
160268
err := fmt.Errorf("failed to transfer components: %w", err)
161-
conditions.MarkFalse(obj, meta.ReadyCondition, v1alpha1.TransferFailedReason, err.Error())
269+
status.MarkNotReady(r.EventRecorder, obj, v1alpha1.TransferFailedReason, err.Error())
162270

163-
logger.Error(err, "transferring components failed")
164271
return ctrl.Result{}, err
165272
}
166273

167274
obj.Status.ReplicatedRepositoryURL = obj.Spec.Destination.URL
168275
} else {
169-
logger.Info("skipping transferring as no destination is provided for source component", "component-name", sourceComponentVersion.GetName())
170-
171276
obj.Status.ReplicatedRepositoryURL = obj.Spec.Source.URL
172277
}
173278

174279
// Update the replicated version to the latest version
175280
obj.Status.LastAppliedVersion = latestSourceComponentVersion.Original()
176281

177-
logger.Info("resource is ready")
178-
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
282+
r.markAsDone(obj)
179283

180284
// Always requeue to constantly check for new versions.
181285
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
182286
}
183287

184-
// SetupWithManager sets up the controller with the Manager.
185-
func (r *ComponentSubscriptionReconciler) SetupWithManager(mgr ctrl.Manager) error {
186-
return ctrl.NewControllerManagedBy(mgr).
187-
For(&v1alpha1.ComponentSubscription{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
188-
Complete(r)
288+
func (r *ComponentSubscriptionReconciler) markAsDone(obj *v1alpha1.ComponentSubscription) {
289+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Reconciliation success")
290+
event.New(r.EventRecorder, obj, eventv1.EventSeverityInfo, "Reconciliation success", nil)
189291
}

controllers/componentsubscription_controller_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
1616
"k8s.io/apimachinery/pkg/types"
17+
"k8s.io/client-go/tools/record"
1718
ctrl "sigs.k8s.io/controller-runtime"
1819

1920
"github.com/open-component-model/ocm/pkg/contexts/ocm"
@@ -187,12 +188,17 @@ func TestComponentSubscriptionReconciler(t *testing.T) {
187188
cv := tt.subscription()
188189
client := env.FakeKubeClient(WithObjets(cv))
189190
fakeOcm := &fakes.MockFetcher{}
191+
recorder := &record.FakeRecorder{
192+
Events: make(chan string, 32),
193+
IncludeObject: true,
194+
}
190195
tt.setupMock(fakeOcm)
191196

192197
cvr := ComponentSubscriptionReconciler{
193-
Scheme: env.scheme,
194-
Client: client,
195-
OCMClient: fakeOcm,
198+
Scheme: env.scheme,
199+
Client: client,
200+
OCMClient: fakeOcm,
201+
EventRecorder: recorder,
196202
}
197203

198204
_, err := cvr.Reconcile(context.Background(), ctrl.Request{

0 commit comments

Comments
 (0)