Skip to content

Commit 4e080ba

Browse files
committed
feat(resource): implement ResourceReconciler and associated subroutines for managing OCI and Helm repositories
Signed-off-by: Bastian Echterhölter <[email protected]> On-behalf-of: @SAP <[email protected]>
1 parent 1555500 commit 4e080ba

File tree

4 files changed

+325
-1
lines changed

4 files changed

+325
-1
lines changed

cmd/operator.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ func RunController(_ *cobra.Command, _ []string) { // coverage-ignore
113113
os.Exit(1)
114114
}
115115

116+
resourceReconciler := controller.NewResourceReconciler(log, mgr, &operatorCfg)
117+
if err := resourceReconciler.SetupWithManager(mgr, defaultCfg, log); err != nil {
118+
setupLog.Error(err, "unable to create controller", "controller", "PlatformMesh")
119+
os.Exit(1)
120+
}
121+
116122
if operatorCfg.PatchOIDCControllerEnabled {
117123
realmReconciler := controller.NewRealmReconciler(mgr, log, &operatorCfg)
118124
if err := realmReconciler.SetupWithManager(mgr, defaultCfg, log); err != nil {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/platform-mesh/platform-mesh-operator
33
go 1.25.1
44

55
replace sigs.k8s.io/controller-runtime => github.com/kcp-dev/controller-runtime v0.19.0-kcp.1
6-
6+
replace ocm.software/open-component-model/kubernetes/controller => github.com/open-component-model/open-component-model/kubernetes/controller v0.0.0-20251104083903-c6d40af9889f
77
replace (
88
k8s.io/api => k8s.io/api v0.34.1
99
k8s.io/apimachinery => k8s.io/apimachinery v0.34.1
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright 2025.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
22+
pmconfig "github.com/platform-mesh/golang-commons/config"
23+
"github.com/platform-mesh/golang-commons/controller/lifecycle/controllerruntime"
24+
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
25+
"github.com/platform-mesh/golang-commons/logger"
26+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/apimachinery/pkg/runtime/schema"
28+
29+
ctrl "sigs.k8s.io/controller-runtime"
30+
"sigs.k8s.io/controller-runtime/pkg/predicate"
31+
32+
"github.com/platform-mesh/platform-mesh-operator/internal/config"
33+
"github.com/platform-mesh/platform-mesh-operator/pkg/subroutines/resource"
34+
)
35+
36+
var (
37+
resourceReconcilerName = "ResourceReconciler"
38+
)
39+
40+
// ResourceReconciler reconciles a PlatformMesh object
41+
type ResourceReconciler struct {
42+
lifecycle *controllerruntime.LifecycleManager
43+
}
44+
45+
var gvk = schema.GroupVersionKind{
46+
Group: "delivery.ocm.software",
47+
Version: "v1alpha1",
48+
Kind: "Resource",
49+
}
50+
51+
func (r *ResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
52+
obj := &unstructured.Unstructured{}
53+
obj.SetGroupVersionKind(gvk)
54+
return r.lifecycle.Reconcile(ctx, req, obj)
55+
}
56+
57+
// SetupWithManager sets up the controller with the Manager.
58+
func (r *ResourceReconciler) SetupWithManager(mgr ctrl.Manager, cfg *pmconfig.CommonServiceConfig,
59+
log *logger.Logger, eventPredicates ...predicate.Predicate) error {
60+
61+
obj := &unstructured.Unstructured{}
62+
obj.SetGroupVersionKind(gvk)
63+
64+
mgr.GetScheme().AddKnownTypeWithName(gvk, &unstructured.Unstructured{})
65+
mgr.GetScheme().AddKnownTypeWithName(gvk.GroupVersion().WithKind(gvk.Kind+"List"), &unstructured.UnstructuredList{})
66+
67+
builder, err := r.lifecycle.SetupWithManagerBuilder(mgr, cfg.MaxConcurrentReconciles, resourceReconcilerName, obj,
68+
cfg.DebugLabelValue, log, eventPredicates...)
69+
if err != nil {
70+
return err
71+
}
72+
return builder.Complete(r)
73+
}
74+
75+
func NewResourceReconciler(log *logger.Logger, mgr ctrl.Manager, cfg *config.OperatorConfig) *ResourceReconciler {
76+
var subs []subroutine.Subroutine
77+
78+
subs = append(subs, resource.NewResourceSubroutine(mgr))
79+
80+
return &ResourceReconciler{
81+
lifecycle: controllerruntime.NewLifecycleManager(subs, operatorName,
82+
resourceReconcilerName, mgr.GetClient(), log).WithReadOnly(),
83+
}
84+
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package resource
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
8+
"github.com/platform-mesh/golang-commons/errors"
9+
"github.com/platform-mesh/golang-commons/logger"
10+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
11+
"k8s.io/apimachinery/pkg/runtime/schema"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
14+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
15+
"sigs.k8s.io/controller-runtime/pkg/manager"
16+
)
17+
18+
var ociRepoGvk = schema.GroupVersionKind{
19+
Group: "source.toolkit.fluxcd.io",
20+
Version: "v1",
21+
Kind: "OCIRepository",
22+
}
23+
24+
var helmRepoGvk = schema.GroupVersionKind{
25+
Group: "source.toolkit.fluxcd.io",
26+
Version: "v1",
27+
Kind: "HelmRepository",
28+
}
29+
30+
var helmReleaseGvk = schema.GroupVersionKind{
31+
Group: "helm.toolkit.fluxcd.io",
32+
Version: "v2",
33+
Kind: "HelmRelease",
34+
}
35+
36+
type ResourceSubroutine struct {
37+
mgr manager.Manager
38+
}
39+
40+
func NewResourceSubroutine(mgr manager.Manager) *ResourceSubroutine {
41+
return &ResourceSubroutine{mgr: mgr}
42+
}
43+
44+
func (r *ResourceSubroutine) GetName() string {
45+
return "ResourceSubroutine"
46+
}
47+
48+
func (r *ResourceSubroutine) Finalize(_ context.Context, _ runtimeobject.RuntimeObject) (ctrl.Result, errors.OperatorError) {
49+
return ctrl.Result{}, nil
50+
}
51+
52+
func (r *ResourceSubroutine) Finalizers() []string { // coverage-ignore
53+
return []string{}
54+
}
55+
56+
func (r *ResourceSubroutine) Process(ctx context.Context, runtimeObj runtimeobject.RuntimeObject) (ctrl.Result, errors.OperatorError) {
57+
inst := runtimeObj.(*unstructured.Unstructured)
58+
log := logger.LoadLoggerFromContext(ctx).ChildLogger("name", r.GetName())
59+
60+
repo := inst.GetLabels()["repo"]
61+
artifact := inst.GetLabels()["artifact"]
62+
63+
if repo == "oci" && artifact == "chart" {
64+
log.Debug().Msg("Create/Update OCI Repo")
65+
result, err := r.updateOciRepo(ctx, inst, log)
66+
if err != nil {
67+
return result, err
68+
}
69+
}
70+
if repo == "helm" && artifact == "chart" {
71+
log.Debug().Msg("Create/Update Flux Helm Repository Repo")
72+
result, err := r.updateHelmRepository(ctx, inst, log)
73+
if err != nil {
74+
return result, err
75+
}
76+
log.Debug().Msg("Update Flux Helm Release Repo")
77+
result, err = r.updateHelmRelease(ctx, inst, log)
78+
if err != nil {
79+
return result, err
80+
}
81+
}
82+
if repo == "helm" && artifact == "image" {
83+
log.Debug().Msg("Update Helm Release with Image Tag")
84+
result, err := r.updateHelmReleaseWithImageTag(ctx, inst, log)
85+
if err != nil {
86+
return result, err
87+
}
88+
}
89+
return ctrl.Result{}, nil
90+
}
91+
92+
func (r *ResourceSubroutine) updateHelmReleaseWithImageTag(ctx context.Context, inst *unstructured.Unstructured, log *logger.Logger) (ctrl.Result, errors.OperatorError) {
93+
obj := &unstructured.Unstructured{}
94+
obj.SetGroupVersionKind(helmReleaseGvk)
95+
obj.SetName(inst.GetName())
96+
obj.SetNamespace(inst.GetNamespace())
97+
98+
version, found, err := unstructured.NestedString(inst.Object, "status", "resource", "version")
99+
if err != nil || !found {
100+
log.Info().Err(err).Msg("Failed to get version from Resource status")
101+
}
102+
103+
err = r.mgr.GetClient().Get(ctx, client.ObjectKeyFromObject(inst), obj)
104+
if err != nil {
105+
log.Error().Err(err).Msg("Failed to get HelmRelease")
106+
return ctrl.Result{}, errors.NewOperatorError(err, true, true)
107+
}
108+
109+
err = unstructured.SetNestedField(obj.Object, version, "spec", "values", "image", "tag")
110+
if err != nil {
111+
log.Error().Err(err).Msg("Failed to set version in HelmRelease spec")
112+
return ctrl.Result{}, errors.NewOperatorError(err, true, false)
113+
}
114+
115+
err = r.mgr.GetClient().Update(ctx, obj)
116+
if err != nil {
117+
log.Error().Err(err).Msg("Failed to update HelmRelease")
118+
return ctrl.Result{}, errors.NewOperatorError(err, true, true)
119+
}
120+
return ctrl.Result{}, nil
121+
}
122+
123+
func (r *ResourceSubroutine) updateHelmRelease(ctx context.Context, inst *unstructured.Unstructured, log *logger.Logger) (ctrl.Result, errors.OperatorError) {
124+
obj := &unstructured.Unstructured{}
125+
obj.SetGroupVersionKind(helmReleaseGvk)
126+
obj.SetName(inst.GetName())
127+
obj.SetNamespace(inst.GetNamespace())
128+
129+
version, found, err := unstructured.NestedString(inst.Object, "status", "resource", "version")
130+
if err != nil || !found {
131+
log.Info().Err(err).Msg("Failed to get version from Resource status")
132+
}
133+
134+
err = r.mgr.GetClient().Get(ctx, client.ObjectKeyFromObject(inst), obj)
135+
if err != nil {
136+
log.Error().Err(err).Msg("Failed to get HelmRelease")
137+
return ctrl.Result{}, errors.NewOperatorError(err, true, true)
138+
}
139+
140+
err = unstructured.SetNestedField(obj.Object, version, "spec", "chart", "spec", "version")
141+
if err != nil {
142+
log.Error().Err(err).Msg("Failed to set version in HelmRelease spec")
143+
return ctrl.Result{}, errors.NewOperatorError(err, true, false)
144+
}
145+
146+
err = r.mgr.GetClient().Update(ctx, obj)
147+
if err != nil {
148+
log.Error().Err(err).Msg("Failed to update HelmRelease")
149+
return ctrl.Result{}, errors.NewOperatorError(err, true, true)
150+
}
151+
return ctrl.Result{}, nil
152+
}
153+
154+
func (r *ResourceSubroutine) updateHelmRepository(ctx context.Context, inst *unstructured.Unstructured, log *logger.Logger) (ctrl.Result, errors.OperatorError) {
155+
url, found, err := unstructured.NestedString(inst.Object, "status", "resource", "access", "helmRepository")
156+
if err != nil || !found {
157+
log.Info().Err(err).Msg("Failed to get imageReference from Resource status")
158+
return ctrl.Result{}, errors.NewOperatorError(err, true, false)
159+
}
160+
161+
log.Info().Msg("Processing OCI Chart Resource")
162+
obj := &unstructured.Unstructured{}
163+
obj.SetGroupVersionKind(helmRepoGvk)
164+
obj.SetName(inst.GetName())
165+
obj.SetNamespace(inst.GetNamespace())
166+
_, err = controllerutil.CreateOrUpdate(ctx, r.mgr.GetClient(), obj, func() error {
167+
err := unstructured.SetNestedField(obj.Object, url, "spec", "url")
168+
if err != nil {
169+
return err
170+
}
171+
err = unstructured.SetNestedField(obj.Object, "generic", "spec", "provider")
172+
if err != nil {
173+
return err
174+
}
175+
err = unstructured.SetNestedField(obj.Object, "5m", "spec", "interval")
176+
if err != nil {
177+
return err
178+
}
179+
return nil
180+
})
181+
if err != nil {
182+
log.Error().Err(err).Msg("Failed to create or update OCIRepository")
183+
return ctrl.Result{}, errors.NewOperatorError(err, true, true)
184+
}
185+
return ctrl.Result{}, nil
186+
}
187+
188+
func (r *ResourceSubroutine) updateOciRepo(ctx context.Context, inst *unstructured.Unstructured, log *logger.Logger) (ctrl.Result, errors.OperatorError) {
189+
version, found, err := unstructured.NestedString(inst.Object, "status", "resource", "version")
190+
if err != nil || !found {
191+
log.Info().Err(err).Msg("Failed to get version from Resource status")
192+
}
193+
url, found, err := unstructured.NestedString(inst.Object, "status", "resource", "access", "imageReference")
194+
if err != nil || !found {
195+
log.Info().Err(err).Msg("Failed to get imageReference from Resource status")
196+
}
197+
url = "oci://" + url
198+
url = strings.TrimSuffix(url, ":"+version)
199+
200+
// Update or create oci repo
201+
log.Info().Msg("Processing OCI Chart Resource")
202+
obj := &unstructured.Unstructured{}
203+
obj.SetGroupVersionKind(ociRepoGvk)
204+
obj.SetName(inst.GetName())
205+
obj.SetNamespace(inst.GetNamespace())
206+
_, err = controllerutil.CreateOrUpdate(ctx, r.mgr.GetClient(), obj, func() error {
207+
err := unstructured.SetNestedField(obj.Object, version, "spec", "ref", "tag")
208+
if err != nil {
209+
return err
210+
}
211+
err = unstructured.SetNestedField(obj.Object, url, "spec", "url")
212+
if err != nil {
213+
return err
214+
}
215+
err = unstructured.SetNestedField(obj.Object, "generic", "spec", "provider")
216+
if err != nil {
217+
return err
218+
}
219+
err = unstructured.SetNestedField(obj.Object, "1m0s", "spec", "interval")
220+
if err != nil {
221+
return err
222+
}
223+
err = unstructured.SetNestedMap(inst.Object, map[string]interface{}{
224+
"mediaType": "application/vnd.cncf.helm.chart.content.v1.tar+gzip",
225+
"operation": "copy",
226+
}, "spec", "layerSelector")
227+
return nil
228+
})
229+
if err != nil {
230+
log.Error().Err(err).Msg("Failed to create or update OCIRepository")
231+
return ctrl.Result{}, errors.NewOperatorError(err, true, true)
232+
}
233+
return ctrl.Result{}, nil
234+
}

0 commit comments

Comments
 (0)