Skip to content

Commit 2d608c3

Browse files
authored
Merge pull request moby#4529 from tonistiigi/lease-flightcontrol-fix
fix lease management with flightcontrol
2 parents 0fecf46 + 4339ee5 commit 2d608c3

File tree

3 files changed

+137
-44
lines changed

3 files changed

+137
-44
lines changed

cache/blobs.go

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/moby/buildkit/util/compression"
1717
"github.com/moby/buildkit/util/converter"
1818
"github.com/moby/buildkit/util/flightcontrol"
19+
"github.com/moby/buildkit/util/leaseutil"
1920
"github.com/moby/buildkit/util/winlayers"
2021
digest "github.com/opencontainers/go-digest"
2122
imagespecidentity "github.com/opencontainers/image-spec/identity"
@@ -24,7 +25,7 @@ import (
2425
"golang.org/x/sync/errgroup"
2526
)
2627

27-
var g flightcontrol.Group[struct{}]
28+
var g flightcontrol.Group[*leaseutil.LeaseRef]
2829
var gFileList flightcontrol.Group[[]string]
2930

3031
var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
@@ -87,14 +88,24 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
8788

8889
if _, ok := filter[sr.ID()]; ok {
8990
eg.Go(func() error {
90-
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (struct{}, error) {
91+
l, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) {
9192
if sr.getBlob() != "" {
92-
return struct{}{}, nil
93+
return nil, nil
9394
}
9495
if !createIfNeeded {
95-
return struct{}{}, errors.WithStack(ErrNoBlobs)
96+
return nil, errors.WithStack(ErrNoBlobs)
9697
}
9798

99+
l, ctx, err := leaseutil.NewLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
100+
if err != nil {
101+
return nil, err
102+
}
103+
defer func() {
104+
if err != nil {
105+
l.Discard()
106+
}
107+
}()
108+
98109
compressorFunc, finalize := comp.Type.Compress(ctx, comp)
99110
mediaType := comp.Type.MediaType()
100111

@@ -109,12 +120,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
109120
if lowerRef != nil {
110121
m, err := lowerRef.Mount(ctx, true, s)
111122
if err != nil {
112-
return struct{}{}, err
123+
return nil, err
113124
}
114125
var release func() error
115126
lower, release, err = m.Mount()
116127
if err != nil {
117-
return struct{}{}, err
128+
return nil, err
118129
}
119130
if release != nil {
120131
defer release()
@@ -132,27 +143,26 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
132143
if upperRef != nil {
133144
m, err := upperRef.Mount(ctx, true, s)
134145
if err != nil {
135-
return struct{}{}, err
146+
return nil, err
136147
}
137148
var release func() error
138149
upper, release, err = m.Mount()
139150
if err != nil {
140-
return struct{}{}, err
151+
return nil, err
141152
}
142153
if release != nil {
143154
defer release()
144155
}
145156
}
146157

147158
var desc ocispecs.Descriptor
148-
var err error
149159

150160
// Determine differ and error/log handling according to the platform, envvar and the snapshotter.
151161
var enableOverlay, fallback, logWarnOnErr bool
152162
if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" && sr.kind() != Diff {
153163
enableOverlay, err = strconv.ParseBool(forceOvlStr)
154164
if err != nil {
155-
return struct{}{}, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
165+
return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
156166
}
157167
fallback = false // prohibit fallback on debug
158168
} else if !isTypeWindows(sr) {
@@ -174,10 +184,10 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
174184
if !ok || err != nil {
175185
if !fallback {
176186
if !ok {
177-
return struct{}{}, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
187+
return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
178188
}
179189
if err != nil {
180-
return struct{}{}, errors.Wrapf(err, "failed to compute overlay diff")
190+
return nil, errors.Wrapf(err, "failed to compute overlay diff")
181191
}
182192
}
183193
if logWarnOnErr {
@@ -210,7 +220,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
210220
diff.WithCompressor(compressorFunc),
211221
)
212222
if err != nil {
213-
return struct{}{}, err
223+
return nil, err
214224
}
215225
}
216226

@@ -220,34 +230,40 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
220230
if finalize != nil {
221231
a, err := finalize(ctx, sr.cm.ContentStore)
222232
if err != nil {
223-
return struct{}{}, errors.Wrapf(err, "failed to finalize compression")
233+
return nil, errors.Wrapf(err, "failed to finalize compression")
224234
}
225235
for k, v := range a {
226236
desc.Annotations[k] = v
227237
}
228238
}
229239
info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
230240
if err != nil {
231-
return struct{}{}, err
241+
return nil, err
232242
}
233243

234244
if diffID, ok := info.Labels[labels.LabelUncompressed]; ok {
235245
desc.Annotations[labels.LabelUncompressed] = diffID
236246
} else if mediaType == ocispecs.MediaTypeImageLayer {
237247
desc.Annotations[labels.LabelUncompressed] = desc.Digest.String()
238248
} else {
239-
return struct{}{}, errors.Errorf("unknown layer compression type")
249+
return nil, errors.Errorf("unknown layer compression type")
240250
}
241251

242252
if err := sr.setBlob(ctx, desc); err != nil {
243-
return struct{}{}, err
253+
return nil, err
244254
}
245-
return struct{}{}, nil
255+
return l, nil
246256
})
247257
if err != nil {
248258
return err
249259
}
250260

261+
if l != nil {
262+
if err := l.Adopt(ctx); err != nil {
263+
return err
264+
}
265+
}
266+
251267
if comp.Force {
252268
if err := ensureCompression(ctx, sr, comp, s); err != nil {
253269
return errors.Wrapf(err, "failed to ensure compression type of %q", comp.Type)
@@ -416,29 +432,42 @@ func isTypeWindows(sr *immutableRef) bool {
416432

417433
// ensureCompression ensures the specified ref has the blob of the specified compression Type.
418434
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
419-
_, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (struct{}, error) {
435+
l, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) {
420436
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
421437
if err != nil {
422-
return struct{}{}, err
438+
return nil, err
439+
}
440+
441+
l, ctx, err := leaseutil.NewLease(ctx, ref.cm.LeaseManager, leaseutil.MakeTemporary)
442+
if err != nil {
443+
return nil, err
423444
}
445+
defer func() {
446+
if err != nil {
447+
l.Discard()
448+
}
449+
}()
424450

425451
// Resolve converters
426452
layerConvertFunc, err := converter.New(ctx, ref.cm.ContentStore, desc, comp)
427453
if err != nil {
428-
return struct{}{}, err
454+
return nil, err
429455
} else if layerConvertFunc == nil {
430456
if isLazy, err := ref.isLazy(ctx); err != nil {
431-
return struct{}{}, err
457+
return nil, err
432458
} else if isLazy {
433459
// This ref can be used as the specified compressionType. Keep it lazy.
434-
return struct{}{}, nil
460+
return l, nil
435461
}
436-
return struct{}{}, ref.linkBlob(ctx, desc)
462+
if err := ref.linkBlob(ctx, desc); err != nil {
463+
return nil, err
464+
}
465+
return l, nil
437466
}
438467

439468
// First, lookup local content store
440469
if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil {
441-
return struct{}{}, nil // found the compression variant. no need to convert.
470+
return l, nil // found the compression variant. no need to convert.
442471
}
443472

444473
// Convert layer compression type
@@ -448,18 +477,26 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
448477
dh: ref.descHandlers[desc.Digest],
449478
session: s,
450479
}).Unlazy(ctx); err != nil {
451-
return struct{}{}, err
480+
return l, err
452481
}
453482
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
454483
if err != nil {
455-
return struct{}{}, errors.Wrapf(err, "failed to convert")
484+
return nil, errors.Wrapf(err, "failed to convert")
456485
}
457486

458487
// Start to track converted layer
459488
if err := ref.linkBlob(ctx, *newDesc); err != nil {
460-
return struct{}{}, errors.Wrapf(err, "failed to add compression blob")
489+
return nil, errors.Wrapf(err, "failed to add compression blob")
461490
}
462-
return struct{}{}, nil
491+
return l, nil
463492
})
464-
return err
493+
if err != nil {
494+
return err
495+
}
496+
if l != nil {
497+
if err := l.Adopt(ctx); err != nil {
498+
return err
499+
}
500+
}
501+
return nil
465502
}

cache/refs.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context,
10621062
}
10631063

10641064
func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s session.Group) error {
1065-
_, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ struct{}, rerr error) {
1065+
_, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) {
10661066
dhs := sr.descHandlers
10671067
for _, r := range sr.layerChain() {
10681068
r := r
@@ -1074,7 +1074,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
10741074
dh := dhs[digest.Digest(r.getBlob())]
10751075
if dh == nil {
10761076
// We cannot prepare remote snapshots without descHandler.
1077-
return struct{}{}, nil
1077+
return nil, nil
10781078
}
10791079

10801080
// tmpLabels contains dh.SnapshotLabels + session IDs. All keys contain
@@ -1135,7 +1135,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
11351135
break
11361136
}
11371137

1138-
return struct{}{}, nil
1138+
return nil, nil
11391139
})
11401140
return err
11411141
}
@@ -1158,28 +1158,31 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields
11581158
}
11591159

