Commit efd44de

watch: support multiple containers for tar implementation (docker#10860)
Support services with scale > 1 for the tar watch sync.

Add a "lossy" multi-writer specific to pipes that writes the tar data to each `io.PipeWriter`, which is connected to `stdin` for the `tar` process being exec'd in the container.

The data is written serially to each writer. This could be adjusted to do concurrent writes, but that would rapidly increase the I/O load, so it is not done here: in general, 99% of the time you'll be developing (and thus using watch/sync) with a single replica of a service.

If a write fails, the corresponding `io.PipeWriter` is removed from the active set and closed with an error, so a single container failing to receive the copy won't stop writes to the others that are succeeding. Of course, the failed container will still be in an inconsistent state afterwards, but that's a different problem.

Signed-off-by: Milas Bowman <[email protected]>
1 parent bdb3f91 commit efd44de
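
To make the fan-out concrete, here is a minimal, self-contained Go sketch of the pattern the commit message describes. It is not the commit's code: it uses the standard io.MultiWriter, which aborts the whole copy on the first failed writer, exactly the behavior the commit's lossyMultiWriter replaces; all names in it are illustrative.

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

func main() {
	// A tar stream can only be read once, so to feed N containers the
	// archive is fanned out through N pipes: each consumer (standing in
	// for a per-container `tar -x` exec) reads its own pipe while the
	// archive bytes are copied into every write end serially.
	src := strings.NewReader("archive bytes")

	const replicas = 3
	writers := make([]io.Writer, replicas)
	pipeWriters := make([]*io.PipeWriter, replicas)
	var wg sync.WaitGroup
	for i := 0; i < replicas; i++ {
		r, w := io.Pipe()
		writers[i], pipeWriters[i] = w, w
		wg.Add(1)
		go func(id int, r io.Reader) {
			defer wg.Done()
			data, _ := io.ReadAll(r)
			fmt.Printf("replica %d received %q\n", id, data)
		}(i, r)
	}

	// Unlike the commit's lossy variant, io.MultiWriter stops everyone
	// if any single pipe fails.
	if _, err := io.Copy(io.MultiWriter(writers...), src); err != nil {
		fmt.Println("copy:", err)
	}
	for _, w := range pipeWriters {
		_ = w.Close() // EOF tells each consumer the archive is complete
	}
	wg.Wait()
}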

File tree: 3 files changed, +260 -4 lines

internal/sync/tar.go

Lines changed: 17 additions & 4 deletions
@@ -80,30 +80,43 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat
 		}
 	}
 
-	// TODO: this can't be read from multiple times
-	tarReader := tarArchive(pathsToCopy)
-
 	var deleteCmd []string
 	if len(pathsToDelete) != 0 {
 		deleteCmd = append([]string{"rm", "-rf"}, pathsToDelete...)
 	}
 	copyCmd := []string{"tar", "-v", "-C", "/", "-x", "-f", "-"}
 
 	var eg multierror.Group
+	writers := make([]*io.PipeWriter, len(containers))
 	for i := range containers {
 		containerID := containers[i].ID
+		r, w := io.Pipe()
+		writers[i] = w
 		eg.Go(func() error {
 			if len(deleteCmd) != 0 {
 				if err := t.client.Exec(ctx, containerID, deleteCmd, nil); err != nil {
 					return fmt.Errorf("deleting paths in %s: %w", containerID, err)
 				}
 			}
-			if err := t.client.Exec(ctx, containerID, copyCmd, tarReader); err != nil {
+			if err := t.client.Exec(ctx, containerID, copyCmd, r); err != nil {
 				return fmt.Errorf("copying files to %s: %w", containerID, err)
 			}
 			return nil
 		})
 	}
+
+	multiWriter := newLossyMultiWriter(writers...)
+	tarReader := tarArchive(pathsToCopy)
+	defer func() {
+		_ = tarReader.Close()
+		multiWriter.Close()
+	}()
+	_, err = io.Copy(multiWriter, tarReader)
+	if err != nil {
+		return err
+	}
+	multiWriter.Close()
+
 	return eg.Wait().ErrorOrNil()
 }
 
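
A note on the ordering above: multiWriter.Close() runs explicitly before eg.Wait() because closing a pipe's write end is what delivers EOF to its reader, and each exec'd `tar -x` only exits once its stdin reports EOF; the deferred Close covers the early-return error path. A tiny illustrative sketch of that pipe semantic (not from the commit):

package main

import (
	"fmt"
	"io"
)

func main() {
	r, w := io.Pipe()
	go func() {
		_, _ = w.Write([]byte("payload"))
		_ = w.Close() // clean Close surfaces as io.EOF on the read side
	}()
	data, err := io.ReadAll(r) // returns nil error once EOF is reached
	fmt.Printf("%q %v\n", data, err)
}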

internal/sync/writer.go

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
+/*
+   Copyright 2023 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sync
+
+import (
+	"errors"
+	"io"
+)
+
+// lossyMultiWriter attempts to tee all writes to the provided io.PipeWriter
+// instances.
+//
+// If a writer fails during a Write call, the write-side of the pipe is then
+// closed with the error and no subsequent attempts are made to write to the
+// pipe.
+//
+// If all writers fail during a write, an error is returned.
+//
+// On Close, any remaining writers are closed.
+type lossyMultiWriter struct {
+	writers []*io.PipeWriter
+}
+
+// newLossyMultiWriter creates a new writer that *attempts* to tee all data written to it to the provided io.PipeWriter
+// instances. Rather than failing a write operation if any writer fails, writes only fail if there are no more valid
+// writers. Otherwise, errors for specific writers are propagated via CloseWithError.
+func newLossyMultiWriter(writers ...*io.PipeWriter) *lossyMultiWriter {
+	// reverse the writers because during the write we iterate
+	// backwards, so this way we'll end up writing in the same
+	// order as the writers were passed to us
+	writers = append([]*io.PipeWriter(nil), writers...)
+	for i, j := 0, len(writers)-1; i < j; i, j = i+1, j-1 {
+		writers[i], writers[j] = writers[j], writers[i]
+	}
+
+	return &lossyMultiWriter{
+		writers: writers,
+	}
+}
+
+// Write writes to each writer that is still active (i.e. has not failed/encountered an error on write).
+//
+// If a writer encounters an error during the write, the write side of the pipe is closed with the error
+// and no subsequent attempts will be made to write to that writer.
+//
+// An error is only returned from this function if ALL writers have failed.
+func (l *lossyMultiWriter) Write(p []byte) (int, error) {
+	// NOTE: this function iterates backwards so that it can
+	// safely remove elements during the loop
+	for i := len(l.writers) - 1; i >= 0; i-- {
+		written, err := l.writers[i].Write(p)
+		if err == nil && written != len(p) {
+			err = io.ErrShortWrite
+		}
+		if err != nil {
+			// pipe writer close cannot fail
+			_ = l.writers[i].CloseWithError(err)
+			l.writers = append(l.writers[:i], l.writers[i+1:]...)
+		}
+	}
+
+	if len(l.writers) == 0 {
+		return 0, errors.New("no writers remaining")
+	}
+
+	return len(p), nil
+}
+
+// Close closes any still open (non-failed) writers.
+//
+// Failed writers have already been closed with an error.
+func (l *lossyMultiWriter) Close() {
+	for i := range l.writers {
+		// pipe writer close cannot fail
+		_ = l.writers[i].Close()
+	}
+}
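
The doc comments above lean on one io.Pipe property: an error passed to CloseWithError on the write end is surfaced to the read end, which is how a per-container failure reaches the goroutine exec'ing `tar` for that container. A small illustrative sketch (not part of the commit):

package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	r, w := io.Pipe()
	go func() {
		// Closing with an error propagates it to the reader instead of io.EOF.
		_ = w.CloseWithError(errors.New("copy to container failed"))
	}()
	_, err := io.ReadAll(r)
	fmt.Println(err) // copy to container failed
}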

internal/sync/writer_test.go

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
+/*
+   Copyright 2023 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sync
+
+import (
+	"context"
+	"io"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLossyMultiWriter(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	const count = 5
+	readers := make([]*bufReader, count)
+	writers := make([]*io.PipeWriter, count)
+	for i := 0; i < count; i++ {
+		r, w := io.Pipe()
+		readers[i] = newBufReader(ctx, r)
+		writers[i] = w
+	}
+
+	w := newLossyMultiWriter(writers...)
+	t.Cleanup(w.Close)
+	n, err := w.Write([]byte("hello world"))
+	require.Equal(t, 11, n)
+	require.NoError(t, err)
+	for i := range readers {
+		readers[i].waitForWrite(t)
+		require.Equal(t, "hello world", string(readers[i].contents()))
+		readers[i].reset()
+	}
+
+	// even if a writer fails (in this case simulated by closing the receiving end of the pipe),
+	// write operations should continue to return nil error but the writer should be closed
+	// with an error
+	const failIndex = 3
+	require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no")))
+	n, err = w.Write([]byte("hello"))
+	require.Equal(t, 5, n)
+	require.NoError(t, err)
+	for i := range readers {
+		readers[i].waitForWrite(t)
+		if i == failIndex {
+			err := readers[i].error()
+			require.EqualError(t, err, "io: read/write on closed pipe")
+			require.Empty(t, readers[i].contents())
+		} else {
+			require.Equal(t, "hello", string(readers[i].contents()))
+		}
+	}
+
+	// perform another write, verify there's still no errors
+	n, err = w.Write([]byte(" world"))
+	require.Equal(t, 6, n)
+	require.NoError(t, err)
+}
+
+type bufReader struct {
+	ctx       context.Context
+	r         *io.PipeReader
+	mu        sync.Mutex
+	err       error
+	data      []byte
+	writeSync chan struct{}
+}
+
+func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader {
+	b := &bufReader{
+		ctx:       ctx,
+		r:         r,
+		writeSync: make(chan struct{}),
+	}
+	go b.consume()
+	return b
+}
+
+func (b *bufReader) waitForWrite(t testing.TB) {
+	t.Helper()
+	select {
+	case <-b.writeSync:
+		return
+	case <-time.After(50 * time.Millisecond):
+		t.Fatal("timed out waiting for write")
+	}
+}
+
+func (b *bufReader) consume() {
+	defer close(b.writeSync)
+	for {
+		buf := make([]byte, 512)
+		n, err := b.r.Read(buf)
+		if n != 0 {
+			b.mu.Lock()
+			b.data = append(b.data, buf[:n]...)
+			b.mu.Unlock()
+		}
+		if err == io.EOF {
+			return
+		}
+		if err != nil {
+			b.mu.Lock()
+			b.err = err
+			b.mu.Unlock()
+			return
+		}
+		// prevent goroutine leak, tie lifetime to the test
+		select {
+		case b.writeSync <- struct{}{}:
+		case <-b.ctx.Done():
+			return
+		}
+	}
+}
+
+func (b *bufReader) contents() []byte {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.data
+}
+
+func (b *bufReader) reset() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.data = nil
+}
+
+func (b *bufReader) error() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.err
+}
