Skip to content

Commit e07b5ed

Browse files
authored
Merge pull request #6451 from amrmahdi/amrmahdi/parallel-export
solver: run image and cache exports in parallel
2 parents 649062d + 88ef66c commit e07b5ed

File tree

6 files changed

+145
-83
lines changed

6 files changed

+145
-83
lines changed

exporter/containerimage/export.go

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func (e *imageExporterInstance) Attrs() map[string]string {
219219
return e.attrs
220220
}
221221

222-
func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source, buildInfo exporter.ExportBuildInfo) (_ map[string]string, descref exporter.DescriptorReference, err error) {
222+
func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source, buildInfo exporter.ExportBuildInfo) (_ map[string]string, _ exporter.FinalizeFunc, descref exporter.DescriptorReference, err error) {
223223
src = src.Clone()
224224
if src.Metadata == nil {
225225
src.Metadata = make(map[string][]byte)
@@ -229,14 +229,17 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
229229
opts := e.opts
230230
as, _, err := ParseAnnotations(src.Metadata)
231231
if err != nil {
232-
return nil, nil, err
232+
return nil, nil, nil, err
233233
}
234234
opts.Annotations = opts.Annotations.Merge(as)
235235

236236
ctx, done, err := leaseutil.WithLease(ctx, e.opt.LeaseManager, leaseutil.MakeTemporary)
237237
if err != nil {
238-
return nil, nil, err
238+
return nil, nil, nil, err
239239
}
240+
// On success, we create descref which holds the lease's done function.
241+
// The solver will release descref after recording the descriptor in build
242+
// history. On error (descref is nil), we release the lease here.
240243
defer func() {
241244
if descref == nil {
242245
done(context.WithoutCancel(ctx))
@@ -245,13 +248,8 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
245248

246249
desc, err := e.opt.ImageWriter.Commit(ctx, src, buildInfo.SessionID, buildInfo.InlineCache, &opts)
247250
if err != nil {
248-
return nil, nil, err
251+
return nil, nil, nil, err
249252
}
250-
defer func() {
251-
if err == nil {
252-
descref = NewDescriptorReference(*desc, done)
253-
}
254-
}()
255253

256254
resp := make(map[string]string)
257255

@@ -270,6 +268,9 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
270268
}
271269
}
272270

271+
// Collect names for finalize callback to push
272+
var namesToPush []string
273+
273274
if e.opts.ImageName != "" {
274275
targetNames := strings.SplitSeq(e.opts.ImageName, ",")
275276
for targetName := range targetNames {
@@ -299,11 +300,11 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
299300
img.Name = targetName + sfx
300301
if _, err := e.opt.Images.Update(imageClientCtx, img); err != nil {
301302
if !errors.Is(err, cerrdefs.ErrNotFound) {
302-
return nil, nil, tagDone(err)
303+
return nil, nil, nil, tagDone(err)
303304
}
304305

305306
if _, err := e.opt.Images.Create(imageClientCtx, img); err != nil {
306-
return nil, nil, tagDone(err)
307+
return nil, nil, nil, tagDone(err)
307308
}
308309
}
309310
}
@@ -315,10 +316,10 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
315316
// /
316317
// TODO: change e.unpackImage so that it takes Result[Remote] as parameter.
317318
// https://github.com/moby/buildkit/pull/4057#discussion_r1324106088
318-
return nil, nil, errors.New("exporter option \"rewrite-timestamp\" conflicts with \"unpack\"")
319+
return nil, nil, nil, errors.New("exporter option \"rewrite-timestamp\" conflicts with \"unpack\"")
319320
}
320321
if err := e.unpackImage(ctx, img, src, session.NewGroup(buildInfo.SessionID)); err != nil {
321-
return nil, nil, err
322+
return nil, nil, nil, err
322323
}
323324
}
324325

@@ -350,19 +351,13 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
350351
})
351352
}
352353
if err := eg.Wait(); err != nil {
353-
return nil, nil, err
354+
return nil, nil, nil, err
354355
}
355356
}
356357
}
358+
// Collect names for pushing in finalize
357359
if e.push {
358-
err = e.pushImage(ctx, src, buildInfo.SessionID, targetName, desc.Digest)
359-
if err != nil {
360-
var statusErr remoteserrors.ErrUnexpectedStatus
361-
if errors.As(err, &statusErr) {
362-
err = errutil.WithDetails(err)
363-
}
364-
return nil, nil, errors.Wrapf(err, "failed to push %v", targetName)
365-
}
360+
namesToPush = append(namesToPush, targetName)
366361
}
367362
}
368363
resp[exptypes.ExporterImageNameKey] = e.opts.ImageName
@@ -376,11 +371,34 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
376371

