Skip to content

Commit 4339ee5

Browse files
committed
fix lease management with flightcontrol
When one flightcontrol callback gets canceled ctx.Value() stops working for aquiring leases for remaining callbacks. While this behavior should be also looked at more carefully, returning a lease for the first callback or for remaining callback would not be correct as some objects can be tracked by first lease and that lease could be already deleted by the first callpath. This fixes it so that any object tracked by flightcontrol callback will be copied to the lease of every codepath after the callback has returned. Signed-off-by: Tonis Tiigi <[email protected]>
1 parent 5809d41 commit 4339ee5

File tree

3 files changed

+137
-44
lines changed

3 files changed

+137
-44
lines changed

cache/blobs.go

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/moby/buildkit/util/compression"
1717
"github.com/moby/buildkit/util/converter"
1818
"github.com/moby/buildkit/util/flightcontrol"
19+
"github.com/moby/buildkit/util/leaseutil"
1920
"github.com/moby/buildkit/util/winlayers"
2021
digest "github.com/opencontainers/go-digest"
2122
imagespecidentity "github.com/opencontainers/image-spec/identity"
@@ -24,7 +25,7 @@ import (
2425
"golang.org/x/sync/errgroup"
2526
)
2627

27-
var g flightcontrol.Group[struct{}]
28+
var g flightcontrol.Group[*leaseutil.LeaseRef]
2829
var gFileList flightcontrol.Group[[]string]
2930

3031
var ErrNoBlobs = errors.Errorf("no blobs for snapshot")
@@ -87,14 +88,24 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
8788

