Skip to content

Commit 83b6fdc

Browse files
committed
controllers: use digest for Bucket revision
Signed-off-by: Hidde Beydals <[email protected]>
1 parent f4eae19 commit 83b6fdc

File tree

5 files changed

+720
-303
lines changed

5 files changed

+720
-303
lines changed

controllers/bucket_controller.go

Lines changed: 48 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,14 @@ package controllers
1818

1919
import (
2020
"context"
21-
"crypto/sha256"
2221
"errors"
2322
"fmt"
2423
"os"
2524
"path/filepath"
26-
"sort"
2725
"strings"
28-
"sync"
2926
"time"
3027

31-
"github.com/fluxcd/source-controller/pkg/azure"
28+
"github.com/opencontainers/go-digest"
3229
"golang.org/x/sync/errgroup"
3330
"golang.org/x/sync/semaphore"
3431
corev1 "k8s.io/api/core/v1"
@@ -51,10 +48,14 @@ import (
5148

5249
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
5350
"github.com/fluxcd/pkg/sourceignore"
51+
5452
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
53+
intdigest "github.com/fluxcd/source-controller/internal/digest"
5554
serror "github.com/fluxcd/source-controller/internal/error"
55+
"github.com/fluxcd/source-controller/internal/index"
5656
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
5757
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
58+
"github.com/fluxcd/source-controller/pkg/azure"
5859
"github.com/fluxcd/source-controller/pkg/gcp"
5960
"github.com/fluxcd/source-controller/pkg/minio"
6061
)
@@ -154,83 +155,7 @@ type BucketProvider interface {
154155
// bucketReconcileFunc is the function type for all the v1beta2.Bucket
155156
// (sub)reconcile functions. The type implementations are grouped and
156157
// executed serially to perform the complete reconcile of the object.
157-
type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)
158-
159-
// etagIndex is an index of storage object keys and their Etag values.
160-
type etagIndex struct {
161-
sync.RWMutex
162-
index map[string]string
163-
}
164-
165-
// newEtagIndex returns a new etagIndex with an empty initialized index.
166-
func newEtagIndex() *etagIndex {
167-
return &etagIndex{
168-
index: make(map[string]string),
169-
}
170-
}
171-
172-
func (i *etagIndex) Add(key, etag string) {
173-
i.Lock()
174-
defer i.Unlock()
175-
i.index[key] = etag
176-
}
177-
178-
func (i *etagIndex) Delete(key string) {
179-
i.Lock()
180-
defer i.Unlock()
181-
delete(i.index, key)
182-
}
183-
184-
func (i *etagIndex) Get(key string) string {
185-
i.RLock()
186-
defer i.RUnlock()
187-
return i.index[key]
188-
}
189-
190-
func (i *etagIndex) Has(key string) bool {
191-
i.RLock()
192-
defer i.RUnlock()
193-
_, ok := i.index[key]
194-
return ok
195-
}
196-
197-
func (i *etagIndex) Index() map[string]string {
198-
i.RLock()
199-
defer i.RUnlock()
200-
index := make(map[string]string)
201-
for k, v := range i.index {
202-
index[k] = v
203-
}
204-
return index
205-
}
206-
207-
func (i *etagIndex) Len() int {
208-
i.RLock()
209-
defer i.RUnlock()
210-
return len(i.index)
211-
}
212-
213-
// Revision calculates the SHA256 checksum of the index.
214-
// The keys are stable sorted, and the SHA256 sum is then calculated for the
215-
// string representation of the key/value pairs, each pair written on a newline
216-
// with a space between them. The sum result is returned as a string.
217-
func (i *etagIndex) Revision() (string, error) {
218-
i.RLock()
219-
defer i.RUnlock()
220-
keyIndex := make([]string, 0, len(i.index))
221-
for k := range i.index {
222-
keyIndex = append(keyIndex, k)
223-
}
224-
225-
sort.Strings(keyIndex)
226-
sum := sha256.New()
227-
for _, k := range keyIndex {
228-
if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i.index[k]))); err != nil {
229-
return "", err
230-
}
231-
}
232-
return fmt.Sprintf("%x", sum.Sum(nil)), nil
233-
}
158+
type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error)
234159

235160
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
236161
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
@@ -371,7 +296,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatche
371296
var (
372297
res sreconcile.Result
373298
resErr error
374-
index = newEtagIndex()
299+
index = index.NewDigester()
375300
)
376301

377302
for _, rec := range reconcilers {
@@ -397,7 +322,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatche
397322
}
398323