11601160
func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool, ensureContentStore bool) error {
1161-
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ struct{}, rerr error) {
1161+
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) {
11621162
if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil {
11631163
if !ensureContentStore {
1164-
return struct{}{}, nil
1164+
return nil, nil
11651165
}
11661166
if blob := sr.getBlob(); blob == "" {
1167-
return struct{}{}, nil
1167+
return nil, nil
11681168
}
11691169
if _, err := sr.cm.ContentStore.Info(ctx, sr.getBlob()); err == nil {
1170-
return struct{}{}, nil
1170+
return nil, nil
11711171
}
11721172
}
11731173

11741174
switch sr.kind() {
11751175
case Merge, Diff:
1176-
return struct{}{}, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore)
1176+
return nil, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore)
11771177
case Layer, BaseLayer:
1178-
return struct{}{}, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore)
1178+
return nil, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore)
11791179
}
1180-
return struct{}{}, nil
1180+
return nil, nil
11811181
})
1182-
return err
1182+
if err != nil {
1183+
return err
1184+
}
1185+
return nil
11831186
}
11841187

11851188
// should be called within sizeG.Do call for this ref's ID

util/leaseutil/manager.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package leaseutil
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"github.com/containerd/containerd/leases"
89
"github.com/containerd/containerd/namespaces"
10+
"github.com/pkg/errors"
911
)
1012