8889
if _, ok := filter[sr.ID()]; ok {
8990
eg.Go(func() error {
90-
_, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (struct{}, error) {
91+
l, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) {
9192
if sr.getBlob() != "" {
92-
return struct{}{}, nil
93+
return nil, nil
9394
}
9495
if !createIfNeeded {
95-
return struct{}{}, errors.WithStack(ErrNoBlobs)
96+
return nil, errors.WithStack(ErrNoBlobs)
9697
}
9798

99+
l, ctx, err := leaseutil.NewLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
100+
if err != nil {
101+
return nil, err
102+
}
103+
defer func() {
104+
if err != nil {
105+
l.Discard()
106+
}
107+
}()
108+
98109
compressorFunc, finalize := comp.Type.Compress(ctx, comp)
99110
mediaType := comp.Type.MediaType()
100111

@@ -109,12 +120,12 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
109120
if lowerRef != nil {
110121
m, err := lowerRef.Mount(ctx, true, s)
111122
if err != nil {
112-
return struct{}{}, err
123+
return nil, err
113124
}
114125
var release func() error
115126
lower, release, err = m.Mount()
116127
if err != nil {
117-
return struct{}{}, err
128+
return nil, err
118129
}
119130
if release != nil {
120131
defer release()
@@ -132,27 +143,26 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
132143
if upperRef != nil {
133144
m, err := upperRef.Mount(ctx, true, s)
134145
if err != nil {
135-
return struct{}{}, err
146+
return nil, err
136147
}
137148
var release func() error
138149
upper, release, err = m.Mount()
139150
if err != nil {
140-
return struct{}{}, err
151+
return nil, err
141152
}
142153
if release != nil {
143154
defer release()
144155
}
145156
}
146157

147158
var desc ocispecs.Descriptor
148-
var err error
149159

150160
// Determine differ and error/log handling according to the platform, envvar and the snapshotter.
151161
var enableOverlay, fallback, logWarnOnErr bool
152162
if forceOvlStr := os.Getenv("BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF"); forceOvlStr != "" && sr.kind() != Diff {
153163
enableOverlay, err = strconv.ParseBool(forceOvlStr)
154164
if err != nil {
155-
return struct{}{}, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
165+
return nil, errors.Wrapf(err, "invalid boolean in BUILDKIT_DEBUG_FORCE_OVERLAY_DIFF")
156166
}
157167
fallback = false // prohibit fallback on debug
158168
} else if !isTypeWindows(sr) {
@@ -174,10 +184,10 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
174184
if !ok || err != nil {
175185
if !fallback {
176186
if !ok {
177-
return struct{}{}, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
187+
return nil, errors.Errorf("overlay mounts not detected (lower=%+v,upper=%+v)", lower, upper)
178188
}
179189
if err != nil {
180-
return struct{}{}, errors.Wrapf(err, "failed to compute overlay diff")
190+
return nil, errors.Wrapf(err, "failed to compute overlay diff")
181191
}
182192
}
183193
if logWarnOnErr {
@@ -210,7 +220,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
210220
diff.WithCompressor(compressorFunc),
211221
)
212222
if err != nil {
213-
return struct{}{}, err
223+
return nil, err
214224
}
215225
}
216226

@@ -220,34 +230,40 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
220230
if finalize != nil {
221231
a, err := finalize(ctx, sr.cm.ContentStore)
222232
if err != nil {
223-
return struct{}{}, errors.Wrapf(err, "failed to finalize compression")
233+
return nil, errors.Wrapf(err, "failed to finalize compression")
224234
}
225235
for k, v := range a {
226236
desc.Annotations[k] = v
227237
}
228238
}
229239
info, err := sr.cm.ContentStore.Info(ctx, desc.Digest)
230240
if err != nil {
231-
return struct{}{}, err
241+
return nil, err
232242
}
233243

234244
if diffID, ok := info.Labels[labels.LabelUncompressed]; ok {
235245
desc.Annotations[labels.LabelUncompressed] = diffID
236246
} else if mediaType == ocispecs.MediaTypeImageLayer {
237247
desc.Annotations[labels.LabelUncompressed] = desc.Digest.String()
238248
} else {
239-
return struct{}{}, errors.Errorf("unknown layer compression type")
249+
return nil, errors.Errorf("unknown layer compression type")
240250
}
241251

242252
if err := sr.setBlob(ctx, desc); err != nil {
243-
return struct{}{}, err
253+
return nil, err
244254
}
245-
return struct{}{}, nil
255+
return l, nil
246256
})
247257
if err != nil {
248258
return err
249259
}
250260

261+
if l != nil {
262+
if err := l.Adopt(ctx); err != nil {
263+
return err
264+
}
265+
}
266+
251267
if comp.Force {
252268
if err := ensureCompression(ctx, sr, comp, s); err != nil {
253269
return errors.Wrapf(err, "failed to ensure compression type of %q", comp.Type)
@@ -416,29 +432,42 @@ func isTypeWindows(sr *immutableRef) bool {
416432

417433
// ensureCompression ensures the specified ref has the blob of the specified compression Type.
418434
func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error {
419-
_, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (struct{}, error) {
435+
l, err := g.Do(ctx, fmt.Sprintf("ensureComp-%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (_ *leaseutil.LeaseRef, err error) {
420436
desc, err := ref.ociDesc(ctx, ref.descHandlers, true)
421437
if err != nil {
422-
return struct{}{}, err
438+
return nil, err
439+
}
440+
441+
l, ctx, err := leaseutil.NewLease(ctx, ref.cm.LeaseManager, leaseutil.MakeTemporary)
442+
if err != nil {
443+
return nil, err
423444
}
445+
defer func() {
446+
if err != nil {
447+
l.Discard()
448+
}
449+
}()
424450

425451
// Resolve converters
426452
layerConvertFunc, err := converter.New(ctx, ref.cm.ContentStore, desc, comp)
427453
if err != nil {
428-
return struct{}{}, err
454+
return nil, err
429455
} else if layerConvertFunc == nil {
430456
if isLazy, err := ref.isLazy(ctx); err != nil {
431-
return struct{}{}, err
457+
return nil, err
432458
} else if isLazy {
433459
// This ref can be used as the specified compressionType. Keep it lazy.
434-
return struct{}{}, nil
460+
return l, nil
435461
}
436-
return struct{}{}, ref.linkBlob(ctx, desc)
462+
if err := ref.linkBlob(ctx, desc); err != nil {
463+
return nil, err
464+
}
465+
return l, nil
437466
}
438467

439468
// First, lookup local content store
440469
if _, err := ref.getBlobWithCompression(ctx, comp.Type); err == nil {
441-
return struct{}{}, nil // found the compression variant. no need to convert.
470+
return l, nil // found the compression variant. no need to convert.
442471
}
443472

444473
// Convert layer compression type
@@ -448,18 +477,26 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.
448477
dh: ref.descHandlers[desc.Digest],
449478
session: s,
450479
}).Unlazy(ctx); err != nil {
451-
return struct{}{}, err
480+
return l, err
452481
}
453482
newDesc, err := layerConvertFunc(ctx, ref.cm.ContentStore, desc)
454483
if err != nil {
455-
return struct{}{}, errors.Wrapf(err, "failed to convert")
484+
return nil, errors.Wrapf(err, "failed to convert")
456485
}
457486

458487
// Start to track converted layer
459488
if err := ref.linkBlob(ctx, *newDesc); err != nil {
460-
return struct{}{}, errors.Wrapf(err, "failed to add compression blob")
489+
return nil, errors.Wrapf(err, "failed to add compression blob")
461490
}
462-
return struct{}{}, nil
491+
return l, nil
463492
})
464-
return err
493+
if err != nil {
494+
return err
495+
}
496+
if l != nil {
497+
if err := l.Adopt(ctx); err != nil {
498+
return err
499+
}
500+
}
501+
return nil
465502
}

cache/refs.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context,
10621062
}
10631063

10641064
func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s session.Group) error {
1065-
_, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ struct{}, rerr error) {
1065+
_, err := g.Do(ctx, sr.ID()+"-prepare-remote-snapshot", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) {
10661066
dhs := sr.descHandlers
10671067
for _, r := range sr.layerChain() {
10681068
r := r
@@ -1074,7 +1074,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
10741074
dh := dhs[digest.Digest(r.getBlob())]
10751075
if dh == nil {
10761076
// We cannot prepare remote snapshots without descHandler.
1077-
return struct{}{}, nil
1077+
return nil, nil
10781078
}
10791079

10801080
// tmpLabels contains dh.SnapshotLabels + session IDs. All keys contain
@@ -1135,7 +1135,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s
11351135
break
11361136
}
11371137

1138-
return struct{}{}, nil
1138+
return nil, nil
11391139
})
11401140
return err
11411141
}
@@ -1158,28 +1158,31 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields
11581158
}
11591159

11601160
func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool, ensureContentStore bool) error {
1161-
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ struct{}, rerr error) {
1161+
_, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) {
11621162
if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil {
11631163
if !ensureContentStore {
1164-
return struct{}{}, nil
1164+
return nil, nil
11651165
}
11661166
if blob := sr.getBlob(); blob == "" {
1167-
return struct{}{}, nil
1167+
return nil, nil
11681168
}
11691169
if _, err := sr.cm.ContentStore.Info(ctx, sr.getBlob()); err == nil {
1170-
return struct{}{}, nil
1170+
return nil, nil
11711171
}
11721172
}
11731173

11741174
switch sr.kind() {
11751175
case Merge, Diff:
1176-
return struct{}{}, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore)
1176+
return nil, sr.unlazyDiffMerge(ctx, dhs, pg, s, topLevel, ensureContentStore)
11771177
case Layer, BaseLayer:
1178-
return struct{}{}, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore)
1178+
return nil, sr.unlazyLayer(ctx, dhs, pg, s, ensureContentStore)
11791179
}
1180-
return struct{}{}, nil
1180+
return nil, nil
11811181
})
1182-
return err
1182+
if err != nil {
1183+
return err
1184+
}
1185+
return nil
11831186
}
11841187