399324
// notify emits notification related to the reconciliation.
400-
func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
325+
func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.Bucket, index *index.Digester, res sreconcile.Result, resErr error) {
401326
// Notify successful reconciliation for new artifact and recovery from any
402327
// failure.
403328
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
@@ -443,7 +368,7 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.
443368
// condition is added.
444369
// The hostname of any URL in the Status of the object are updated, to ensure
445370
// they match the Storage server hostname of current runtime.
446-
func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
371+
func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *index.Digester, _ string) (sreconcile.Result, error) {
447372
// Garbage collect previous advertised artifact(s) from storage
448373
_ = r.garbageCollect(ctx, obj)
449374

@@ -484,7 +409,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.Seria
484409
// When a SecretRef is defined, it attempts to fetch the Secret before calling
485410
// the provider. If this fails, it records v1beta2.FetchFailedCondition=True on
486411
// the object and returns early.
487-
func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
412+
func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error) {
488413
secret, err := r.getBucketSecret(ctx, obj)
489414
if err != nil {
490415
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
@@ -538,26 +463,21 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
538463
return sreconcile.ResultEmpty, e
539464
}
540465

541-
// Calculate revision
542-
revision, err := index.Revision()
543-
if err != nil {
544-
return sreconcile.ResultEmpty, &serror.Event{
545-
Err: fmt.Errorf("failed to calculate revision: %w", err),
546-
Reason: meta.FailedReason,
547-
}
466+
// Check if index has changed compared to current Artifact revision.
467+
var changed bool
468+
if artifact := obj.Status.Artifact; artifact != nil && artifact.Revision != "" {
469+
curRev := backwardsCompatibleDigest(artifact.Revision)
470+
changed = curRev != index.Digest(curRev.Algorithm())
548471
}
549472

550-
// Mark observations about the revision on the object
551-
defer func() {
552-
// As fetchIndexFiles can make last-minute modifications to the etag
553-
// index, we need to re-calculate the revision at the end
554-
revision, err := index.Revision()
555-
if err != nil {
556-
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision after fetching etag index")
557-
return
558-
}
473+
// Fetch the bucket objects if required to.
474+
if artifact := obj.GetArtifact(); artifact == nil || changed {
475+
// Mark observations about the revision on the object
476+
defer func() {
477+
// As fetchIndexFiles can make last-minute modifications to the etag
478+
// index, we need to re-calculate the revision at the end
479+
revision := index.Digest(intdigest.Canonical)
559480

560-
if !obj.GetArtifact().HasRevision(revision) {
561481
message := fmt.Sprintf("new upstream revision '%s'", revision)
562482
if obj.GetArtifact() != nil {
563483
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
@@ -567,10 +487,8 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
567487
ctrl.LoggerFrom(ctx).Error(err, "failed to patch")
568488
return
569489
}
570-
}
571-
}()
490+
}()
572491

573-
if !obj.GetArtifact().HasRevision(revision) {
574492
if err = fetchIndexFiles(ctx, provider, obj, index, dir); err != nil {
575493
e := &serror.Event{Err: err, Reason: sourcev1.BucketOperationFailedReason}
576494
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
@@ -591,32 +509,32 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial
591509
// early.
592510
// On a successful archive, the Artifact in the Status of the object is set,
593511
// and the symlink in the Storage is updated to its path.
594-
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
512+
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *index.Digester, dir string) (sreconcile.Result, error) {
595513
// Calculate revision
596-
revision, err := index.Revision()
597-
if err != nil {
598-
return sreconcile.ResultEmpty, &serror.Event{
599-
Err: fmt.Errorf("failed to calculate revision of new artifact: %w", err),
600-
Reason: meta.FailedReason,
601-
}
602-
}
514+
revision := index.Digest(intdigest.Canonical)
603515

604516
// Create artifact
605-
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
517+
artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision.String(), fmt.Sprintf("%s.tar.gz", revision.Encoded()))
606518

607519
// Set the ArtifactInStorageCondition if there's no drift.
608520
defer func() {
609-
if obj.GetArtifact().HasRevision(artifact.Revision) {
610-
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
611-
conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason,
612-
"stored artifact: revision '%s'", artifact.Revision)
521+
if curArtifact := obj.GetArtifact(); curArtifact != nil && curArtifact.Revision != "" {
522+
curRev := backwardsCompatibleDigest(curArtifact.Revision)
523+
if index.Digest(curRev.Algorithm()) == curRev {
524+
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
525+
conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason,
526+
"stored artifact: revision '%s'", artifact.Revision)
527+
}
613528
}
614529
}()
615530

616531
// The artifact is up-to-date
617-
if obj.GetArtifact().HasRevision(artifact.Revision) {
618-
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision)
619-
return sreconcile.ResultSuccess, nil
532+
if curArtifact := obj.GetArtifact(); curArtifact != nil && curArtifact.Revision != "" {
533+
curRev := backwardsCompatibleDigest(curArtifact.Revision)
534+
if index.Digest(curRev.Algorithm()) == curRev {
535+
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision)
536+
return sreconcile.ResultSuccess, nil
537+
}
620538
}
621539

622540
// Ensure target path exists and is a directory
@@ -781,7 +699,7 @@ func (r *BucketReconciler) annotatedEventLogf(ctx context.Context,
781699
// bucket using the given provider, while filtering them using .sourceignore
782700
// rules. After fetching an object, the etag value in the index is updated to
783701
// the current value to ensure accuracy.
784-
func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
702+
func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *index.Digester, tempDir string) error {
785703
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
786704
defer cancel()
787705

@@ -835,7 +753,7 @@ func fetchEtagIndex(ctx context.Context, provider BucketProvider, obj *sourcev1.
835753
// using the given provider, and stores them into tempDir. It downloads in
836754
// parallel, but limited to the maxConcurrentBucketFetches.
837755
// Given an index is provided, the bucket is assumed to exist.
838-
func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *etagIndex, tempDir string) error {
756+
func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1.Bucket, index *index.Digester, tempDir string) error {
839757
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
840758
defer cancel()
841759

@@ -879,3 +797,10 @@ func fetchIndexFiles(ctx context.Context, provider BucketProvider, obj *sourcev1
879797

880798
return nil
881799
}
800+
801+
func backwardsCompatibleDigest(d string) digest.Digest {
802+
if !strings.Contains(d, ":") {
803+
d = digest.SHA256.String() + ":" + d
804+
}
805+
return digest.Digest(d)
806+
}

controllers/bucket_controller_fetch_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929

3030
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
31+
"github.com/fluxcd/source-controller/internal/index"
3132
)
3233

3334
type mockBucketObject struct {
@@ -88,8 +89,8 @@ func (m *mockBucketClient) addObject(key string, object mockBucketObject) {
8889
m.objects[key] = object
8990
}
9091

91-
func (m *mockBucketClient) objectsToEtagIndex() *etagIndex {
92-
i := newEtagIndex()
92+
func (m *mockBucketClient) objectsToDigestIndex() *index.Digester {
93+
i := index.NewDigester()
9394
for k, v := range m.objects {
9495
i.Add(k, v.etag)
9596
}
@@ -114,7 +115,7 @@ func Test_fetchEtagIndex(t *testing.T) {
114115
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
115116
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
116117

117-
index := newEtagIndex()
118+
index := index.NewDigester()
118119
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
119120
if err != nil {
120121
t.Fatal(err)
@@ -128,7 +129,7 @@ func Test_fetchEtagIndex(t *testing.T) {
128129

129130
client := mockBucketClient{bucketName: "other-bucket-name"}
130131

131-
index := newEtagIndex()
132+
index := index.NewDigester()
132133
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
133134
assert.ErrorContains(t, err, "not found")
134135
})
@@ -141,7 +142,7 @@ func Test_fetchEtagIndex(t *testing.T) {
141142
client.addObject("foo.yaml", mockBucketObject{etag: "etag1", data: "foo.yaml"})
142143
client.addObject("foo.txt", mockBucketObject{etag: "etag2", data: "foo.txt"})
143144

144-
index := newEtagIndex()
145+
index := index.NewDigester()
145146
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
146147
if err != nil {
147148
t.Fatal(err)
@@ -168,7 +169,7 @@ func Test_fetchEtagIndex(t *testing.T) {
168169
bucket := bucket.DeepCopy()
169170
bucket.Spec.Ignore = &ignore
170171

171-
index := newEtagIndex()
172+
index := index.NewDigester()
172173
err := fetchEtagIndex(context.TODO(), client, bucket.DeepCopy(), index, tmp)
173174
if err != nil {
174175
t.Fatal(err)
@@ -203,7 +204,7 @@ func Test_fetchFiles(t *testing.T) {
203204
client.addObject("bar.yaml", mockBucketObject{data: "bar.yaml", etag: "etag2"})
204205
client.addObject("baz.yaml", mockBucketObject{data: "baz.yaml", etag: "etag3"})
205206

206-
index := client.objectsToEtagIndex()
207+
index := client.objectsToDigestIndex()
207208

208209
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
209210
if err != nil {
@@ -225,7 +226,7 @@ func Test_fetchFiles(t *testing.T) {
225226
client := mockBucketClient{bucketName: bucketName, objects: map[string]mockBucketObject{}}
226227
client.objects["error"] = mockBucketObject{}
227228

228-
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToEtagIndex(), tmp)
229+
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), client.objectsToDigestIndex(), tmp)
229230
if err == nil {
230231
t.Fatal("expected error but got nil")
231232
}
@@ -237,7 +238,7 @@ func Test_fetchFiles(t *testing.T) {
237238
client := mockBucketClient{bucketName: bucketName}
238239
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag2"})
239240

240-
index := newEtagIndex()
241+
index := index.NewDigester()
241242
index.Add("foo.yaml", "etag1")
242243
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
243244
if err != nil {
@@ -253,7 +254,7 @@ func Test_fetchFiles(t *testing.T) {
253254
client := mockBucketClient{bucketName: bucketName}
254255
client.addObject("foo.yaml", mockBucketObject{data: "foo.yaml", etag: "etag1"})
255256

256-
index := newEtagIndex()
257+
index := index.NewDigester()
257258
index.Add("foo.yaml", "etag1")
258259
// Does not exist on server
259260
index.Add("bar.yaml", "etag2")
@@ -276,7 +277,7 @@ func Test_fetchFiles(t *testing.T) {
276277
f := fmt.Sprintf("file-%d", i)
277278
client.addObject(f, mockBucketObject{etag: f, data: f})
278279
}
279-
index := client.objectsToEtagIndex()
280+
index := client.objectsToDigestIndex()
280281

281282
err := fetchIndexFiles(context.TODO(), client, bucket.DeepCopy(), index, tmp)
282283
if err != nil {

0 commit comments

Comments
 (0)