Skip to content

Commit f65fd02

Browse files
authored
watch: add tar sync implementation (docker#10853)
Brought to you by Tilt ❤️ Signed-off-by: Milas Bowman <[email protected]>
1 parent cf8dc46 commit f65fd02

File tree

4 files changed

+465
-24
lines changed

4 files changed

+465
-24
lines changed

internal/sync/tar.go

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
/*
2+
Copyright 2018 The Tilt Dev Authors
3+
Copyright 2023 Docker Compose CLI authors
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package sync
19+
20+
import (
21+
"archive/tar"
22+
"bytes"
23+
"context"
24+
"fmt"
25+
"io"
26+
"io/fs"
27+
"os"
28+
"path"
29+
"path/filepath"
30+
"strings"
31+
32+
"github.com/hashicorp/go-multierror"
33+
"github.com/pkg/errors"
34+
35+
"github.com/compose-spec/compose-go/types"
36+
moby "github.com/docker/docker/api/types"
37+
"github.com/docker/docker/pkg/archive"
38+
)
39+
40+
// archiveEntry pairs a local filesystem path with its stat metadata and the
// pre-computed tar header that will be written for it. header.Name holds the
// in-archive (container-side) path, while path is the host path to read.
type archiveEntry struct {
	path   string      // local (host) path of the file
	info   os.FileInfo // stat result captured when the entry was built
	header *tar.Header // header to emit; Name is the container-relative path
}
45+
46+
// LowLevelClient is the minimal container-engine surface the Tar syncer
// needs: looking up the containers of a service and executing commands
// inside them.
type LowLevelClient interface {
	// ContainersForService returns the containers belonging to serviceName
	// within the project identified by projectName.
	ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error)

	// Exec runs cmd inside containerID, streaming in (may be nil) as stdin.
	Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error
}
51+
52+
// Tar synchronizes host files into service containers by streaming a tar
// archive to an exec'd `tar -x` process inside each container (and removing
// deleted paths via `rm -rf`).
type Tar struct {
	client LowLevelClient // engine client used to find containers and exec into them

	projectName string // Compose project whose containers are targeted
}
57+
58+
// Compile-time assertion that *Tar satisfies the Syncer interface.
var _ Syncer = &Tar{}
59+
60+
func NewTar(projectName string, client LowLevelClient) *Tar {
61+
return &Tar{
62+
projectName: projectName,
63+
client: client,
64+
}
65+
}
66+
67+
func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
68+
containers, err := t.client.ContainersForService(ctx, t.projectName, service.Name)
69+
if err != nil {
70+
return err
71+
}
72+
73+
var pathsToCopy []PathMapping
74+
var pathsToDelete []string
75+
for _, p := range paths {
76+
if _, err := os.Stat(p.HostPath); err != nil && errors.Is(err, fs.ErrNotExist) {
77+
pathsToDelete = append(pathsToDelete, p.ContainerPath)
78+
} else {
79+
pathsToCopy = append(pathsToCopy, p)
80+
}
81+
}
82+
83+
// TODO: this can't be read from multiple times
84+
tarReader := tarArchive(pathsToCopy)
85+
86+
var deleteCmd []string
87+
if len(pathsToDelete) != 0 {
88+
deleteCmd = append([]string{"rm", "-rf"}, pathsToDelete...)
89+
}
90+
copyCmd := []string{"tar", "-v", "-C", "/", "-x", "-f", "-"}
91+
92+
var eg multierror.Group
93+
for i := range containers {
94+
containerID := containers[i].ID
95+
eg.Go(func() error {
96+
if len(deleteCmd) != 0 {
97+
if err := t.client.Exec(ctx, containerID, deleteCmd, nil); err != nil {
98+
return fmt.Errorf("deleting paths in %s: %w", containerID, err)
99+
}
100+
}
101+
if err := t.client.Exec(ctx, containerID, copyCmd, tarReader); err != nil {
102+
return fmt.Errorf("copying files to %s: %w", containerID, err)
103+
}
104+
return nil
105+
})
106+
}
107+
return eg.Wait().ErrorOrNil()
108+
}
109+
110+
// ArchiveBuilder incrementally writes local files into a tar stream,
// handling overlapping path mappings with last-one-wins de-duplication.
type ArchiveBuilder struct {
	tw    *tar.Writer
	paths []string // local paths archived

	// A shared I/O buffer to help with file copying.
	copyBuf *bytes.Buffer
}
117+
118+
func NewArchiveBuilder(writer io.Writer) *ArchiveBuilder {
119+
tw := tar.NewWriter(writer)
120+
return &ArchiveBuilder{
121+
tw: tw,
122+
copyBuf: &bytes.Buffer{},
123+
}
124+
}
125+
126+
// Close finalizes the archive by flushing unwritten data and writing the
// tar footer. It does not close the destination writer.
func (a *ArchiveBuilder) Close() error {
	return a.tw.Close()
}
129+
130+
// ArchivePathsIfExist creates a tar archive of all local files in `paths`. It quietly skips any paths that don't exist.
131+
func (a *ArchiveBuilder) ArchivePathsIfExist(paths []PathMapping) error {
132+
// In order to handle overlapping syncs, we
133+
// 1) collect all the entries,
134+
// 2) de-dupe them, with last-one-wins semantics
135+
// 3) write all the entries
136+
//
137+
// It's not obvious that this is the correct behavior. A better approach
138+
// (that's more in-line with how syncs work) might ignore files in earlier
139+
// path mappings when we know they're going to be "synced" over.
140+
// There's a bunch of subtle product decisions about how overlapping path
141+
// mappings work that we're not sure about.
142+
var entries []archiveEntry
143+
for _, p := range paths {
144+
newEntries, err := a.entriesForPath(p.HostPath, p.ContainerPath)
145+
if err != nil {
146+
return fmt.Errorf("inspecting %q: %w", p.HostPath, err)
147+
}
148+
149+
entries = append(entries, newEntries...)
150+
}
151+
152+
entries = dedupeEntries(entries)
153+
for _, entry := range entries {
154+
err := a.writeEntry(entry)
155+
if err != nil {
156+
return fmt.Errorf("archiving %q: %w", entry.path, err)
157+
}
158+
a.paths = append(a.paths, entry.path)
159+
}
160+
return nil
161+
}
162+
163+
func (a *ArchiveBuilder) writeEntry(entry archiveEntry) error {
164+
pathInTar := entry.path
165+
header := entry.header
166+
167+
if header.Typeflag != tar.TypeReg {
168+
// anything other than a regular file (e.g. dir, symlink) just needs the header
169+
if err := a.tw.WriteHeader(header); err != nil {
170+
return fmt.Errorf("writing %q header: %w", pathInTar, err)
171+
}
172+
return nil
173+
}
174+
175+
file, err := os.Open(pathInTar)
176+
if err != nil {
177+
// In case the file has been deleted since we last looked at it.
178+
if os.IsNotExist(err) {
179+
return nil
180+
}
181+
return err
182+
}
183+
184+
defer func() {
185+
_ = file.Close()
186+
}()
187+
188+
// The size header must match the number of contents bytes.
189+
//
190+
// There is room for a race condition here if something writes to the file
191+
// after we've read the file size.
192+
//
193+
// For small files, we avoid this by first copying the file into a buffer,
194+
// and using the size of the buffer to populate the header.
195+
//
196+
// For larger files, we don't want to copy the whole thing into a buffer,
197+
// because that would blow up heap size. There is some danger that this
198+
// will lead to a spurious error when the tar writer validates the sizes.
199+
// That error will be disruptive but will be handled as best as we
200+
// can downstream.
201+
useBuf := header.Size < 5000000
202+
if useBuf {
203+
a.copyBuf.Reset()
204+
_, err = io.Copy(a.copyBuf, file)
205+
if err != nil && err != io.EOF {
206+
return fmt.Errorf("copying %q: %w", pathInTar, err)
207+
}
208+
header.Size = int64(len(a.copyBuf.Bytes()))
209+
}
210+
211+
// wait to write the header until _after_ the file is successfully opened
212+
// to avoid generating an invalid tar entry that has a header but no contents
213+
// in the case the file has been deleted
214+
err = a.tw.WriteHeader(header)
215+
if err != nil {
216+
return fmt.Errorf("writing %q header: %w", pathInTar, err)
217+
}
218+
219+
if useBuf {
220+
_, err = io.Copy(a.tw, a.copyBuf)
221+
} else {
222+
_, err = io.Copy(a.tw, file)
223+
}
224+
225+
if err != nil && err != io.EOF {
226+
return fmt.Errorf("copying %q: %w", pathInTar, err)
227+
}
228+
229+
// explicitly flush so that if the entry is invalid we will detect it now and
230+
// provide a more meaningful error
231+
if err := a.tw.Flush(); err != nil {
232+
return fmt.Errorf("finalizing %q: %w", pathInTar, err)
233+
}
234+
return nil
235+
}
236+
237+
// tarPath writes the given source path into tarWriter at the given dest (recursively for directories).
238+
// e.g. tarring my_dir --> dest d: d/file_a, d/file_b
239+
// If source path does not exist, quietly skips it and returns no err
240+
func (a *ArchiveBuilder) entriesForPath(localPath, containerPath string) ([]archiveEntry, error) {
241+
localInfo, err := os.Stat(localPath)
242+
if err != nil {
243+
if os.IsNotExist(err) {
244+
return nil, nil
245+
}
246+
return nil, err
247+
}
248+
249+
localPathIsDir := localInfo.IsDir()
250+
if localPathIsDir {
251+
// Make sure we can trim this off filenames to get valid relative filepaths
252+
if !strings.HasSuffix(localPath, string(filepath.Separator)) {
253+
localPath += string(filepath.Separator)
254+
}
255+
}
256+
257+
containerPath = strings.TrimPrefix(containerPath, "/")
258+
259+
result := make([]archiveEntry, 0)
260+
err = filepath.Walk(localPath, func(curLocalPath string, info os.FileInfo, err error) error {
261+
if err != nil {
262+
return fmt.Errorf("walking %q: %w", curLocalPath, err)
263+
}
264+
265+
linkname := ""
266+
if info.Mode()&os.ModeSymlink != 0 {
267+
var err error
268+
linkname, err = os.Readlink(curLocalPath)
269+
if err != nil {
270+
return err
271+
}
272+
}
273+
274+
var name string
275+
//nolint:gocritic
276+
if localPathIsDir {
277+
// Name of file in tar should be relative to source directory...
278+
tmp, err := filepath.Rel(localPath, curLocalPath)
279+
if err != nil {
280+
return fmt.Errorf("making %q relative to %q: %w", curLocalPath, localPath, err)
281+
}
282+
// ...and live inside `dest`
283+
name = path.Join(containerPath, filepath.ToSlash(tmp))
284+
} else if strings.HasSuffix(containerPath, "/") {
285+
name = containerPath + filepath.Base(curLocalPath)
286+
} else {
287+
name = containerPath
288+
}
289+
290+
header, err := archive.FileInfoHeader(name, info, linkname)
291+
if err != nil {
292+
// Not all types of files are allowed in a tarball. That's OK.
293+
// Mimic the Docker behavior and just skip the file.
294+
return nil
295+
}
296+
297+
result = append(result, archiveEntry{
298+
path: curLocalPath,
299+
info: info,
300+
header: header,
301+
})
302+
303+
return nil
304+
})
305+
if err != nil {
306+
return nil, err
307+
}
308+
return result, nil
309+
}
310+
311+
func tarArchive(ops []PathMapping) io.ReadCloser {
312+
pr, pw := io.Pipe()
313+
go func() {
314+
ab := NewArchiveBuilder(pw)
315+
err := ab.ArchivePathsIfExist(ops)
316+
if err != nil {
317+
_ = pw.CloseWithError(fmt.Errorf("adding files to tar: %w", err))
318+
} else {
319+
// propagate errors from the TarWriter::Close() because it performs a final
320+
// Flush() and any errors mean the tar is invalid
321+
if err := ab.Close(); err != nil {
322+
_ = pw.CloseWithError(fmt.Errorf("closing tar: %w", err))
323+
} else {
324+
_ = pw.Close()
325+
}
326+
}
327+
}()
328+
return pr
329+
}
330+
331+
// Dedupe the entries with last-entry-wins semantics.
332+
func dedupeEntries(entries []archiveEntry) []archiveEntry {
333+
seenIndex := make(map[string]int, len(entries))
334+
result := make([]archiveEntry, 0, len(entries))
335+
for i, entry := range entries {
336+
seenIndex[entry.header.Name] = i
337+
}
338+
for i, entry := range entries {
339+
if seenIndex[entry.header.Name] == i {
340+
result = append(result, entry)
341+
}
342+
}
343+
return result
344+
}

0 commit comments

Comments
 (0)