Skip to content

Commit 21840d2

Browse files
committed
exporter: fix local mode=delete for multi-platform outputs
Signed-off-by: CrazyMax <1951866+crazy-max@users.noreply.github.com>
1 parent f6d1e97 commit 21840d2

File tree

2 files changed

+209
-9
lines changed

2 files changed

+209
-9
lines changed

exporter/local/export.go

Lines changed: 158 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package local
22

33
import (
44
"context"
5+
"io"
6+
"io/fs"
57
"os"
68
"strings"
79
"sync"
@@ -50,6 +52,10 @@ func (e *localExporter) Resolve(ctx context.Context, id int, opt map[string]stri
5052
return i, nil
5153
}
5254

55+
func (e *localExporter) Config() *exporter.Config {
56+
return exporter.NewConfig()
57+
}
58+
5359
type localExporterInstance struct {
5460
*localExporter
5561
id int
@@ -74,10 +80,6 @@ func (e *localExporterInstance) Attrs() map[string]string {
7480
return e.attrs
7581
}
7682

77-
func (e *localExporter) Config() *exporter.Config {
78-
return exporter.NewConfig()
79-
}
80-
8183
func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source, buildInfo exporter.ExportBuildInfo) (map[string]string, exporter.FinalizeFunc, exporter.DescriptorReference, error) {
8284
timeoutCtx, cancel := context.WithCancelCause(ctx)
8385
timeoutCtx, _ = context.WithTimeoutCause(timeoutCtx, 5*time.Second, errors.WithStack(context.DeadlineExceeded)) //nolint:govet
@@ -115,6 +117,21 @@ func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source
115117
visitedPath := map[string]string{}
116118
var visitedMu sync.Mutex
117119

120+
if e.opts.Mode == filesync.FSSyncDirModeDelete {
121+
outputFS, cleanup, err := e.buildDeleteModeFS(ctx, inp, buildInfo, p, isMap, now)
122+
if err != nil {
123+
return nil, nil, nil, err
124+
}
125+
if cleanup != nil {
126+
defer cleanup()
127+
}
128+
progress := NewProgressHandler(ctx, "copying files")
129+
if err := filesync.CopyToCaller(ctx, outputFS, e.id, caller, progress); err != nil {
130+
return nil, nil, nil, err
131+
}
132+
return nil, nil, nil, nil
133+
}
134+
118135
export := func(ctx context.Context, k string, ref cache.ImmutableRef, attestations []exporter.Attestation, opt CreateFSOpts) func() error {
119136
return func() error {
120137
outputFS, cleanup, err := CreateFS(ctx, buildInfo.SessionID, k, ref, attestations, now, isMap, opt)
@@ -197,6 +214,143 @@ func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source
197214
return nil, nil, nil, nil
198215
}
199216

217+
func (e *localExporterInstance) buildDeleteModeFS(ctx context.Context, inp *exporter.Source, buildInfo exporter.ExportBuildInfo, p exptypes.Platforms, isMap bool, now time.Time) (_ fsutil.FS, cleanup func() error, err error) {
218+
root, err := os.MkdirTemp("", "buildkit-local-export-")
219+
if err != nil {
220+
return nil, nil, err
221+
}
222+
223+
cleanup = func() error {
224+
return os.RemoveAll(root)
225+
}
226+
defer func() {
227+
if err != nil {
228+
cleanup()
229+
}
230+
}()
231+
232+
visitedPath := map[string]string{}
233+
234+
export := func(k string, ref cache.ImmutableRef, attestations []exporter.Attestation, opt CreateFSOpts) error {
235+
outputFS, outputCleanup, err := CreateFS(ctx, buildInfo.SessionID, k, ref, attestations, now, isMap, opt)
236+
if err != nil {
237+
return err
238+
}
239+
if outputCleanup != nil {
240+
defer outputCleanup()
241+
}
242+
243+
if e.opts.UsePlatformSplit(isMap) {
244+
st := &fstypes.Stat{
245+
Mode: uint32(os.ModeDir | 0755),
246+
Path: strings.ReplaceAll(k, "/", "_"),
247+
}
248+
if opt.Epoch != nil && opt.Epoch.Value != nil {
249+
st.ModTime = opt.Epoch.Value.UnixNano()
250+
}
251+
outputFS, err = fsutil.SubDirFS([]fsutil.Dir{{FS: outputFS, Stat: st}})
252+
if err != nil {
253+
return err
254+
}
255+
} else {
256+
err = fsWalk(ctx, outputFS, "", func(p string, entry os.DirEntry, err error) error {
257+
if entry.IsDir() {
258+
return nil
259+
}
260+
if err != nil && !errors.Is(err, os.ErrNotExist) {
261+
return err
262+
}
263+
if vp, ok := visitedPath[p]; ok {
264+
return errors.Errorf("cannot overwrite %s from %s with %s when split option is disabled", p, vp, k)
265+
}
266+
visitedPath[p] = k
267+
return nil
268+
})
269+
if err != nil {
270+
return err
271+
}
272+
}
273+
274+
if err := writeFS(ctx, outputFS, root); err != nil {
275+
return err
276+
}
277+
return nil
278+
}
279+
280+
if len(p.Platforms) > 0 {
281+
for _, p := range p.Platforms {
282+
r, ok := inp.FindRef(p.ID)
283+
if !ok {
284+
return nil, nil, errors.Errorf("failed to find ref for ID %s", p.ID)
285+
}
286+
opt := e.opts
287+
if e.opts.Epoch == nil {
288+
tm, err := epoch.ParseSource(inp, &p)
289+
if err != nil {
290+
return nil, nil, err
291+
}
292+
opt.Epoch = &epoch.Epoch{Value: tm}
293+
}
294+
if err := export(p.ID, r, inp.Attestations[p.ID], opt); err != nil {
295+
return nil, nil, err
296+
}
297+
}
298+
} else {
299+
if err := export("", inp.Ref, nil, e.opts); err != nil {
300+
return nil, nil, err
301+
}
302+
}
303+
304+
outputFS, err := fsutil.NewFS(root)
305+
if err != nil {
306+
return nil, nil, err
307+
}
308+
return outputFS, cleanup, nil
309+
}
310+
311+
func writeFS(ctx context.Context, src fsutil.FS, dest string) error {
312+
if err := os.MkdirAll(dest, 0700); err != nil {
313+
return errors.Wrapf(err, "failed to create export staging dir %s", dest)
314+
}
315+
316+
dw, err := fsutil.NewDiskWriter(ctx, dest, fsutil.DiskWriterOpt{
317+
AsyncDataCb: func(ctx context.Context, p string, wc io.WriteCloser) (retErr error) {
318+
defer func() {
319+
if err := wc.Close(); retErr == nil {
320+
retErr = err
321+
}
322+
}()
323+
324+
r, err := src.Open(p)
325+
if err != nil {
326+
return err
327+
}
328+
defer r.Close()
329+
330+
_, err = io.Copy(wc, r)
331+
return err
332+
},
333+
})
334+
if err != nil {
335+
return err
336+
}
337+
338+
if err := src.Walk(ctx, "", func(p string, entry fs.DirEntry, err error) error {
339+
if err != nil {
340+
return err
341+
}
342+
info, err := entry.Info()
343+
if err != nil {
344+
return err
345+
}
346+
return dw.HandleChange(fsutil.ChangeKindAdd, p, info, nil)
347+
}); err != nil {
348+
return err
349+
}
350+
351+
return dw.Wait(ctx)
352+
}
353+
200354
func NewProgressHandler(ctx context.Context, id string) func(int, bool) {
201355
limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
202356
pw, _, _ := progress.NewFromContext(ctx)

session/filesync/diffcopy.go

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import (
55
"context"
66
io "io"
77
"os"
8+
"path/filepath"
89
"time"
910

11+
continuityfs "github.com/containerd/continuity/fs"
1012
"github.com/moby/buildkit/util/bklog"
11-
1213
"github.com/pkg/errors"
1314
"github.com/tonistiigi/fsutil"
15+
fscopy "github.com/tonistiigi/fsutil/copy"
1416
fstypes "github.com/tonistiigi/fsutil/types"
1517
"google.golang.org/grpc"
1618
)
@@ -112,21 +114,22 @@ func recvDiffCopy(ds grpc.ClientStream, dest string, cu CacheUpdater, progress p
112114
}
113115

114116
func syncTargetDiffCopy(ds grpc.ServerStream, dest string, mode FSSyncDirMode) error {
115-
var merge bool
116117
switch mode {
117118
case "", FSSyncDirModeCopy:
118-
merge = true
119+
return syncTargetDiffCopyCopy(ds, dest)
119120
case FSSyncDirModeDelete:
120-
merge = false
121+
return syncTargetDiffCopyDelete(ds, dest)
121122
default:
122123
return errors.Errorf("invalid local exporter mode %q", mode)
123124
}
125+
}
124126

127+
func syncTargetDiffCopyCopy(ds grpc.ServerStream, dest string) error {
125128
if err := os.MkdirAll(dest, 0700); err != nil {
126129
return errors.Wrapf(err, "failed to create synctarget dest dir %s", dest)
127130
}
128131
return errors.WithStack(fsutil.Receive(ds.Context(), ds, dest, fsutil.ReceiveOpt{
129-
Merge: merge,
132+
Merge: true,
130133
Filter: func() func(string, *fstypes.Stat) bool {
131134
uid := os.Getuid()
132135
gid := os.Getgid()
@@ -139,6 +142,49 @@ func syncTargetDiffCopy(ds grpc.ServerStream, dest string, mode FSSyncDirMode) e
139142
}))
140143
}
141144

145+
func syncTargetDiffCopyDelete(ds grpc.ServerStream, dest string) error {
146+
if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
147+
return errors.Wrapf(err, "failed to create synctarget parent dir %s", filepath.Dir(dest))
148+
}
149+
150+
stageDir, err := os.MkdirTemp(filepath.Dir(dest), ".buildkit-local-export-")
151+
if err != nil {
152+
return errors.Wrap(err, "failed to create synctarget staging dir")
153+
}
154+
defer os.RemoveAll(stageDir)
155+
156+
if err := syncTargetDiffCopyCopy(ds, stageDir); err != nil {
157+
return err
158+
}
159+
160+
if err := os.MkdirAll(dest, 0700); err != nil {
161+
return errors.Wrapf(err, "failed to create synctarget dest dir %s", dest)
162+
}
163+
164+
if err = continuityfs.Changes(ds.Context(), dest, stageDir, func(kind continuityfs.ChangeKind, p string, _ os.FileInfo, err error) error {
165+
if err != nil {
166+
return err
167+
}
168+
if kind != continuityfs.ChangeKindDelete {
169+
return nil
170+
}
171+
if len(p) > 0 {
172+
p = p[1:]
173+
}
174+
if p == "" {
175+
return nil
176+
}
177+
return errors.WithStack(os.RemoveAll(filepath.Join(dest, p)))
178+
}); err != nil {
179+
return errors.WithStack(err)
180+
}
181+
182+
return errors.WithStack(fscopy.Copy(ds.Context(), stageDir, ".", dest, ".", fscopy.WithCopyInfo(fscopy.CopyInfo{
183+
CopyDirContents: true,
184+
AlwaysReplaceExistingDestPaths: true,
185+
})))
186+
}
187+
142188
func writeTargetFile(ds grpc.ServerStream, wc io.WriteCloser) error {
143189
var bm BytesMessage
144190
for {

0 commit comments

Comments
 (0)