Skip to content

Commit b8654e3

Browse files
authored
Merge pull request containerd#8379 from jedevc/docker-pusher-concurrency
Fix various timing issues with docker pusher
2 parents 4510ca3 + a9152eb commit b8654e3

File tree

3 files changed

+100
-60
lines changed

3 files changed

+100
-60
lines changed

core/content/helpers.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ import (
3131
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3232
)
3333

34-
// maxResets is the no.of times the Copy() method can tolerate a reset of the body
35-
const maxResets = 5
36-
3734
var ErrReset = errors.New("writer has been reset")
3835

3936
var bufPool = sync.Pool{
@@ -149,7 +146,7 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er
149146
// Copy is buffered, so no need to wrap reader in buffered io.
150147
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
151148
r := or
152-
for i := 0; i < maxResets; i++ {
149+
for i := 0; ; i++ {
153150
if i >= 1 {
154151
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
155152
}
@@ -189,9 +186,6 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig
189186
}
190187
return nil
191188
}
192-
193-
log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
194-
return fmt.Errorf("failed to copy after %d retries", maxResets)
195189
}
196190

197191
// CopyReaderAt copies to a writer from a given reader at for the given

core/content/helpers_test.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
_ "crypto/sha256" // required by go-digest
23-
"fmt"
23+
"errors"
2424
"io"
2525
"strings"
2626
"testing"
@@ -42,7 +42,7 @@ func TestCopy(t *testing.T) {
4242
cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
4343
i := 0
4444
return func() error {
45-
// function resets the first time
45+
// function resets the first time, but then succeeds after
4646
if i == 0 {
4747
// this is the case where, the pipewriter to which the data was being written has
4848
// changed. which means we need to clear the buffer
@@ -55,11 +55,28 @@ func TestCopy(t *testing.T) {
5555
}
5656
}
5757

58+
cf2err := errors.New("commit failed")
5859
cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
5960
i := 0
6061
return func() error {
61-
// function resets for more than the maxReset value
62-
if i < maxResets+1 {
62+
// function resets a lot of times, and eventually fails
63+
if i < 10 {
64+
// this is the case where, the pipewriter to which the data was being written has
65+
// changed. which means we need to clear the buffer
66+
i++
67+
buf.Reset()
68+
st.Offset = 0
69+
return ErrReset
70+
}
71+
return cf2err
72+
}
73+
}
74+
75+
cf3 := func(buf *bytes.Buffer, st Status) commitFunction {
76+
i := 0
77+
return func() error {
78+
// function resets a lot of times, and eventually succeeds
79+
if i < 10 {
6380
// this is the case where, the pipewriter to which the data was being written has
6481
// changed. which means we need to clear the buffer
6582
i++
@@ -73,8 +90,10 @@ func TestCopy(t *testing.T) {
7390

7491
s1 := Status{}
7592
s2 := Status{}
93+
s3 := Status{}
7694
b1 := bytes.Buffer{}
7795
b2 := bytes.Buffer{}
96+
b3 := bytes.Buffer{}
7897

7998
var testcases = []struct {
8099
name string
@@ -130,15 +149,25 @@ func TestCopy(t *testing.T) {
130149
expected: "content to copy",
131150
},
132151
{
133-
name: "write fails more than maxReset times due to reset",
152+
name: "write fails after lots of resets",
134153
source: newCopySource("content to copy"),
135154
writer: fakeWriter{
136155
Buffer: &b2,
137156
status: s2,
138157
commitFunc: cf2(&b2, s2),
139158
},
140159
expected: "",
141-
expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets),
160+
expectedErr: cf2err,
161+
},
162+
{
163+
name: "write succeeds after lots of resets",
164+
source: newCopySource("content to copy"),
165+
writer: fakeWriter{
166+
Buffer: &b3,
167+
status: s3,
168+
commitFunc: cf3(&b3, s3),
169+
},
170+
expected: "content to copy",
142171
},
143172
}
144173

@@ -153,7 +182,7 @@ func TestCopy(t *testing.T) {
153182

154183
// if an error is expected then further comparisons are not required
155184
if testcase.expectedErr != nil {
156-
assert.Equal(t, testcase.expectedErr, err)
185+
assert.ErrorIs(t, err, testcase.expectedErr)
157186
return
158187
}
159188

core/remotes/docker/pusher.go

Lines changed: 63 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -280,15 +280,14 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
280280
req.body = func() (io.ReadCloser, error) {
281281
pr, pw := io.Pipe()
282282
pushw.setPipe(pw)
283-
return io.NopCloser(pr), nil
283+
return pr, nil
284284
}
285285
req.size = desc.Size
286286

287287
go func() {
288288
resp, err := req.doWithRetries(ctx, nil)
289289
if err != nil {
290290
pushw.setError(err)
291-
pushw.Close()
292291
return
293292
}
294293

@@ -298,7 +297,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
298297
err := remoteserrors.NewUnexpectedStatusErr(resp)
299298
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
300299
pushw.setError(err)
301-
pushw.Close()
300+
return
302301
}
303302
pushw.setResponse(resp)
304303
}()
@@ -331,10 +330,12 @@ type pushWriter struct {
331330

332331
pipe *io.PipeWriter
333332

334-
pipeC chan *io.PipeWriter
335-
respC chan *http.Response
333+
done chan struct{}
336334
closeOnce sync.Once
337-
errC chan error
335+
336+
pipeC chan *io.PipeWriter
337+
respC chan *http.Response
338+
errC chan error
338339

339340
isManifest bool
340341

@@ -352,19 +353,51 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
352353
pipeC: make(chan *io.PipeWriter, 1),
353354
respC: make(chan *http.Response, 1),
354355
errC: make(chan error, 1),
356+
done: make(chan struct{}),
355357
isManifest: isManifest,
356358
}
357359
}
358360

359361
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
360-
pw.pipeC <- p
362+
select {
363+
case <-pw.done:
364+
case pw.pipeC <- p:
365+
}
361366
}
362367

363368
func (pw *pushWriter) setError(err error) {
364-
pw.errC <- err
369+
select {
370+
case <-pw.done:
371+
case pw.errC <- err:
372+
}
365373
}
374+
366375
func (pw *pushWriter) setResponse(resp *http.Response) {
367-
pw.respC <- resp
376+
select {
377+
case <-pw.done:
378+
case pw.respC <- resp:
379+
}
380+
}
381+
382+
func (pw *pushWriter) replacePipe(p *io.PipeWriter) error {
383+
if pw.pipe == nil {
384+
pw.pipe = p
385+
return nil
386+
}
387+
388+
pw.pipe.CloseWithError(content.ErrReset)
389+
pw.pipe = p
390+
391+
// If content has already been written, the bytes
392+
// cannot be written again and the caller must reset
393+
status, err := pw.tracker.GetStatus(pw.ref)
394+
if err != nil {
395+
return err
396+
}
397+
status.Offset = 0
398+
status.UpdatedAt = time.Now()
399+
pw.tracker.SetStatus(pw.ref, status)
400+
return content.ErrReset
368401
}
369402

370403
func (pw *pushWriter) Write(p []byte) (n int, err error) {
@@ -374,26 +407,18 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
374407
}
375408

376409
if pw.pipe == nil {
377-
p, ok := <-pw.pipeC
378-
if !ok {
410+
select {
411+
case <-pw.done:
379412
return 0, io.ErrClosedPipe
413+
case p := <-pw.pipeC:
414+
pw.replacePipe(p)
380415
}
381-
pw.pipe = p
382416
} else {
383417
select {
384-
case p, ok := <-pw.pipeC:
385-
if !ok {
386-
return 0, io.ErrClosedPipe
387-
}
388-
pw.pipe.CloseWithError(content.ErrReset)
389-
pw.pipe = p
390-
391-
// If content has already been written, the bytes
392-
// cannot be written and the caller must reset
393-
status.Offset = 0
394-
status.UpdatedAt = time.Now()
395-
pw.tracker.SetStatus(pw.ref, status)
396-
return 0, content.ErrReset
418+
case <-pw.done:
419+
return 0, io.ErrClosedPipe
420+
case p := <-pw.pipeC:
421+
return 0, pw.replacePipe(p)
397422
default:
398423
}
399424
}
@@ -403,9 +428,13 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
403428
// if the pipe is closed, we might have the original error on the error
404429
// channel - so we should try and get it
405430
select {
406-
case err2 := <-pw.errC:
407-
err = err2
408-
default:
431+
case <-pw.done:
432+
case err = <-pw.errC:
433+
pw.Close()
434+
case p := <-pw.pipeC:
435+
return 0, pw.replacePipe(p)
436+
case resp := <-pw.respC:
437+
pw.setResponse(resp)
409438
}
410439
}
411440
status.Offset += int64(n)
@@ -418,7 +447,7 @@ func (pw *pushWriter) Close() error {
418447
// Ensure pipeC is closed but handle `Close()` being
419448
// called multiple times without panicking
420449
pw.closeOnce.Do(func() {
421-
close(pw.pipeC)
450+
close(pw.done)
422451
})
423452
if pw.pipe != nil {
424453
status, err := pw.tracker.GetStatus(pw.ref)
@@ -458,30 +487,18 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
458487
// TODO: timeout waiting for response
459488
var resp *http.Response
460489
select {
490+
case <-pw.done:
491+
return io.ErrClosedPipe
461492
case err := <-pw.errC:
493+
pw.Close()
462494
return err
463495
case resp = <-pw.respC:
464496
defer resp.Body.Close()
465-
case p, ok := <-pw.pipeC:
497+
case p := <-pw.pipeC:
466498
// check whether the pipe has changed in the commit, because sometimes Write
467499
// can complete successfully, but the pipe may have changed. In that case, the
468500
// content needs to be reset.
469-
if !ok {
470-
return io.ErrClosedPipe
471-
}
472-
pw.pipe.CloseWithError(content.ErrReset)
473-
pw.pipe = p
474-
475-
// If content has already been written, the bytes
476-
// cannot be written again and the caller must reset
477-
status, err := pw.tracker.GetStatus(pw.ref)
478-
if err != nil {
479-
return err
480-
}
481-
status.Offset = 0
482-
status.UpdatedAt = time.Now()
483-
pw.tracker.SetStatus(pw.ref, status)
484-
return content.ErrReset
501+
return pw.replacePipe(p)
485502
}
486503

487504
// 201 is specified return status, some registries return

0 commit comments

Comments
 (0)