Skip to content

Commit 1909622

Browse files
committed
remove most containers/image usage from imageutil.Cache interface
Signed-off-by: Joe Lanford <[email protected]>
1 parent efce35b commit 1909622

File tree

3 files changed

+78
-66
lines changed

3 files changed

+78
-66
lines changed

catalogd/internal/controllers/core/clustercatalog_controller_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ import (
55
"errors"
66
"fmt"
77
"io/fs"
8+
"iter"
89
"net/http"
910
"testing"
1011
"testing/fstest"
1112
"time"
1213

1314
"github.com/containers/image/v5/docker/reference"
14-
"github.com/containers/image/v5/types"
1515
"github.com/google/go-cmp/cmp"
1616
"github.com/google/go-cmp/cmp/cmpopts"
17+
ocispecv1 "github.com/opencontainers/image-spec/specs-go/v1"
1718
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
1920
"k8s.io/apimachinery/pkg/api/meta"
@@ -59,7 +60,7 @@ func (ms *MockCache) Fetch(_ context.Context, _ string, _ reference.Canonical) (
5960
panic("not implemented")
6061
}
6162

62-
func (ms *MockCache) Store(_ context.Context, _ string, _ reference.Named, _ reference.Canonical, _ types.Image, _ types.ImageSource) (fs.FS, time.Time, error) {
63+
func (ms *MockCache) Store(_ context.Context, _ string, _ reference.Named, _ reference.Canonical, _ ocispecv1.Image, _ iter.Seq[imageutil.LayerData]) (fs.FS, time.Time, error) {
6364
panic("not implemented")
6465
}
6566

internal/util/image/disk_cache.go

Lines changed: 28 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,14 @@ import (
55
"errors"
66
"fmt"
77
"io/fs"
8+
"iter"
89
"os"
910
"path/filepath"
1011
"slices"
1112
"time"
1213

1314
"github.com/containerd/containerd/archive"
1415
"github.com/containers/image/v5/docker/reference"
15-
"github.com/containers/image/v5/pkg/blobinfocache/none"
16-
"github.com/containers/image/v5/pkg/compression"
17-
"github.com/containers/image/v5/types"
1816
"github.com/opencontainers/go-digest"
1917
ocispecv1 "github.com/opencontainers/image-spec/specs-go/v1"
2018
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -85,27 +83,41 @@ func (a *diskCache) unpackPath(id string, digest digest.Digest) string {
8583
return filepath.Join(a.idPath(id), digest.String())
8684
}
8785

88-
func (a *diskCache) Store(ctx context.Context, id string, srcRef reference.Named, canonicalRef reference.Canonical, img types.Image, imgSrc types.ImageSource) (fs.FS, time.Time, error) {
89-
var filter archive.Filter
86+
func (a *diskCache) Store(ctx context.Context, id string, srcRef reference.Named, canonicalRef reference.Canonical, imgCfg ocispecv1.Image, layers iter.Seq[LayerData]) (fs.FS, time.Time, error) {
87+
var applyOpts []archive.ApplyOpt
9088
if a.filterFunc != nil {
91-
var err error
92-
93-
imgCfg, err := img.OCIConfig(ctx)
94-
if err != nil {
95-
return nil, time.Time{}, fmt.Errorf("error parsing image config: %w", err)
96-
}
97-
98-
filter, err = a.filterFunc(ctx, srcRef, *imgCfg)
89+
filter, err := a.filterFunc(ctx, srcRef, imgCfg)
9990
if err != nil {
10091
return nil, time.Time{}, err
10192
}
93+
applyOpts = append(applyOpts, archive.WithFilter(filter))
10294
}
10395

104-
modTime, err := a.applyLayersToDisk(ctx, id, canonicalRef, img, imgSrc, filter)
96+
dest := a.unpackPath(id, canonicalRef.Digest())
97+
if err := fsutil.EnsureEmptyDirectory(dest, 0700); err != nil {
98+
return nil, time.Time{}, fmt.Errorf("error ensuring empty unpack directory: %w", err)
99+
}
100+
modTime, err := func() (time.Time, error) {
101+
l := log.FromContext(ctx)
102+
l.Info("unpacking image", "path", dest)
103+
for layer := range layers {
104+
if layer.Err != nil {
105+
return time.Time{}, fmt.Errorf("error reading layer[%d]: %w", layer.Index, layer.Err)
106+
}
107+
if _, err := archive.Apply(ctx, dest, layer.Reader, applyOpts...); err != nil {
108+
return time.Time{}, fmt.Errorf("error applying layer[%d]: %w", layer.Index, err)
109+
}
110+
l.Info("applied layer", "layer", layer.Index)
111+
}
112+
if err := fsutil.SetReadOnlyRecursive(dest); err != nil {
113+
return time.Time{}, fmt.Errorf("error making unpack directory read-only: %w", err)
114+
}
115+
return fsutil.GetDirectoryModTime(dest)
116+
}()
105117
if err != nil {
106-
return nil, time.Time{}, err
118+
return nil, time.Time{}, errors.Join(err, fsutil.DeleteReadOnlyRecursive(dest))
107119
}
108-
return os.DirFS(a.unpackPath(id, canonicalRef.Digest())), modTime, nil
120+
return os.DirFS(dest), modTime, nil
109121
}
110122

111123
func (a *diskCache) DeleteID(_ context.Context, id string) error {
@@ -130,49 +142,3 @@ func (a *diskCache) GarbageCollect(_ context.Context, id string, keep reference.
130142
}
131143
return nil
132144
}
133-
134-
// applyLayersToDisk writes the layers from img and imgSrc to disk using the provided filter.
135-
// The destination directory will be created, if necessary. If dest is already present, its
136-
// contents will be deleted. If img and imgSrc do not represent the same image, an error will
137-
// be returned due to a mismatch in the expected layers. Once complete, the dest and its contents
138-
// are marked as read-only to provide a safeguard against unintended changes.
139-
func (a *diskCache) applyLayersToDisk(ctx context.Context, id string, canonicalRef reference.Canonical, img types.Image, imgSrc types.ImageSource, filter archive.Filter) (time.Time, error) {
140-
var applyOpts []archive.ApplyOpt
141-
if filter != nil {
142-
applyOpts = append(applyOpts, archive.WithFilter(filter))
143-
}
144-
145-
dest := a.unpackPath(id, canonicalRef.Digest())
146-
if err := fsutil.EnsureEmptyDirectory(dest, 0700); err != nil {
147-
return time.Time{}, fmt.Errorf("error ensuring empty unpack directory: %w", err)
148-
}
149-
l := log.FromContext(ctx)
150-
l.Info("unpacking image", "path", dest)
151-
for i, layerInfo := range img.LayerInfos() {
152-
if err := func() error {
153-
layerReader, _, err := imgSrc.GetBlob(ctx, layerInfo, none.NoCache)
154-
if err != nil {
155-
return fmt.Errorf("error getting blob for layer[%d]: %w", i, err)
156-
}
157-
defer layerReader.Close()
158-
159-
decompressed, _, err := compression.AutoDecompress(layerReader)
160-
if err != nil {
161-
return fmt.Errorf("auto-decompress failed: %w", err)
162-
}
163-
defer decompressed.Close()
164-
165-
if _, err := archive.Apply(ctx, dest, decompressed, applyOpts...); err != nil {
166-
return fmt.Errorf("error applying layer[%d]: %w", i, err)
167-
}
168-
l.Info("applied layer", "layer", i)
169-
return nil
170-
}(); err != nil {
171-
return time.Time{}, errors.Join(err, fsutil.DeleteReadOnlyRecursive(dest))
172-
}
173-
}
174-
if err := fsutil.SetReadOnlyRecursive(dest); err != nil {
175-
return time.Time{}, fmt.Errorf("error making unpack directory read-only: %w", err)
176-
}
177-
return fsutil.GetDirectoryModTime(dest)
178-
}

internal/util/image/pull.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"io"
78
"io/fs"
9+
"iter"
810
"os"
911
"time"
1012

@@ -14,10 +16,13 @@ import (
1416
"github.com/containers/image/v5/image"
1517
"github.com/containers/image/v5/manifest"
1618
"github.com/containers/image/v5/oci/layout"
19+
"github.com/containers/image/v5/pkg/blobinfocache/none"
20+
"github.com/containers/image/v5/pkg/compression"
1721
"github.com/containers/image/v5/pkg/sysregistriesv2"
1822
"github.com/containers/image/v5/signature"
1923
"github.com/containers/image/v5/types"
2024
"github.com/go-logr/logr"
25+
ocispecv1 "github.com/opencontainers/image-spec/specs-go/v1"
2126
"sigs.k8s.io/controller-runtime/pkg/log"
2227
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2328
)
@@ -26,9 +31,15 @@ type Puller interface {
2631
Pull(context.Context, string, string, Cache) (fs.FS, reference.Canonical, time.Time, error)
2732
}
2833

34+
type LayerData struct {
35+
Reader io.Reader
36+
Index int
37+
Err error
38+
}
39+
2940
type Cache interface {
3041
Fetch(context.Context, string, reference.Canonical) (fs.FS, time.Time, error)
31-
Store(context.Context, string, reference.Named, reference.Canonical, types.Image, types.ImageSource) (fs.FS, time.Time, error)
42+
Store(context.Context, string, reference.Named, reference.Canonical, ocispecv1.Image, iter.Seq[LayerData]) (fs.FS, time.Time, error)
3243
DeleteID(context.Context, string) error
3344
GarbageCollect(context.Context, string, reference.Canonical) error
3445
}
@@ -202,7 +213,41 @@ func (p *ContainersImagePuller) applyImage(ctx context.Context, id string, srcRe
202213
panic(err)
203214
}
204215
}()
205-
return cache.Store(ctx, id, srcRef, canonicalRef, img, imgSrc)
216+
217+
ociImg, err := img.OCIConfig(ctx)
218+
if err != nil {
219+
return nil, time.Time{}, err
220+
}
221+
222+
layerIter := iter.Seq[LayerData](func(yield func(LayerData) bool) {
223+
for i, layerInfo := range img.LayerInfos() {
224+
ld := LayerData{Index: i}
225+
layerReader, _, err := imgSrc.GetBlob(ctx, layerInfo, none.NoCache)
226+
if err != nil {
227+
ld.Err = fmt.Errorf("error getting layer blob reader: %w", err)
228+
if !yield(ld) {
229+
return
230+
}
231+
}
232+
defer layerReader.Close()
233+
234+
decompressed, _, err := compression.AutoDecompress(layerReader)
235+
if err != nil {
236+
ld.Err = fmt.Errorf("error decompressing layer: %w", err)
237+
if !yield(ld) {
238+
return
239+
}
240+
}
241+
defer decompressed.Close()
242+
243+
ld.Reader = decompressed
244+
if !yield(ld) {
245+
return
246+
}
247+
}
248+
})
249+
250+
return cache.Store(ctx, id, srcRef, canonicalRef, *ociImg, layerIter)
206251
}
207252

208253
func loadPolicyContext(sourceContext *types.SystemContext, l logr.Logger) (*signature.PolicyContext, error) {

0 commit comments

Comments
 (0)