1113
func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (context.Context, func(context.Context) error, error) {
@@ -16,17 +18,68 @@ func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (cont
1618
}, nil
1719
}
1820

19-
l, err := ls.Create(ctx, append([]leases.Opt{leases.WithRandomID(), leases.WithExpiration(time.Hour)}, opts...)...)
21+
lr, ctx, err := NewLease(ctx, ls, opts...)
2022
if err != nil {
2123
return nil, nil, err
2224
}
2325

24-
ctx = leases.WithLease(ctx, l.ID)
2526
return ctx, func(ctx context.Context) error {
26-
return ls.Delete(ctx, l)
27+
return ls.Delete(ctx, lr.l)
2728
}, nil
2829
}
2930

31+
func NewLease(ctx context.Context, lm leases.Manager, opts ...leases.Opt) (*LeaseRef, context.Context, error) {
32+
l, err := lm.Create(ctx, append([]leases.Opt{leases.WithRandomID(), leases.WithExpiration(time.Hour)}, opts...)...)
33+
if err != nil {
34+
return nil, nil, err
35+
}
36+
37+
ctx = leases.WithLease(ctx, l.ID)
38+
return &LeaseRef{lm: lm, l: l}, ctx, nil
39+
}
40+
41+
type LeaseRef struct {
42+
lm leases.Manager
43+
l leases.Lease
44+
45+
once sync.Once
46+
resources []leases.Resource
47+
err error
48+
}
49+
50+
func (l *LeaseRef) Discard() error {
51+
return l.lm.Delete(context.Background(), l.l)
52+
}
53+
54+
func (l *LeaseRef) Adopt(ctx context.Context) error {
55+
l.once.Do(func() {
56+
resources, err := l.lm.ListResources(ctx, l.l)
57+
if err != nil {
58+
l.err = err
59+
return
60+
}
61+
l.resources = resources
62+
})
63+
if l.err != nil {
64+
return l.err
65+
}
66+
currentID, ok := leases.FromContext(ctx)
67+
if !ok {
68+
return errors.Errorf("missing lease requirement for adopt")
69+
}
70+
for _, r := range l.resources {
71+
if err := l.lm.AddResource(ctx, leases.Lease{ID: currentID}, r); err != nil {
72+
return err
73+
}
74+
}
75+
if len(l.resources) == 0 {
76+
l.Discard()
77+
return nil
78+
}
79+
go l.Discard()
80+
return nil
81+
}
82+
3083
func MakeTemporary(l *leases.Lease) error {
3184
if l.Labels == nil {
3285
l.Labels = map[string]string{}

0 commit comments

Comments
 (0)