@@ -380,6 +380,27 @@ func (pw *pushWriter) setResponse(resp *http.Response) {
380
380
}
381
381
}
382
382
383
+ func (pw * pushWriter ) replacePipe (p * io.PipeWriter ) error {
384
+ if pw .pipe == nil {
385
+ pw .pipe = p
386
+ return nil
387
+ }
388
+
389
+ pw .pipe .CloseWithError (content .ErrReset )
390
+ pw .pipe = p
391
+
392
+ // If content has already been written, the bytes
393
+ // cannot be written again and the caller must reset
394
+ status , err := pw .tracker .GetStatus (pw .ref )
395
+ if err != nil {
396
+ return err
397
+ }
398
+ status .Offset = 0
399
+ status .UpdatedAt = time .Now ()
400
+ pw .tracker .SetStatus (pw .ref , status )
401
+ return content .ErrReset
402
+ }
403
+
383
404
func (pw * pushWriter ) Write (p []byte ) (n int , err error ) {
384
405
status , err := pw .tracker .GetStatus (pw .ref )
385
406
if err != nil {
@@ -391,26 +412,14 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
391
412
case <- pw .done :
392
413
return 0 , io .ErrClosedPipe
393
414
case p := <- pw .pipeC :
394
- pw .pipe = p
415
+ pw .replacePipe ( p )
395
416
}
396
417
} else {
397
418
select {
398
419
case <- pw .done :
399
420
return 0 , io .ErrClosedPipe
400
421
case p := <- pw .pipeC :
401
- pw .pipe .CloseWithError (content .ErrReset )
402
- pw .pipe = p
403
-
404
- // If content has already been written, the bytes
405
- // cannot be written again and the caller must reset
406
- status , err := pw .tracker .GetStatus (pw .ref )
407
- if err != nil {
408
- return 0 , err
409
- }
410
- status .Offset = 0
411
- status .UpdatedAt = time .Now ()
412
- pw .tracker .SetStatus (pw .ref , status )
413
- return 0 , content .ErrReset
422
+ return 0 , pw .replacePipe (p )
414
423
default :
415
424
}
416
425
}
@@ -423,19 +432,7 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
423
432
case <- pw .done :
424
433
case err = <- pw .errC :
425
434
case p := <- pw .pipeC :
426
- pw .pipe .CloseWithError (content .ErrReset )
427
- pw .pipe = p
428
-
429
- // If content has already been written, the bytes
430
- // cannot be written again and the caller must reset
431
- status , err := pw .tracker .GetStatus (pw .ref )
432
- if err != nil {
433
- return 0 , err
434
- }
435
- status .Offset = 0
436
- status .UpdatedAt = time .Now ()
437
- pw .tracker .SetStatus (pw .ref , status )
438
- return 0 , content .ErrReset
435
+ return 0 , pw .replacePipe (p )
439
436
}
440
437
}
441
438
status .Offset += int64 (n )
@@ -498,19 +495,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
498
495
// check whether the pipe has changed in the commit, because sometimes Write
499
496
// can complete successfully, but the pipe may have changed. In that case, the
500
497
// content needs to be reset.
501
- pw .pipe .CloseWithError (content .ErrReset )
502
- pw .pipe = p
503
-
504
- // If content has already been written, the bytes
505
- // cannot be written again and the caller must reset
506
- status , err := pw .tracker .GetStatus (pw .ref )
507
- if err != nil {
508
- return err
509
- }
510
- status .Offset = 0
511
- status .UpdatedAt = time .Now ()
512
- pw .tracker .SetStatus (pw .ref , status )
513
- return content .ErrReset
498
+ return pw .replacePipe (p )
514
499
}
515
500
516
501
// 201 is specified return status, some registries return
0 commit comments