Skip to content

Commit 6d2c6f1

Browse files
committed
feat: api catalog
1 parent b1f120a commit 6d2c6f1

18 files changed

+501
-229
lines changed

.just/mod/serve.just

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ serve:
55
--log-format=text \
66
--addr=:5070
77

8+
gc:
9+
{{ crkit }} gc -c --log-level=debug --dry-run
10+
811
serve-proxy:
912
{{ crkit }} serve registry -c \
1013
--log-format=text \

internal/cmd/crkit/gc.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package main
2+
3+
import (
4+
"github.com/innoai-tech/infra/pkg/cli"
5+
"github.com/innoai-tech/infra/pkg/otel"
6+
contentapi "github.com/octohelm/crkit/pkg/content/api"
7+
"github.com/octohelm/crkit/pkg/content/fs/garbagecollector"
8+
)
9+
10+
func init() {
11+
c := cli.AddTo(App, &GC{})
12+
c.LogFormat = "text"
13+
}
14+
15+
type GC struct {
16+
cli.C
17+
otel.Otel
18+
19+
contentapi.NamespaceProvider
20+
21+
garbagecollector.Executor
22+
}

internal/cmd/crkit/serve_registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:generate go tool devtool gen .
12
package main
23

34
import (

internal/cmd/crkit/zz_generated.runtimedoc.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,25 @@ DON'T EDIT THIS FILE
44
*/
55
package main
66

7+
func (v *GC) RuntimeDoc(names ...string) ([]string, bool) {
8+
if len(names) > 0 {
9+
switch names[0] {
10+
}
11+
if doc, ok := runtimeDoc(&v.Otel, "", names...); ok {
12+
return doc, ok
13+
}
14+
if doc, ok := runtimeDoc(&v.NamespaceProvider, "", names...); ok {
15+
return doc, ok
16+
}
17+
if doc, ok := runtimeDoc(&v.Executor, "", names...); ok {
18+
return doc, ok
19+
}
20+
21+
return nil, false
22+
}
23+
return []string{}, true
24+
}
25+
726
func (v *Registry) RuntimeDoc(names ...string) ([]string, bool) {
827
if len(names) > 0 {
928
switch names[0] {

pkg/content/collect/collect.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package collect
33
import (
44
"context"
55
"errors"
6+
67
"github.com/octohelm/crkit/pkg/content"
78
"github.com/opencontainers/go-digest"
89
)

pkg/content/fs/blob_store.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ func (bs *blobStore) Digests(ctx context.Context) iter.Seq2[digest.Digest, error
3030
}
3131

3232
err := bs.workspace.WalkDir(ctx, bs.workspace.layout.BlobsPath(), func(path string, d fs.DirEntry, err error) error {
33+
if err != nil {
34+
return err
35+
}
36+
3337
if path == "." || d.IsDir() {
3438
return nil
3539
}
@@ -38,18 +42,20 @@ func (bs *blobStore) Digests(ctx context.Context) iter.Seq2[digest.Digest, error
3842
if base != "data" {
3943
return nil
4044
}
45+
4146
parentDir, hex := filepath.Split(strings.TrimSuffix(dir, string(filepath.Separator)))
4247
alg := filepath.Dir(strings.TrimSuffix(parentDir, string(filepath.Separator)))
4348

44-
if alg == "" || hex == "" {
45-
return fmt.Errorf("invalid alg or hex: %q", path)
49+
dgst := digest.NewDigestFromHex(alg, hex)
50+
if err := dgst.Validate(); err != nil {
51+
return fmt.Errorf("invalid digest of data path %s: %w", path, err)
4652
}
47-
48-
if !yieldNamed(digest.NewDigestFromHex(alg, hex), nil) {
53+
54+
if !yieldNamed(dgst, nil) {
4955
return fs.SkipAll
5056
}
5157

52-
return err
58+
return nil
5359
})
5460
if err != nil {
5561
if !yield("", err) {

pkg/content/fs/blob_store__linked.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (lbs *linkedBlobStore) LinkedDigests(ctx context.Context) iter.Seq2[content
9292
}
9393

9494
if err := lbs.workspace.WalkDir(ctx, lbs.linkDirFunc(), func(path string, d fs.DirEntry, err error) error {
95+
if err != nil {
96+
return err
97+
}
98+
9599
if path == "." || d.IsDir() {
96100
return nil
97101
}
@@ -105,23 +109,24 @@ func (lbs *linkedBlobStore) LinkedDigests(ctx context.Context) iter.Seq2[content
105109
parentDir, hex := filepath.Split(strings.TrimSuffix(dir, string(filepath.Separator)))
106110
alg := filepath.Base(strings.TrimSuffix(parentDir, string(filepath.Separator)))
107111

112+
dgst := digest.NewDigestFromHex(alg, hex)
113+
if err := dgst.Validate(); err != nil {
114+
return fmt.Errorf("invalid linked digest of link path %s: %w", path, err)
115+
}
116+
108117
info, err := d.Info()
109118
if err != nil {
110119
return err
111120
}
112121

113-
if alg == "" || hex == "" {
114-
return fmt.Errorf("invalid alg or hex: %q", path)
115-
}
116-
117122
if !yieldLinkedDigest(content.LinkedDigest{
118-
Digest: digest.NewDigestFromHex(alg, hex),
123+
Digest: dgst,
119124
ModTime: info.ModTime(),
120125
}, nil) {
121126
return fs.SkipAll
122127
}
123128

124-
return err
129+
return nil
125130
}); err != nil {
126131
if !yield(content.LinkedDigest{}, err) {
127132
return
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package garbagecollector
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/octohelm/x/ptr"
8+
"log/slog"
9+
"time"
10+
11+
"github.com/distribution/reference"
12+
"github.com/go-courier/logr"
13+
manifestv1 "github.com/octohelm/crkit/pkg/apis/manifest/v1"
14+
"github.com/octohelm/crkit/pkg/content"
15+
"github.com/octohelm/crkit/pkg/content/fs/driver"
16+
"github.com/opencontainers/go-digest"
17+
)
18+
19+
func MarkAndSweepExcludeModifiedIn(
20+
ctx context.Context,
21+
namespace content.Namespace, d driver.Driver,
22+
excludeModifiedIn time.Duration,
23+
dryRun bool,
24+
) error {
25+
repositoryNameIterable, ok := namespace.(content.RepositoryNameIterable)
26+
if !ok {
27+
return &content.ErrNotImplemented{Reason: errors.New("RepositoryNameIterable of Namespace")}
28+
}
29+
30+
blobDigestIterable, ok := namespace.(content.DigestIterable)
31+
if !ok {
32+
return &content.ErrNotImplemented{Reason: errors.New("DigestIterable of Namespace")}
33+
}
34+
35+
stabled := time.Now().Add(-excludeModifiedIn)
36+
37+
c := &collector{
38+
Vacuum: NewVacuum(d, dryRun),
39+
namespace: namespace,
40+
used: map[digest.Digest]struct{}{},
41+
recentlyActivated: func(modTime time.Time) bool {
42+
return modTime.After(stabled)
43+
},
44+
}
45+
46+
return c.MarkAndSweep(ctx, repositoryNameIterable, blobDigestIterable)
47+
}
48+
49+
type collector struct {
50+
Vacuum
51+
namespace content.Namespace
52+
used map[digest.Digest]struct{}
53+
recentlyActivated func(modTime time.Time) bool
54+
}
55+
56+
func (c *collector) count(dgst digest.Digest) {
57+
c.used[dgst] = struct{}{}
58+
}
59+
60+
func (c *collector) referenced(dgst digest.Digest) bool {
61+
_, ok := c.used[dgst]
62+
return ok
63+
}
64+
65+
func (c *collector) referencedOrRecentlyActivated(ld content.LinkedDigest) bool {
66+
if c.recentlyActivated(ld.ModTime) {
67+
c.count(ld.Digest)
68+
69+
return true
70+
}
71+
72+
_, ok := c.used[ld.Digest]
73+
return ok
74+
}
75+
76+
func (c *collector) MarkAndSweep(pctx context.Context, repositoryNameIterable content.RepositoryNameIterable, blobDigestIterable content.DigestIterable) error {
77+
ctx, l := logr.FromContext(pctx).Start(pctx, "MarkAndSweep")
78+
defer l.End()
79+
80+
for named, err := range repositoryNameIterable.RepositoryNames(ctx) {
81+
if err != nil {
82+
return err
83+
}
84+
85+
if err := c.markAndSweepRepository(ctx, named); err != nil {
86+
return fmt.Errorf("failed to mark and sweep repository %s: %w", named, err)
87+
}
88+
}
89+
90+
for d, err := range blobDigestIterable.Digests(ctx) {
91+
if err != nil {
92+
return err
93+
}
94+
95+
l.WithValues(slog.String("blob", string(d))).Debug("checking")
96+
97+
if c.referenced(d) {
98+
continue
99+
}
100+
101+
if err := c.RemoveBlob(ctx, d); err != nil {
102+
return fmt.Errorf("failed to remove blob %s: %w", d, err)
103+
}
104+
}
105+
106+
return nil
107+
}
108+
109+
func (c *collector) markAndSweepRepository(ctx context.Context, named reference.Named) error {
110+
l := logr.FromContext(ctx)
111+
112+
repository, err := c.namespace.Repository(ctx, named)
113+
if err != nil {
114+
return fmt.Errorf("failed to construct repository: %w", err)
115+
}
116+
117+
tagService, err := repository.Tags(ctx)
118+
if err != nil {
119+
return fmt.Errorf("failed to tag service: %w", err)
120+
}
121+
122+
manifestService, err := repository.Manifests(ctx)
123+
if err != nil {
124+
return fmt.Errorf("failed to manifest service: %w", err)
125+
}
126+
127+
blobStore, err := repository.Blobs(ctx)
128+
if err != nil {
129+
return fmt.Errorf("failed to blob store: %w", err)
130+
}
131+
132+
manifestDigestIterable, ok := manifestService.(content.LinkedDigestIterable)
133+
if !ok {
134+
return &content.ErrNotImplemented{Reason: errors.New("LinkedDigestIterable of ManifestService")}
135+
}
136+
137+
layerDigestIterable, ok := blobStore.(content.LinkedDigestIterable)
138+
if !ok {
139+
return &content.ErrNotImplemented{Reason: errors.New("LinkedDigestIterable of BlobStore")}
140+
}
141+
142+
allTags, err := tagService.All(ctx)
143+
if err != nil {
144+
return fmt.Errorf("failed to get all tags: %w", err)
145+
}
146+
147+
for _, tag := range allTags {
148+
d, err := tagService.Get(ctx, tag)
149+
if err != nil {
150+
return err
151+
}
152+
153+
if err := c.markManifest(ctx, manifestService, d.Digest); err != nil {
154+
return err
155+
}
156+
}
157+
158+
l.WithValues(slog.String("name", named.String())).Info("marking")
159+
160+
for ld, err := range manifestDigestIterable.LinkedDigests(ctx) {
161+
if err != nil {
162+
return err
163+
}
164+
165+
l.WithValues(
166+
slog.String("name", named.String()),
167+
slog.String("manifest", string(ld.Digest)),
168+
).Info("checking")
169+
170+
if c.referencedOrRecentlyActivated(ld) {
171+
continue
172+
}
173+
174+
if err := c.RemoveManifest(ctx, named, ld.Digest, allTags); err != nil {
175+
return fmt.Errorf("failed to remove manifest %s@%s: %w", named, ld.Digest, err)
176+
}
177+
}
178+
179+
for ld, err := range layerDigestIterable.LinkedDigests(ctx) {
180+
if err != nil {
181+
return err
182+
}
183+
184+
l.WithValues(
185+
slog.String("name", named.String()),
186+
slog.String("layer", string(ld.Digest)),
187+
).Debug("checking")
188+
189+
if c.referencedOrRecentlyActivated(ld) {
190+
continue
191+
}
192+
193+
if err := c.RemoveLayer(ctx, named, ld.Digest); err != nil {
194+
return fmt.Errorf("failed to remove layer %s@%s: %w", named, ld.Digest, err)
195+
}
196+
}
197+
198+
return nil
199+
}
200+
201+
func (c *collector) markManifest(ctx context.Context, manifestService content.ManifestService, manifestDigest digest.Digest) error {
202+
m, err := manifestService.Get(ctx, manifestDigest)
203+
if err != nil {
204+
return err
205+
}
206+
207+
c.count(manifestDigest)
208+
209+
switch m.Type() {
210+
case manifestv1.DockerMediaTypeManifestList, manifestv1.MediaTypeImageIndex:
211+
for d := range m.References() {
212+
if err := c.markManifest(ctx, manifestService, d.Digest); err != nil {
213+
// skip for partial cached
214+
if errors.As(err, ptr.Ptr(&content.ErrManifestUnknownRevision{})) {
215+
continue
216+
}
217+
return err
218+
}
219+
}
220+
default:
221+
for d := range m.References() {
222+
c.count(d.Digest)
223+
}
224+
}
225+
226+
return nil
227+
}

0 commit comments

Comments
 (0)