11851188
// should be called within sizeG.Do call for this ref's ID

util/leaseutil/manager.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package leaseutil
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"github.com/containerd/containerd/leases"
89
"github.com/containerd/containerd/namespaces"
10+
"github.com/pkg/errors"
911
)
1012

1113
func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (context.Context, func(context.Context) error, error) {
@@ -16,17 +18,68 @@ func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (cont
1618
}, nil
1719
}
1820

19-
l, err := ls.Create(ctx, append([]leases.Opt{leases.WithRandomID(), leases.WithExpiration(time.Hour)}, opts...)...)
21+
lr, ctx, err := NewLease(ctx, ls, opts...)
2022
if err != nil {
2123
return nil, nil, err
2224
}
2325

24-
ctx = leases.WithLease(ctx, l.ID)
2526
return ctx, func(ctx context.Context) error {
26-
return ls.Delete(ctx, l)
27+
return ls.Delete(ctx, lr.l)
2728
}, nil
2829
}
2930

31+
func NewLease(ctx context.Context, lm leases.Manager, opts ...leases.Opt) (*LeaseRef, context.Context, error) {
32+
l, err := lm.Create(ctx, append([]leases.Opt{leases.WithRandomID(), leases.WithExpiration(time.Hour)}, opts...)...)
33+
if err != nil {
34+
return nil, nil, err
35+
}
36+
37+
ctx = leases.WithLease(ctx, l.ID)
38+
return &LeaseRef{lm: lm, l: l}, ctx, nil
39+
}
40+
41+
type LeaseRef struct {
42+
lm leases.Manager
43+
l leases.Lease
44+
45+
once sync.Once
46+
resources []leases.Resource
47+
err error
48+
}
49+
50+
func (l *LeaseRef) Discard() error {
51+
return l.lm.Delete(context.Background(), l.l)
52+
}
53+
54+
func (l *LeaseRef) Adopt(ctx context.Context) error {
55+
l.once.Do(func() {
56+
resources, err := l.lm.ListResources(ctx, l.l)
57+
if err != nil {
58+
l.err = err
59+
return
60+
}
61+
l.resources = resources
62+
})
63+
if l.err != nil {
64+
return l.err
65+
}
66+
currentID, ok := leases.FromContext(ctx)
67+
if !ok {
68+
return errors.Errorf("missing lease requirement for adopt")
69+
}
70+
for _, r := range l.resources {
71+
if err := l.lm.AddResource(ctx, leases.Lease{ID: currentID}, r); err != nil {
72+
return err
73+
}
74+
}
75+
if len(l.resources) == 0 {
76+
l.Discard()
77+
return nil
78+
}
79+
go l.Discard()
80+
return nil
81+
}
82+
3083
func MakeTemporary(l *leases.Lease) error {
3184
if l.Labels == nil {
3285
l.Labels = map[string]string{}

0 commit comments

Comments
 (0)