Skip to content

Commit 8effd5b

Browse files
committed
feat: api catalog
1 parent b1f120a commit 8effd5b

File tree

17 files changed

+441
-221
lines changed

17 files changed

+441
-221
lines changed

.just/mod/serve.just

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ serve:
55
--log-format=text \
66
--addr=:5070
77

8+
gc:
9+
{{ crkit }} gc -c
10+
811
serve-proxy:
912
{{ crkit }} serve registry -c \
1013
--log-format=text \

internal/cmd/crkit/gc.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package main
2+
3+
import (
4+
"github.com/innoai-tech/infra/pkg/cli"
5+
"github.com/innoai-tech/infra/pkg/otel"
6+
contentapi "github.com/octohelm/crkit/pkg/content/api"
7+
"github.com/octohelm/crkit/pkg/content/fs/garbagecollector"
8+
)
9+
10+
func init() {
11+
c := cli.AddTo(App, &GC{})
12+
c.LogFormat = "text"
13+
}
14+
15+
type GC struct {
16+
cli.C
17+
otel.Otel
18+
19+
contentapi.NamespaceProvider
20+
21+
garbagecollector.Executor
22+
}

internal/cmd/crkit/serve_registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:generate go tool devtool gen .
12
package main
23

34
import (

internal/cmd/crkit/zz_generated.runtimedoc.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,25 @@ DON'T EDIT THIS FILE
44
*/
55
package main
66

7+
func (v *GC) RuntimeDoc(names ...string) ([]string, bool) {
8+
if len(names) > 0 {
9+
switch names[0] {
10+
}
11+
if doc, ok := runtimeDoc(&v.Otel, "", names...); ok {
12+
return doc, ok
13+
}
14+
if doc, ok := runtimeDoc(&v.NamespaceProvider, "", names...); ok {
15+
return doc, ok
16+
}
17+
if doc, ok := runtimeDoc(&v.Executor, "", names...); ok {
18+
return doc, ok
19+
}
20+
21+
return nil, false
22+
}
23+
return []string{}, true
24+
}
25+
726
func (v *Registry) RuntimeDoc(names ...string) ([]string, bool) {
827
if len(names) > 0 {
928
switch names[0] {

pkg/content/collect/collect.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package collect
33
import (
44
"context"
55
"errors"
6+
67
"github.com/octohelm/crkit/pkg/content"
78
"github.com/opencontainers/go-digest"
89
)

pkg/content/fs/blob_store.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ func (bs *blobStore) Digests(ctx context.Context) iter.Seq2[digest.Digest, error
3030
}
3131

3232
err := bs.workspace.WalkDir(ctx, bs.workspace.layout.BlobsPath(), func(path string, d fs.DirEntry, err error) error {
33+
if err != nil {
34+
return err
35+
}
36+
3337
if path == "." || d.IsDir() {
3438
return nil
3539
}
@@ -38,18 +42,20 @@ func (bs *blobStore) Digests(ctx context.Context) iter.Seq2[digest.Digest, error
3842
if base != "data" {
3943
return nil
4044
}
45+
4146
parentDir, hex := filepath.Split(strings.TrimSuffix(dir, string(filepath.Separator)))
4247
alg := filepath.Dir(strings.TrimSuffix(parentDir, string(filepath.Separator)))
4348

44-
if alg == "" || hex == "" {
45-
return fmt.Errorf("invalid alg or hex: %q", path)
49+
dgst := digest.NewDigestFromHex(alg, hex)
50+
if err := dgst.Validate(); err != nil {
51+
return fmt.Errorf("invalid digest of data path %s: %w", path, err)
4652
}
47-
48-
if !yieldNamed(digest.NewDigestFromHex(alg, hex), nil) {
53+
54+
if !yieldNamed(dgst, nil) {
4955
return fs.SkipAll
5056
}
5157

52-
return err
58+
return nil
5359
})
5460
if err != nil {
5561
if !yield("", err) {

pkg/content/fs/blob_store__linked.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (lbs *linkedBlobStore) LinkedDigests(ctx context.Context) iter.Seq2[content
9292
}
9393

9494
if err := lbs.workspace.WalkDir(ctx, lbs.linkDirFunc(), func(path string, d fs.DirEntry, err error) error {
95+
if err != nil {
96+
return err
97+
}
98+
9599
if path == "." || d.IsDir() {
96100
return nil
97101
}
@@ -105,23 +109,24 @@ func (lbs *linkedBlobStore) LinkedDigests(ctx context.Context) iter.Seq2[content
105109
parentDir, hex := filepath.Split(strings.TrimSuffix(dir, string(filepath.Separator)))
106110
alg := filepath.Base(strings.TrimSuffix(parentDir, string(filepath.Separator)))
107111

112+
dgst := digest.NewDigestFromHex(alg, hex)
113+
if err := dgst.Validate(); err != nil {
114+
return fmt.Errorf("invalid linked digest of link path %s: %w", path, err)
115+
}
116+
108117
info, err := d.Info()
109118
if err != nil {
110119
return err
111120
}
112121

113-
if alg == "" || hex == "" {
114-
return fmt.Errorf("invalid alg or hex: %q", path)
115-
}
116-
117122
if !yieldLinkedDigest(content.LinkedDigest{
118-
Digest: digest.NewDigestFromHex(alg, hex),
123+
Digest: dgst,
119124
ModTime: info.ModTime(),
120125
}, nil) {
121126
return fs.SkipAll
122127
}
123128

124-
return err
129+
return nil
125130
}); err != nil {
126131
if !yield(content.LinkedDigest{}, err) {
127132
return
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package garbagecollector
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/octohelm/x/ptr"
8+
"log/slog"
9+
"time"
10+
11+
"github.com/distribution/reference"
12+
"github.com/go-courier/logr"
13+
manifestv1 "github.com/octohelm/crkit/pkg/apis/manifest/v1"
14+
"github.com/octohelm/crkit/pkg/content"
15+
"github.com/octohelm/crkit/pkg/content/fs/driver"
16+
"github.com/opencontainers/go-digest"
17+
)
18+
19+
func MarkAndSweepExcludeModifiedIn(ctx context.Context, namespace content.Namespace, d driver.Driver, excludeModifiedIn time.Duration) error {
20+
repositoryNameIterable, ok := namespace.(content.RepositoryNameIterable)
21+
if !ok {
22+
return &content.ErrNotImplemented{Reason: errors.New("RepositoryNameIterable of Namespace")}
23+
}
24+
25+
blobDigestIterable, ok := namespace.(content.DigestIterable)
26+
if !ok {
27+
return &content.ErrNotImplemented{Reason: errors.New("DigestIterable of Namespace")}
28+
}
29+
30+
stabled := time.Now().Add(-excludeModifiedIn)
31+
32+
c := &collector{
33+
Vacuum: NewVacuum(d),
34+
namespace: namespace,
35+
used: map[digest.Digest]struct{}{},
36+
recentlyActivated: func(modTime time.Time) bool {
37+
return modTime.After(stabled)
38+
},
39+
}
40+
41+
return c.MarkAndSweep(ctx, repositoryNameIterable, blobDigestIterable)
42+
}
43+
44+
type collector struct {
45+
*Vacuum
46+
namespace content.Namespace
47+
used map[digest.Digest]struct{}
48+
recentlyActivated func(modTime time.Time) bool
49+
}
50+
51+
func (c *collector) count(dgst digest.Digest) {
52+
c.used[dgst] = struct{}{}
53+
}
54+
55+
func (c *collector) referenced(dgst digest.Digest) bool {
56+
_, ok := c.used[dgst]
57+
return ok
58+
}
59+
60+
func (c *collector) referencedOrRecentlyActivated(ld content.LinkedDigest) bool {
61+
if c.recentlyActivated(ld.ModTime) {
62+
c.count(ld.Digest)
63+
64+
return true
65+
}
66+
67+
_, ok := c.used[ld.Digest]
68+
return ok
69+
}
70+
71+
func (c *collector) MarkAndSweep(pctx context.Context, repositoryNameIterable content.RepositoryNameIterable, blobDigestIterable content.DigestIterable) error {
72+
ctx, l := logr.FromContext(pctx).Start(pctx, "MarkAndSweep")
73+
defer l.End()
74+
75+
for named, err := range repositoryNameIterable.RepositoryNames(ctx) {
76+
if err != nil {
77+
return err
78+
}
79+
80+
if err := c.markAndSweepRepository(ctx, named); err != nil {
81+
return fmt.Errorf("failed to mark and sweep repository %s: %w", named, err)
82+
}
83+
}
84+
85+
for d, err := range blobDigestIterable.Digests(ctx) {
86+
if err != nil {
87+
return err
88+
}
89+
90+
ll := l.WithValues(slog.String("blob", string(d)))
91+
92+
ll.Info("checking")
93+
94+
if c.referenced(d) {
95+
continue
96+
}
97+
98+
if err := c.RemoveBlob(ctx, d); err != nil {
99+
return fmt.Errorf("failed to remove blob %s: %w", d, err)
100+
}
101+
102+
ll.Info("deleted")
103+
}
104+
105+
return nil
106+
}
107+
108+
func (c *collector) markAndSweepRepository(ctx context.Context, named reference.Named) error {
109+
l := logr.FromContext(ctx)
110+
111+
repository, err := c.namespace.Repository(ctx, named)
112+
if err != nil {
113+
return fmt.Errorf("failed to construct repository: %w", err)
114+
}
115+
116+
tagService, err := repository.Tags(ctx)
117+
if err != nil {
118+
return fmt.Errorf("failed to tag service: %w", err)
119+
}
120+
121+
manifestService, err := repository.Manifests(ctx)
122+
if err != nil {
123+
return fmt.Errorf("failed to manifest service: %w", err)
124+
}
125+
126+
blobStore, err := repository.Blobs(ctx)
127+
if err != nil {
128+
return fmt.Errorf("failed to blob store: %w", err)
129+
}
130+
131+
manifestDigestIterable, ok := manifestService.(content.LinkedDigestIterable)
132+
if !ok {
133+
return &content.ErrNotImplemented{Reason: errors.New("LinkedDigestIterable of ManifestService")}
134+
}
135+
136+
layerDigestIterable, ok := blobStore.(content.LinkedDigestIterable)
137+
if !ok {
138+
return &content.ErrNotImplemented{Reason: errors.New("LinkedDigestIterable of BlobStore")}
139+
}
140+
141+
allTags, err := tagService.All(ctx)
142+
if err != nil {
143+
return fmt.Errorf("failed to get all tags: %w", err)
144+
}
145+
146+
for _, tag := range allTags {
147+
d, err := tagService.Get(ctx, tag)
148+
if err != nil {
149+
return err
150+
}
151+
152+
if err := c.markManifest(ctx, manifestService, d.Digest); err != nil {
153+
return err
154+
}
155+
}
156+
157+
l.WithValues(slog.String("name", named.String())).Info("marking")
158+
159+
for ld, err := range manifestDigestIterable.LinkedDigests(ctx) {
160+
if err != nil {
161+
return err
162+
}
163+
164+
ll := l.WithValues(
165+
slog.String("name", named.String()),
166+
slog.String("manifest", string(ld.Digest)),
167+
)
168+
169+
ll.Info("checking")
170+
171+
if c.referencedOrRecentlyActivated(ld) {
172+
continue
173+
}
174+
175+
if err := c.RemoveManifest(ctx, named, ld.Digest, allTags); err != nil {
176+
return fmt.Errorf("failed to remove manifest %s@%s: %w", named, ld.Digest, err)
177+
}
178+
179+
ll.Info("deleted")
180+
}
181+
182+
for ld, err := range layerDigestIterable.LinkedDigests(ctx) {
183+
if err != nil {
184+
return err
185+
}
186+
187+
ll := l.WithValues(
188+
slog.String("name", named.String()),
189+
slog.String("layer", string(ld.Digest)),
190+
)
191+
192+
ll.Debug("checking")
193+
194+
if c.referencedOrRecentlyActivated(ld) {
195+
continue
196+
}
197+
198+
if err := c.RemoveLayer(ctx, named, ld.Digest); err != nil {
199+
return fmt.Errorf("failed to remove layer %s@%s: %w", named, ld.Digest, err)
200+
}
201+
202+
ll.Info("deleted")
203+
}
204+
205+
return nil
206+
}
207+
208+
func (c *collector) markManifest(ctx context.Context, manifestService content.ManifestService, manifestDigest digest.Digest) error {
209+
m, err := manifestService.Get(ctx, manifestDigest)
210+
if err != nil {
211+
return err
212+
}
213+
214+
c.count(manifestDigest)
215+
216+
switch m.Type() {
217+
case manifestv1.DockerMediaTypeManifestList, manifestv1.MediaTypeImageIndex:
218+
for d := range m.References() {
219+
if err := c.markManifest(ctx, manifestService, d.Digest); err != nil {
220+
// skip for partial cached
221+
if errors.As(err, ptr.Ptr(&content.ErrManifestUnknownRevision{})) {
222+
continue
223+
}
224+
return err
225+
}
226+
}
227+
default:
228+
for d := range m.References() {
229+
c.count(d.Digest)
230+
}
231+
}
232+
233+
return nil
234+
}

0 commit comments

Comments
 (0)