Skip to content

Commit 656c306

Browse files
committed
feat: add purge feature
1 parent 68d26b7 commit 656c306

File tree

1 file changed

+258
-0
lines changed

1 file changed

+258
-0
lines changed

internal/sync/s3_purge.go

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
package sync
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"path/filepath"
8+
"regexp"
9+
"slices"
10+
"strings"
11+
"sync"
12+
13+
"github.com/Altinity/docker-sync/config"
14+
"github.com/Altinity/docker-sync/structs"
15+
"github.com/aws/aws-sdk-go-v2/aws"
16+
"github.com/aws/aws-sdk-go-v2/service/s3"
17+
"github.com/rs/zerolog/log"
18+
"golang.org/x/sync/errgroup"
19+
)
20+
21+
func deleteS3(ctx context.Context, image *structs.Image, dst string, repository string, tag string) error {
22+
s3Session, bucket, err := getS3Session(dst)
23+
if err != nil {
24+
return err
25+
}
26+
27+
return deleteS3WithSession(ctx, s3Session, bucket, dst, repository, tag)
28+
}
29+
30+
func deleteS3WithSession(ctx context.Context, s3Session *s3.Client, bucket *string, dst string, repository string, tag string) error {
31+
s3c := &s3Client{
32+
s3Session: s3Session,
33+
bucket: bucket,
34+
dst: dst,
35+
baseDir: filepath.Join("v2", repository),
36+
}
37+
38+
if err := deleteObject(ctx, s3c, filepath.Join(s3c.baseDir, "manifests", tag)); err != nil {
39+
return err
40+
}
41+
42+
return nil
43+
}
44+
45+
func deleteObject(ctx context.Context, s3c *s3Client, key string) error {
46+
log.Info().
47+
Str("bucket", *s3c.bucket).
48+
Str("key", key).
49+
Msg("Deleting object")
50+
51+
_, err := s3c.s3Session.DeleteObject(ctx, &s3.DeleteObjectInput{
52+
Bucket: aws.String(*s3c.bucket),
53+
Key: aws.String(key),
54+
})
55+
if err != nil {
56+
return fmt.Errorf("failed to delete object %s from bucket %s: %w", key, *s3c.bucket, err)
57+
}
58+
59+
if config.SyncS3ObjectCacheEnabled.Bool() {
60+
cacheKey := fmt.Sprintf("%s/%s", *s3c.bucket, key)
61+
objectCache.Delete(cacheKey)
62+
}
63+
64+
return nil
65+
}
66+
67+
func getAllRepositoryBlobsS3(ctx context.Context, s3Session *s3.Client, bucket string, repository string) ([]string, error) {
68+
log.Info().
69+
Str("bucket", bucket).
70+
Str("repository", repository).
71+
Msg("Getting all blobs in repository")
72+
73+
blobs := []string{}
74+
75+
p := s3.NewListObjectsV2Paginator(s3Session, &s3.ListObjectsV2Input{
76+
Bucket: aws.String(bucket),
77+
Prefix: aws.String(filepath.Join("v2", repository, "blobs")),
78+
})
79+
80+
var i int
81+
for p.HasMorePages() {
82+
i++
83+
page, err := p.NextPage(ctx)
84+
if err != nil {
85+
return nil, fmt.Errorf("failed to get page %d, %w", i, err)
86+
}
87+
for _, obj := range page.Contents {
88+
fname := filepath.Base(*obj.Key)
89+
if strings.HasPrefix(fname, "sha256:") {
90+
blobs = append(blobs, fname)
91+
}
92+
}
93+
}
94+
95+
slices.Sort(blobs)
96+
97+
return slices.Compact(blobs), nil
98+
}
99+
100+
func getAllReferencedBlobsS3(ctx context.Context, s3Session *s3.Client, bucket string, repository string) ([]string, error) {
101+
log.Info().
102+
Str("bucket", bucket).
103+
Str("repository", repository).
104+
Msg("Getting all referenced blobs")
105+
106+
blobs := []string{}
107+
108+
p := s3.NewListObjectsV2Paginator(s3Session, &s3.ListObjectsV2Input{
109+
Bucket: aws.String(bucket),
110+
Prefix: aws.String(filepath.Join("v2", repository, "manifests")),
111+
})
112+
113+
var blobsMutex sync.Mutex
114+
115+
var i int
116+
for p.HasMorePages() {
117+
i++
118+
page, err := p.NextPage(ctx)
119+
if err != nil {
120+
return nil, fmt.Errorf("failed to get page %d, %w", i, err)
121+
}
122+
123+
log.Debug().
124+
Str("bucket", bucket).
125+
Int("page", i).
126+
Int("objects", len(page.Contents)).
127+
Msg("Processing page of objects")
128+
129+
g, _ := errgroup.WithContext(ctx)
130+
g.SetLimit(config.SyncS3MaxPurgeConcurrency.Int())
131+
132+
for _, obj := range page.Contents {
133+
g.Go(func() error {
134+
log.Debug().
135+
Str("bucket", bucket).
136+
Str("key", *obj.Key).
137+
Msg("Processing object")
138+
139+
// We need to read the manifest to find out which blobs it references
140+
resp, err := s3Session.GetObject(ctx, &s3.GetObjectInput{
141+
Bucket: aws.String(bucket),
142+
Key: obj.Key,
143+
})
144+
if err != nil {
145+
log.Error().
146+
Err(err).
147+
Str("bucket", bucket).
148+
Str("key", *obj.Key).
149+
Msg("Failed to get object")
150+
151+
return fmt.Errorf("failed to get object %s from bucket %s: %w", *obj.Key, bucket, err)
152+
}
153+
defer resp.Body.Close()
154+
155+
// To avoid having to parse the manifest, we can just read the body and look for "sha256:<64 chars>" patterns using regex
156+
buf := new(bytes.Buffer)
157+
if _, err := buf.ReadFrom(resp.Body); err != nil {
158+
// Don't proceed because we can miss blobs
159+
return fmt.Errorf("failed to read object body: %w", err)
160+
}
161+
162+
body := buf.String()
163+
// Find all sha256 hashes in the body
164+
re := regexp.MustCompile(`sha256:[a-f0-9]{64}`)
165+
matches := re.FindAllString(body, -1)
166+
167+
blobsMutex.Lock()
168+
for _, match := range matches {
169+
blobs = append(blobs, match)
170+
}
171+
blobsMutex.Unlock()
172+
173+
return nil
174+
})
175+
}
176+
177+
if err := g.Wait(); err != nil {
178+
return nil, fmt.Errorf("failed to process page %d: %w", i, err)
179+
}
180+
}
181+
182+
slices.Sort(blobs)
183+
184+
return slices.Compact(blobs), nil
185+
}
186+
187+
func deleteOrphanedBlobsS3(ctx context.Context, s3Session *s3.Client, bucket string, repository string) error {
188+
// Get all blobs in the repository
189+
allBlobs, err := getAllRepositoryBlobsS3(ctx, s3Session, bucket, repository)
190+
if err != nil {
191+
return fmt.Errorf("failed to get all blobs: %w", err)
192+
}
193+
log.Info().
194+
Str("bucket", bucket).
195+
Str("repository", repository).
196+
Int("blobs", len(allBlobs)).
197+
Msg("Retrieved all blobs in repository")
198+
199+
// Get all referenced blobs in the repository
200+
referencedBlobs, err := getAllReferencedBlobsS3(ctx, s3Session, bucket, repository)
201+
if err != nil {
202+
return fmt.Errorf("failed to get all referenced blobs: %w", err)
203+
}
204+
log.Info().
205+
Str("bucket", bucket).
206+
Str("repository", repository).
207+
Int("referenced_blobs", len(referencedBlobs)).
208+
Msg("Retrieved all referenced blobs in repository")
209+
210+
// Find orphaned blobs
211+
var orphanedBlobs []string
212+
for _, blob := range allBlobs {
213+
if !slices.Contains(referencedBlobs, blob) {
214+
orphanedBlobs = append(orphanedBlobs, blob)
215+
}
216+
}
217+
218+
if len(orphanedBlobs) == 0 {
219+
log.Info().
220+
Str("bucket", bucket).
221+
Str("repository", repository).
222+
Msg("No orphaned blobs found")
223+
return nil
224+
}
225+
226+
log.Info().
227+
Str("bucket", bucket).
228+
Str("repository", repository).
229+
Int("orphaned_blobs", len(orphanedBlobs)).
230+
Msg("Found orphaned blobs")
231+
232+
g, _ := errgroup.WithContext(ctx)
233+
g.SetLimit(config.SyncS3MaxPurgeConcurrency.Int())
234+
235+
for _, blob := range orphanedBlobs {
236+
g.Go(func() error {
237+
key := filepath.Join("v2", repository, "blobs", blob)
238+
if err := deleteObject(ctx, &s3Client{
239+
s3Session: s3Session,
240+
bucket: aws.String(bucket),
241+
}, key); err != nil {
242+
log.Error().
243+
Err(err).
244+
Str("bucket", bucket).
245+
Str("key", key).
246+
Msg("Failed to delete orphaned blob")
247+
248+
return err
249+
}
250+
251+
return nil
252+
})
253+
}
254+
255+
_ = g.Wait()
256+
257+
return nil
258+
}

0 commit comments

Comments
 (0)