Skip to content

Commit c2c61b1

Browse files
committed
Implement S3 Bucket controller
1 parent 0e6ae3b commit c2c61b1

File tree

7 files changed

+394
-8
lines changed

7 files changed

+394
-8
lines changed

controllers/bucket_controller.go

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
/*
2+
Copyright 2020 The Flux CD contributors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controllers
18+
19+
import (
20+
"context"
21+
"crypto/sha1"
22+
"fmt"
23+
"io/ioutil"
24+
"os"
25+
"path/filepath"
26+
"strings"
27+
"time"
28+
29+
"github.com/go-logr/logr"
30+
"github.com/minio/minio-go/v7"
31+
"github.com/minio/minio-go/v7/pkg/credentials"
32+
corev1 "k8s.io/api/core/v1"
33+
"k8s.io/apimachinery/pkg/runtime"
34+
"k8s.io/apimachinery/pkg/types"
35+
kuberecorder "k8s.io/client-go/tools/record"
36+
"k8s.io/client-go/tools/reference"
37+
ctrl "sigs.k8s.io/controller-runtime"
38+
"sigs.k8s.io/controller-runtime/pkg/client"
39+
"sigs.k8s.io/controller-runtime/pkg/controller"
40+
41+
"github.com/fluxcd/pkg/recorder"
42+
"github.com/fluxcd/pkg/runtime/predicates"
43+
44+
sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
45+
)
46+
47+
// BucketReconciler reconciles a Bucket object by downloading the content of
// an S3-compatible bucket and packaging it as an artifact in local storage.
type BucketReconciler struct {
	client.Client
	// Log is the logger used by the reconciler.
	Log logr.Logger
	// Scheme is used to resolve object references when emitting events.
	Scheme *runtime.Scheme
	// Storage persists and garbage-collects the produced artifacts.
	Storage *Storage
	// EventRecorder records Kubernetes events on the reconciled object; may be nil.
	EventRecorder kuberecorder.EventRecorder
	// ExternalEventRecorder forwards events to an external endpoint
	// (presumably the notification controller — confirm with deployment
	// wiring) when configured; may be nil.
	ExternalEventRecorder *recorder.EventRecorder
}
56+
57+
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
58+
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
59+
60+
func (r *BucketReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
61+
ctx := context.Background()
62+
start := time.Now()
63+
64+
var bucket sourcev1.Bucket
65+
if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
66+
return ctrl.Result{}, client.IgnoreNotFound(err)
67+
}
68+
69+
log := r.Log.WithValues("controller", strings.ToLower(sourcev1.BucketKind), "request", req.NamespacedName)
70+
71+
// Examine if the object is under deletion
72+
if bucket.ObjectMeta.DeletionTimestamp.IsZero() {
73+
// The object is not being deleted, so if it does not have our finalizer,
74+
// then lets add the finalizer and update the object. This is equivalent
75+
// registering our finalizer.
76+
if !containsString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) {
77+
bucket.ObjectMeta.Finalizers = append(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
78+
if err := r.Update(ctx, &bucket); err != nil {
79+
log.Error(err, "unable to register finalizer")
80+
return ctrl.Result{}, err
81+
}
82+
}
83+
} else {
84+
// The object is being deleted
85+
if containsString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer) {
86+
// Our finalizer is still present, so lets handle garbage collection
87+
if err := r.gc(bucket, true); err != nil {
88+
r.event(bucket, recorder.EventSeverityError, fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
89+
// Return the error so we retry the failed garbage collection
90+
return ctrl.Result{}, err
91+
}
92+
// Remove our finalizer from the list and update it
93+
bucket.ObjectMeta.Finalizers = removeString(bucket.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
94+
if err := r.Update(ctx, &bucket); err != nil {
95+
return ctrl.Result{}, err
96+
}
97+
// Stop reconciliation as the object is being deleted
98+
return ctrl.Result{}, nil
99+
}
100+
}
101+
102+
// set initial status
103+
if resetBucket, ok := r.resetStatus(bucket); ok {
104+
bucket = resetBucket
105+
if err := r.Status().Update(ctx, &bucket); err != nil {
106+
log.Error(err, "unable to update status")
107+
return ctrl.Result{Requeue: true}, err
108+
}
109+
}
110+
111+
// purge old artifacts from storage
112+
if err := r.gc(bucket, false); err != nil {
113+
log.Error(err, "unable to purge old artifacts")
114+
}
115+
116+
// reconcile bucket by downloading its content
117+
reconciledBucket, reconcileErr := r.reconcile(ctx, *bucket.DeepCopy())
118+
119+
// update status with the reconciliation result
120+
if err := r.Status().Update(ctx, &reconciledBucket); err != nil {
121+
log.Error(err, "unable to update status")
122+
return ctrl.Result{Requeue: true}, err
123+
}
124+
125+
// if reconciliation failed, record the failure and requeue immediately
126+
if reconcileErr != nil {
127+
r.event(reconciledBucket, recorder.EventSeverityError, reconcileErr.Error())
128+
return ctrl.Result{Requeue: true}, reconcileErr
129+
}
130+
131+
// emit revision change event
132+
if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
133+
r.event(reconciledBucket, recorder.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
134+
}
135+
136+
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
137+
time.Now().Sub(start).String(),
138+
bucket.GetInterval().Duration.String(),
139+
))
140+
141+
return ctrl.Result{RequeueAfter: bucket.GetInterval().Duration}, nil
142+
}
143+
144+
// BucketReconcilerOptions holds configurable options for the
// BucketReconciler, consumed by SetupWithManagerAndOptions.
type BucketReconcilerOptions struct {
	// MaxConcurrentReconciles caps the number of Bucket objects reconciled
	// in parallel; zero leaves the controller-runtime default in effect.
	MaxConcurrentReconciles int
}
147+
148+
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
149+
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
150+
}
151+
152+
func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts BucketReconcilerOptions) error {
153+
return ctrl.NewControllerManagedBy(mgr).
154+
For(&sourcev1.Bucket{}).
155+
WithEventFilter(predicates.ChangePredicate{}).
156+
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
157+
Complete(r)
158+
}
159+
160+
// reconcile downloads the full content of the S3 bucket into a temporary
// directory, computes a revision checksum over the file tree, and — when the
// revision changed — archives it as a new artifact in storage. It returns
// the bucket with an updated status, and a non-nil error when any step
// failed (the returned status then carries the matching not-ready reason).
func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
	s3Client, err := r.auth(ctx, bucket)
	if err != nil {
		err = fmt.Errorf("auth error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
	}

	// create tmp dir
	tempDir, err := ioutil.TempDir("", bucket.Name)
	if err != nil {
		err = fmt.Errorf("tmp dir error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}
	defer os.RemoveAll(tempDir)

	// bound listing and downloads by the bucket's configured timeout
	ctxTimeout, cancel := context.WithTimeout(ctx, bucket.GetTimeout())
	defer cancel()

	// download bucket content
	for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{Recursive: true}) {
		if object.Err != nil {
			err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err)
			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
		}

		// keys ending in "/" are directory placeholders, not files
		if strings.HasSuffix(object.Key, "/") {
			continue
		}

		localPath := filepath.Join(tempDir, object.Key)
		err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{})
		if err != nil {
			err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
		}
	}

	// the revision is a checksum over the downloaded file tree
	revision, err := r.checksum(tempDir)
	if err != nil {
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	// return early on unchanged revision; only refresh the artifact URL when
	// it differs (presumably after a storage hostname change — confirm
	// against Storage.SetArtifactURL semantics)
	artifact := r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", revision))
	if bucket.GetArtifact() != nil && bucket.GetArtifact().Revision == revision {
		if artifact.URL != bucket.GetArtifact().URL {
			r.Storage.SetArtifactURL(bucket.GetArtifact())
			bucket.Status.URL = r.Storage.SetHostname(bucket.Status.URL)
		}
		return bucket, nil
	}

	// create artifact dir
	err = r.Storage.MkdirAll(artifact)
	if err != nil {
		err = fmt.Errorf("mkdir dir error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	// acquire lock
	unlock, err := r.Storage.Lock(artifact)
	if err != nil {
		err = fmt.Errorf("unable to acquire lock: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}
	defer unlock()

	// archive artifact and check integrity
	if err := r.Storage.Archive(&artifact, tempDir, bucket.Spec.Ignore); err != nil {
		err = fmt.Errorf("storage archive error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	// update latest symlink
	url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
	if err != nil {
		err = fmt.Errorf("storage symlink error: %w", err)
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
	}

	message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
	return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
}
243+
244+
func (r *BucketReconciler) auth(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
245+
opt := minio.Options{
246+
Region: bucket.Spec.Region,
247+
Secure: !bucket.Spec.Insecure,
248+
}
249+
250+
if bucket.Spec.SecretRef != nil {
251+
secretName := types.NamespacedName{
252+
Namespace: bucket.GetNamespace(),
253+
Name: bucket.Spec.SecretRef.Name,
254+
}
255+
256+
var secret corev1.Secret
257+
if err := r.Get(ctx, secretName, &secret); err != nil {
258+
return nil, fmt.Errorf("credentials secret error: %w", err)
259+
}
260+
261+
accesskey := ""
262+
secretkey := ""
263+
if k, ok := secret.Data["accesskey"]; ok {
264+
accesskey = string(k)
265+
}
266+
if k, ok := secret.Data["secretkey"]; ok {
267+
secretkey = string(k)
268+
}
269+
if accesskey == "" || secretkey == "" {
270+
return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
271+
}
272+
opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
273+
} else if bucket.Spec.Provider == "aws" {
274+
opt.Creds = credentials.NewIAM("")
275+
}
276+
277+
return minio.New(bucket.Spec.Endpoint, &opt)
278+
}
279+
280+
func (r *BucketReconciler) checksum(root string) (string, error) {
281+
checksum := ""
282+
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
283+
if err != nil {
284+
return err
285+
}
286+
if !info.Mode().IsRegular() {
287+
return nil
288+
}
289+
data, err := ioutil.ReadFile(path)
290+
if err != nil {
291+
return err
292+
}
293+
checksum += fmt.Sprintf("%x", sha1.Sum(data))
294+
return nil
295+
})
296+
if err != nil {
297+
return "", err
298+
}
299+
300+
return fmt.Sprintf("%x", sha1.Sum([]byte(checksum))), nil
301+
}
302+
303+
// resetStatus returns a modified v1alpha1.Bucket and a boolean indicating
304+
// if the status field has been reset.
305+
func (r *BucketReconciler) resetStatus(bucket sourcev1.Bucket) (sourcev1.Bucket, bool) {
306+
if bucket.GetArtifact() != nil && !r.Storage.ArtifactExist(*bucket.GetArtifact()) {
307+
bucket = sourcev1.BucketProgressing(bucket)
308+
bucket.Status.Artifact = nil
309+
return bucket, true
310+
}
311+
if bucket.Generation != bucket.Status.ObservedGeneration {
312+
return sourcev1.BucketProgressing(bucket), true
313+
}
314+
return bucket, false
315+
}
316+
317+
// gc performs a garbage collection on all but current artifacts of the given bucket.
318+
func (r *BucketReconciler) gc(bucket sourcev1.Bucket, all bool) error {
319+
if all {
320+
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), "", ""))
321+
}
322+
if bucket.GetArtifact() != nil {
323+
return r.Storage.RemoveAllButCurrent(*bucket.GetArtifact())
324+
}
325+
return nil
326+
}
327+
328+
// event emits a Kubernetes event and forwards the event to notification controller if configured
329+
func (r *BucketReconciler) event(bucket sourcev1.Bucket, severity, msg string) {
330+
if r.EventRecorder != nil {
331+
r.EventRecorder.Eventf(&bucket, "Normal", severity, msg)
332+
}
333+
if r.ExternalEventRecorder != nil {
334+
objRef, err := reference.GetReference(r.Scheme, &bucket)
335+
if err != nil {
336+
r.Log.WithValues(
337+
"request",
338+
fmt.Sprintf("%s/%s", bucket.GetNamespace(), bucket.GetName()),
339+
).Error(err, "unable to send event")
340+
return
341+
}
342+
343+
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
344+
r.Log.WithValues(
345+
"request",
346+
fmt.Sprintf("%s/%s", bucket.GetNamespace(), bucket.GetName()),
347+
).Error(err, "unable to send event")
348+
return
349+
}
350+
}
351+
}

controllers/gitrepository_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,15 +233,15 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sour
233233
defer unlock()
234234

235235
// archive artifact and check integrity
236-
if err := r.Storage.Archive(&artifact, tmpGit, repository.Spec); err != nil {
236+
if err := r.Storage.Archive(&artifact, tmpGit, repository.Spec.Ignore); err != nil {
237237
err = fmt.Errorf("storage archive error: %w", err)
238238
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
239239
}
240240

241241
// update latest symlink
242242
url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
243243
if err != nil {
244-
err = fmt.Errorf("storage lock error: %w", err)
244+
err = fmt.Errorf("storage symlink error: %w", err)
245245
return sourcev1.GitRepositoryNotReady(repository, sourcev1.StorageOperationFailedReason, err.Error()), err
246246
}
247247

0 commit comments

Comments
 (0)