377372
dtdesc, err := json.Marshal(desc)
378373
if err != nil {
379-
return nil, nil, err
374+
return nil, nil, nil, err
380375
}
381376
resp[exptypes.ExporterImageDescriptorKey] = base64.StdEncoding.EncodeToString(dtdesc)
382377

383-
return resp, nil, nil
378+
// Create descref so descriptor is recorded in build history.
379+
// Transfer lease ownership to descref - caller releases after finalize.
380+
descref = NewDescriptorReference(*desc, done)
381+
382+
if len(namesToPush) == 0 {
383+
return resp, nil, descref, nil
384+
}
385+
386+
// Create finalize callback for pushing
387+
finalize := func(ctx context.Context) error {
388+
for _, targetName := range namesToPush {
389+
err := e.pushImage(ctx, src, buildInfo.SessionID, targetName, desc.Digest)
390+
if err != nil {
391+
var statusErr remoteserrors.ErrUnexpectedStatus
392+
if errors.As(err, &statusErr) {
393+
err = errutil.WithDetails(err)
394+
}
395+
return errors.Wrapf(err, "failed to push %v", targetName)
396+
}
397+
}
398+
return nil
399+
}
400+
401+
return resp, finalize, descref, nil
384402
}
385403

386404
func (e *imageExporterInstance) pushImage(ctx context.Context, src *exporter.Source, sessionID string, targetName string, dgst digest.Digest) error {

exporter/exporter.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,35 @@ type Exporter interface {
1818
Resolve(context.Context, int, map[string]string) (ExporterInstance, error)
1919
}
2020

21+
// FinalizeFunc completes an export operation after all exports have created
22+
// their artifacts. It may perform network operations like pushing to a registry.
23+
//
24+
// Calling FinalizeFunc is optional. If not called (e.g., due to cancellation or
25+
// an error in another operation), the export will be incomplete but no resources
26+
// will leak. FinalizeFunc performs completion work only, not cleanup.
27+
//
28+
// FinalizeFunc is safe to call concurrently with other FinalizeFunc calls.
29+
type FinalizeFunc func(ctx context.Context) error
30+
2131
type ExporterInstance interface {
2232
ID() int
2333
Name() string
2434
Config() *Config
2535
Type() string
2636
Attrs() map[string]string
27-
Export(ctx context.Context, src *Source, buildInfo ExportBuildInfo) (map[string]string, DescriptorReference, error)
37+
38+
// Export performs the export operation and optionally returns a finalize
39+
// callback. This separates work that must run sequentially from work that
40+
// can run in parallel with other exports (e.g., cache export).
41+
//
42+
// For exporters that complete all work during Export (tar, local),
43+
// return nil for the finalize callback.
44+
Export(ctx context.Context, src *Source, buildInfo ExportBuildInfo) (
45+
response map[string]string,
46+
finalize FinalizeFunc,
47+
ref DescriptorReference,
48+
err error,
49+
)
2850
}
2951

3052
type ExportBuildInfo struct {

exporter/local/export.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,36 +78,36 @@ func (e *localExporter) Config() *exporter.Config {
7878
return exporter.NewConfig()
7979
}
8080

81-
func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source, buildInfo exporter.ExportBuildInfo) (map[string]string, exporter.DescriptorReference, error) {
81+
func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source, buildInfo exporter.ExportBuildInfo) (map[string]string, exporter.FinalizeFunc, exporter.DescriptorReference, error) {
8282
timeoutCtx, cancel := context.WithCancelCause(ctx)
8383
timeoutCtx, _ = context.WithTimeoutCause(timeoutCtx, 5*time.Second, errors.WithStack(context.DeadlineExceeded)) //nolint:govet
8484
defer func() { cancel(errors.WithStack(context.Canceled)) }()
8585

8686
if e.opts.Epoch == nil {
8787
if tm, ok, err := epoch.ParseSource(inp); err != nil {
88-
return nil, nil, err
88+
return nil, nil, nil, err
8989
} else if ok {
9090
e.opts.Epoch = tm
9191
}
9292
}
9393

9494
caller, err := e.opt.SessionManager.Get(timeoutCtx, buildInfo.SessionID, false)
9595
if err != nil {
96-
return nil, nil, err
96+
return nil, nil, nil, err
9797
}
9898

9999
isMap := len(inp.Refs) > 0
100100

101101
if _, ok := inp.Metadata[exptypes.ExporterPlatformsKey]; isMap && !ok {
102-
return nil, nil, errors.Errorf("unable to export multiple refs, missing platforms mapping")
102+
return nil, nil, nil, errors.Errorf("unable to export multiple refs, missing platforms mapping")
103103
}
104104
p, err := exptypes.ParsePlatforms(inp.Metadata)
105105
if err != nil {
106-
return nil, nil, err
106+
return nil, nil, nil, err
107107
}
108108

109109
if !isMap && len(p.Platforms) > 1 {
110-
return nil, nil, errors.Errorf("unable to export multiple platforms without map")
110+
return nil, nil, nil, errors.Errorf("unable to export multiple platforms without map")
111111
}
112112

113113
now := time.Now().Truncate(time.Second)
@@ -175,7 +175,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source
175175
for _, p := range p.Platforms {
176176
r, ok := inp.FindRef(p.ID)
177177
if !ok {
178-
return nil, nil, errors.Errorf("failed to find ref for ID %s", p.ID)
178+
return nil, nil, nil, errors.Errorf("failed to find ref for ID %s", p.ID)
179179
}
180180
eg.Go(export(ctx, p.ID, r, inp.Attestations[p.ID]))
181181
}
@@ -184,9 +184,9 @@ func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source
184184
}
185185

186186
if err := eg.Wait(); err != nil {
187-
return nil, nil, err
187+
return nil, nil, nil, err
188188
}
189-
return nil, nil, nil
189+
return nil, nil, nil, nil
190190
}
191191

