@@ -331,10 +331,12 @@ type pushWriter struct {
331
331
332
332
pipe * io.PipeWriter
333
333
334
- pipeC chan * io.PipeWriter
335
- respC chan * http.Response
334
+ done chan struct {}
336
335
closeOnce sync.Once
337
- errC chan error
336
+
337
+ pipeC chan * io.PipeWriter
338
+ respC chan * http.Response
339
+ errC chan error
338
340
339
341
isManifest bool
340
342
@@ -352,19 +354,30 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
352
354
pipeC : make (chan * io.PipeWriter , 1 ),
353
355
respC : make (chan * http.Response , 1 ),
354
356
errC : make (chan error , 1 ),
357
+ done : make (chan struct {}),
355
358
isManifest : isManifest ,
356
359
}
357
360
}
358
361
359
362
func (pw * pushWriter ) setPipe (p * io.PipeWriter ) {
360
- pw .pipeC <- p
363
+ select {
364
+ case <- pw .done :
365
+ case pw .pipeC <- p :
366
+ }
361
367
}
362
368
363
369
func (pw * pushWriter ) setError (err error ) {
364
- pw .errC <- err
370
+ select {
371
+ case <- pw .done :
372
+ case pw .errC <- err :
373
+ }
365
374
}
375
+
366
376
func (pw * pushWriter ) setResponse (resp * http.Response ) {
367
- pw .respC <- resp
377
+ select {
378
+ case <- pw .done :
379
+ case pw .respC <- resp :
380
+ }
368
381
}
369
382
370
383
func (pw * pushWriter ) Write (p []byte ) (n int , err error ) {
@@ -374,22 +387,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
374
387
}
375
388
376
389
if pw .pipe == nil {
377
- p , ok := <- pw . pipeC
378
- if ! ok {
390
+ select {
391
+ case <- pw . done :
379
392
return 0 , io .ErrClosedPipe
393
+ case p := <- pw .pipeC :
394
+ pw .pipe = p
380
395
}
381
- pw .pipe = p
382
396
} else {
383
397
select {
384
- case p , ok := <- pw .pipeC :
385
- if ! ok {
386
- return 0 , io .ErrClosedPipe
387
- }
398
+ case <- pw .done :
399
+ return 0 , io .ErrClosedPipe
400
+ case p := <- pw .pipeC :
388
401
pw .pipe .CloseWithError (content .ErrReset )
389
402
pw .pipe = p
390
403
391
404
// If content has already been written, the bytes
392
- // cannot be written and the caller must reset
405
+ // cannot be written again and the caller must reset
406
+ status , err := pw .tracker .GetStatus (pw .ref )
407
+ if err != nil {
408
+ return 0 , err
409
+ }
393
410
status .Offset = 0
394
411
status .UpdatedAt = time .Now ()
395
412
pw .tracker .SetStatus (pw .ref , status )
@@ -418,7 +435,7 @@ func (pw *pushWriter) Close() error {
418
435
// Ensure pipeC is closed but handle `Close()` being
419
436
// called multiple times without panicking
420
437
pw .closeOnce .Do (func () {
421
- close (pw .pipeC )
438
+ close (pw .done )
422
439
})
423
440
if pw .pipe != nil {
424
441
status , err := pw .tracker .GetStatus (pw .ref )
@@ -458,17 +475,16 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
458
475
// TODO: timeout waiting for response
459
476
var resp * http.Response
460
477
select {
478
+ case <- pw .done :
479
+ return io .ErrClosedPipe
461
480
case err := <- pw .errC :
462
481
return err
463
482
case resp = <- pw .respC :
464
483
defer resp .Body .Close ()
465
- case p , ok := <- pw .pipeC :
484
+ case p := <- pw .pipeC :
466
485
// check whether the pipe has changed in the commit, because sometimes Write
467
486
// can complete successfully, but the pipe may have changed. In that case, the
468
487
// content needs to be reset.
469
- if ! ok {
470
- return io .ErrClosedPipe
471
- }
472
488
pw .pipe .CloseWithError (content .ErrReset )
473
489
pw .pipe = p
474
490
0 commit comments