-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathcontroller.go
More file actions
353 lines (302 loc) · 12.6 KB
/
controller.go
File metadata and controls
353 lines (302 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/*
Copyright Kurator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package application
import (
"context"
"fmt"
helmv2b1 "github.com/fluxcd/helm-controller/api/v2beta1"
kustomizev1beta2 "github.com/fluxcd/kustomize-controller/api/v1beta2"
sourcev1beta2 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1"
fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1"
fleetmanager "kurator.dev/kurator/pkg/fleet-manager"
)
const (
GitRepoKind = sourcev1beta2.GitRepositoryKind
HelmRepoKind = sourcev1beta2.HelmRepositoryKind
OCIRepoKind = sourcev1beta2.OCIRepositoryKind
KustomizationKind = kustomizev1beta2.KustomizationKind
HelmReleaseKind = helmv2b1.HelmReleaseKind
ApplicationLabel = "apps.kurator.dev/app-name"
ApplicationKind = "Application"
ApplicationFinalizer = "apps.kurator.dev"
)
// ApplicationManager reconciles an Application object
type ApplicationManager struct {
client.Client
Scheme *runtime.Scheme
}
// SetupWithManager sets up the controller with the Manager.
func (a *ApplicationManager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
c, err := ctrl.NewControllerManagedBy(mgr).
For(&applicationapi.Application{}).
Build(a)
if err != nil {
return err
}
// Set up watches for the updates to application's status.
if err := c.Watch(
source.Kind(mgr.GetCache(), &sourcev1beta2.GitRepository{}),
handler.EnqueueRequestsFromMapFunc(a.objectToApplicationFunc),
); err != nil {
return fmt.Errorf("failed to add a Watch for GitRepository: %v", err)
}
if err := c.Watch(
source.Kind(mgr.GetCache(), &sourcev1beta2.HelmRepository{}),
handler.EnqueueRequestsFromMapFunc(a.objectToApplicationFunc),
); err != nil {
return fmt.Errorf("failed to add a Watch for HelmRepository: %v", err)
}
if err := c.Watch(
source.Kind(mgr.GetCache(), &sourcev1beta2.OCIRepository{}),
handler.EnqueueRequestsFromMapFunc(a.objectToApplicationFunc),
); err != nil {
return fmt.Errorf("failed to add a Watch for OCIRepository: %v", err)
}
if err := c.Watch(
source.Kind(mgr.GetCache(), &kustomizev1beta2.Kustomization{}),
handler.EnqueueRequestsFromMapFunc(a.objectToApplicationFunc),
); err != nil {
return fmt.Errorf("failed to add a Watch for Kustomization: %v", err)
}
if err := c.Watch(
source.Kind(mgr.GetCache(), &helmv2b1.HelmRelease{}),
handler.EnqueueRequestsFromMapFunc(a.objectToApplicationFunc),
); err != nil {
return fmt.Errorf("failed to add a Watch for HelmRelease: %v", err)
}
return nil
}
func (a *ApplicationManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx).WithValues("application", req.NamespacedName)
app := &applicationapi.Application{}
if err := a.Get(ctx, req.NamespacedName, app); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, errors.Wrapf(err, "failed to get application %s", req.NamespacedName)
}
patchHelper, err := patch.NewHelper(app, a.Client)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to init patch helper for application %s", req.NamespacedName)
}
defer func() {
if err := patchHelper.Patch(ctx, app); err != nil {
reterr = utilerrors.NewAggregate([]error{reterr, errors.Wrapf(err, "failed to patch application %s", req.NamespacedName)})
}
}()
// Add finalizer if not exist to void the race condition.
if !controllerutil.ContainsFinalizer(app, ApplicationFinalizer) {
controllerutil.AddFinalizer(app, ApplicationFinalizer)
}
// there only one fleet, so pre-fetch it here.
fleetKey := generateFleetKey(app)
fleet := &fleetapi.Fleet{}
// Retrieve fleet object based on the defined fleet key
if err := a.Client.Get(ctx, fleetKey, fleet); err != nil {
if apierrors.IsNotFound(err) {
log.Info("fleet does not exist", "fleet", fleetKey)
return ctrl.Result{RequeueAfter: fleetmanager.RequeueAfter}, nil
}
// Log error and requeue request if error occurred during fleet retrieval
log.Error(err, "failed to find fleet", "fleet", fleetKey)
return ctrl.Result{}, err
}
// Handle deletion reconciliation loop.
if app.DeletionTimestamp != nil {
return a.reconcileDelete(ctx, app, fleet)
}
// Handle normal loop.
return a.reconcile(ctx, app, fleet)
}
func (a *ApplicationManager) reconcile(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
result, err := a.reconcileApplicationResources(ctx, app, fleet)
if err != nil {
log.Error(err, "failed to reconcileSyncResources")
}
if err != nil || result.RequeueAfter > 0 {
return result, err
}
if err := a.reconcileStatus(ctx, app, fleet); err != nil {
log.Error(err, "failed to reconcile status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// reconcileApplicationResources handles the synchronization of resources associated with the current Application resource.
// The associated resources are categorized as 'source' and 'policy'.
// 'source' could be one of gitRepo, helmRepo, or ociRepo while 'policy' can be either kustomizations or helmReleases.
// Any change in Application configuration could potentially lead to creation, deletion, or modification of associated resources in the Kubernetes cluster.
func (a *ApplicationManager) reconcileApplicationResources(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) (ctrl.Result, error) {
// Synchronize source resource based on application configuration
if result, err := a.syncSourceResource(ctx, app); err != nil || result.RequeueAfter > 0 {
return result, err
}
// Iterate over each policy in the application's spec.SyncPolicy
for index, policy := range app.Spec.SyncPolicies {
policyName := generatePolicyName(app, index)
// A policy has a fleet, and a fleet has many clusters. Therefore, a policy may need to create or update multiple kustomizations/helmReleases for each cluster.
// Synchronize policy resource based on current application, fleet, and policy configuration
if result, err := a.syncPolicyResource(ctx, app, fleet, policy, policyName); err != nil || result.RequeueAfter > 0 {
return result, err
}
}
return ctrl.Result{}, nil
}
// reconcileStatus updates the status of resources associated with the current Application resource.
// It does this by fetching the current status of the source (either GitRepoKind or HelmRepoKind) and the sync policy from the API server,
// and updating the Application's status to reflect these current statuses.
func (a *ApplicationManager) reconcileStatus(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) error {
if err := a.reconcileSourceStatus(ctx, app); err != nil {
return err
}
if err := a.reconcileSyncStatus(ctx, app, fleet); err != nil {
return err
}
return nil
}
// reconcileSourceStatus reconciles the source status of the given application by fetching the status of the source resource (e.g. GitRepository, HelmRepository)
func (a *ApplicationManager) reconcileSourceStatus(ctx context.Context, app *applicationapi.Application) error {
log := ctrl.LoggerFrom(ctx)
sourceKey := client.ObjectKey{
Name: generateSourceName(app),
Namespace: app.GetNamespace(),
}
if app.Status.SourceStatus == nil {
app.Status.SourceStatus = &applicationapi.ApplicationSourceStatus{}
}
sourceKind := findSourceKind(app)
// Depending on source kind in application specifications, fetch resource status and update application's source status
switch sourceKind {
case GitRepoKind:
currentResource := &sourcev1beta2.GitRepository{}
err := a.Client.Get(ctx, sourceKey, currentResource)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "failed to get GitRepository from the API server when reconciling status")
return err
}
// if not found, return directly. new created GitRepository will be watched in subsequent loop
if apierrors.IsNotFound(err) {
return nil
}
app.Status.SourceStatus.GitRepoStatus = ¤tResource.Status
case HelmRepoKind:
currentResource := &sourcev1beta2.HelmRepository{}
err := a.Client.Get(ctx, sourceKey, currentResource)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "failed to get HelmRepository from the API server when reconciling status")
return err
}
// if not found, return directly. new created HelmRepository will be watched in subsequent loop
if apierrors.IsNotFound(err) {
return nil
}
app.Status.SourceStatus.HelmRepoStatus = ¤tResource.Status
}
return nil
}
// reconcileSyncStatus reconciles the sync status of the given application by finding all Kustomizations and HelmReleases associated with it,
// and updating the sync status of each resource in the application's SyncStatus field.
func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) error {
var syncStatus []*applicationapi.ApplicationSyncStatus
// find all kustomization
kustomizationList, err := a.getKustomizationList(ctx, app)
if err != nil {
return nil
}
// sync all kustomization status
for _, kustomization := range kustomizationList.Items {
kustomizationStatus := &applicationapi.ApplicationSyncStatus{
Name: kustomization.Name,
KustomizationStatus: &kustomization.Status,
}
syncStatus = append(syncStatus, kustomizationStatus)
}
// find all helmRelease
helmReleaseList, err := a.getHelmReleaseList(ctx, app)
if err != nil {
return err
}
// sync all helmRelease status
for _, helmRelease := range helmReleaseList.Items {
helmReleaseStatus := &applicationapi.ApplicationSyncStatus{
Name: helmRelease.Name,
HelmReleaseStatus: &helmRelease.Status,
}
syncStatus = append(syncStatus, helmReleaseStatus)
}
rolloutStatus := make(map[string]*applicationapi.RolloutStatus)
// Get rollout status from member clusters
for index, syncPolicy := range app.Spec.SyncPolicies {
policyName := generatePolicyName(app, index)
if syncPolicy.Rollout != nil {
status, _, err := a.reconcileRolloutSyncStatus(ctx, app, fleet, syncPolicy, policyName)
if err != nil {
return errors.Wrapf(err, "failed to reconcil rollout status")
}
rolloutStatus = mergeMap(status, rolloutStatus)
}
}
// update rollout status
for index, policyStatus := range syncStatus {
if _, exist := rolloutStatus[policyStatus.Name]; exist {
syncStatus[index].RolloutStatus = rolloutStatus[policyStatus.Name]
}
}
app.Status.SyncStatus = syncStatus
return nil
}
func (a *ApplicationManager) reconcileDelete(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
fleetKey := generateFleetKey(app)
if err := a.Client.Get(ctx, fleetKey, fleet); err != nil {
if apierrors.IsNotFound(err) {
log.Info("delete failed, fleet does not exist", "fleet", fleetKey)
return ctrl.Result{RequeueAfter: fleetmanager.RequeueAfter}, nil
}
log.Error(err, "delete failed, fleet does not found", "fleet", fleetKey)
return ctrl.Result{}, err
}
if deleteErr := a.deleteResourcesInMemberClusters(ctx, app, fleet); deleteErr != nil {
return ctrl.Result{}, errors.Wrapf(deleteErr, "failed to delete rollout resource in cluster")
}
controllerutil.RemoveFinalizer(app, ApplicationFinalizer)
return ctrl.Result{}, nil
}
func (a *ApplicationManager) objectToApplicationFunc(ctx context.Context, o client.Object) []ctrl.Request {
labels := o.GetLabels()
if labels[ApplicationLabel] != "" {
return []ctrl.Request{
{
NamespacedName: types.NamespacedName{
Namespace: o.GetNamespace(),
Name: labels[ApplicationLabel],
},
},
}
}
return nil
}