192192
func NewProgressHandler(ctx context.Context, id string) func(int, bool) {

exporter/oci/export.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,9 @@ func (e *imageExporterInstance) Config() *exporter.Config {
131131
return exporter.NewConfigWithCompression(e.opts.RefCfg.Compression)
132132
}
133133

134-
func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source, buildInfo exporter.ExportBuildInfo) (_ map[string]string, descref exporter.DescriptorReference, err error) {
134+
func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source, buildInfo exporter.ExportBuildInfo) (_ map[string]string, _ exporter.FinalizeFunc, descref exporter.DescriptorReference, err error) {
135135
if e.opt.Variant == VariantDocker && len(src.Refs) > 0 {
136-
return nil, nil, errors.Errorf("docker exporter does not currently support exporting manifest lists")
136+
return nil, nil, nil, errors.Errorf("docker exporter does not currently support exporting manifest lists")
137137
}
138138

139139
src = src.Clone()
@@ -145,13 +145,13 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
145145
opts := e.opts
146146
as, _, err := containerimage.ParseAnnotations(src.Metadata)
147147
if err != nil {
148-
return nil, nil, err
148+
return nil, nil, nil, err
149149
}
150150
opts.Annotations = opts.Annotations.Merge(as)
151151

152152
ctx, done, err := leaseutil.WithLease(ctx, e.opt.LeaseManager, leaseutil.MakeTemporary)
153153
if err != nil {
154-
return nil, nil, err
154+
return nil, nil, nil, err
155155
}
156156
defer func() {
157157
if descref == nil {
@@ -161,7 +161,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
161161

162162
desc, err := e.opt.ImageWriter.Commit(ctx, src, buildInfo.SessionID, buildInfo.InlineCache, &opts)
163163
if err != nil {
164-
return nil, nil, err
164+
return nil, nil, nil, err
165165
}
166166
defer func() {
167167
if err == nil {
@@ -190,7 +190,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
190190

191191
dtdesc, err := json.Marshal(desc)
192192
if err != nil {
193-
return nil, nil, err
193+
return nil, nil, nil, err
194194
}
195195
resp[exptypes.ExporterImageDescriptorKey] = base64.StdEncoding.EncodeToString(dtdesc)
196196

@@ -200,7 +200,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
200200

201201
names, err := normalizedNames(e.opts.ImageName)
202202
if err != nil {
203-
return nil, nil, err
203+
return nil, nil, nil, err
204204
}
205205

206206
if len(names) != 0 {
@@ -213,7 +213,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
213213
expOpts = append(expOpts, archiveexporter.WithAllPlatforms(), archiveexporter.WithSkipDockerManifest())
214214
case VariantDocker:
215215
default:
216-
return nil, nil, errors.Errorf("invalid variant %q", e.opt.Variant)
216+
return nil, nil, nil, errors.Errorf("invalid variant %q", e.opt.Variant)
217217
}
218218

219219
timeoutCtx, cancel := context.WithCancelCause(ctx)
@@ -222,7 +222,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
222222

223223
caller, err := e.opt.SessionManager.Get(timeoutCtx, buildInfo.SessionID, false)
224224
if err != nil {
225-
return nil, nil, err
225+
return nil, nil, nil, err
226226
}
227227

228228
var refs []cache.ImmutableRef
@@ -256,43 +256,43 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source
256256
})
257257
}
258258
if err := eg.Wait(); err != nil {
259-
return nil, nil, err
259+
return nil, nil, nil, err
260260
}
261261

262262
if e.tar {
263263
w, err := filesync.CopyFileWriter(ctx, resp, e.id, caller)
264264
if err != nil {
265-
return nil, nil, err
265+
return nil, nil, nil, err
266266
}
267267

268268
report := progress.OneOff(ctx, "sending tarball")
269269
if err := archiveexporter.Export(ctx, mprovider, w, expOpts...); err != nil {
270270
w.Close()
271271
if grpcerrors.Code(err) == codes.AlreadyExists {
272-
return resp, nil, report(nil)
272+
return resp, nil, nil, report(nil)
273273
}
274-
return nil, nil, report(err)
274+
return nil, nil, nil, report(err)
275275
}
276276
err = w.Close()
277277
if grpcerrors.Code(err) == codes.AlreadyExists {
278-
return resp, nil, report(nil)
278+
return resp, nil, nil, report(nil)
279279
}
280280
if err != nil {
281-
return nil, nil, report(err)
281+
return nil, nil, nil, report(err)
282282
}
283283
report(nil)
284284
} else {
285285
store := sessioncontent.NewCallerStore(caller, "export")
286286
if err != nil {
287-
return nil, nil, err
287+
return nil, nil, nil, err
288288
}
289289
err := contentutil.CopyChain(ctx, store, mprovider, *desc)
290290
if err != nil {
291-
return nil, nil, err
291+
return nil, nil, nil, err
292292
}
293293
}
294294

295-
return resp, nil, nil
295+
return resp, nil, nil, nil
296296
}
297297

298298
func normalizedNames(name string) ([]string, error) {

0 commit comments

Comments
 (0)