@@ -10,9 +10,11 @@ import (
1010 "maps"
1111 "reflect"
1212 "slices"
13+ "sort"
1314 "strings"
1415 "sync"
1516
17+ tarpatch "github.com/containers/tar-diff/pkg/tar-patch"
1618 digest "github.com/opencontainers/go-digest"
1719 imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
1820 "github.com/sirupsen/logrus"
@@ -30,6 +32,20 @@ import (
3032 chunkedToc "go.podman.io/storage/pkg/chunked/toc"
3133)
3234
35+ // formatSize formats a size in bytes with dynamic units (B, KB, MB, GB)
36+ func formatSize (size int64 ) string {
37+ const unit = 1024
38+ if size < unit {
39+ return fmt .Sprintf ("%d B" , size )
40+ }
41+ div , exp := int64 (unit ), 0
42+ for n := size / unit ; n >= unit ; n /= unit {
43+ div *= unit
44+ exp ++
45+ }
46+ return fmt .Sprintf ("%.1f %cB" , float64 (size )/ float64 (div ), "KMGT" [exp ])
47+ }
48+
3349// imageCopier tracks state specific to a single image (possibly an item of a manifest list)
3450type imageCopier struct {
3551 c * copier
@@ -448,6 +464,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) ([]compressiontypes.Algor
448464 srcInfosUpdated = true
449465 }
450466
467+ deltaLayers , err := types .ImageDeltaLayers (ic .src , ctx )
468+ if err != nil {
469+ return nil , err
470+ }
471+
451472 type copyLayerData struct {
452473 destInfo types.BlobInfo
453474 diffID digest.Digest
@@ -466,7 +487,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) ([]compressiontypes.Algor
466487 copyGroup := sync.WaitGroup {}
467488
468489 data := make ([]copyLayerData , len (srcInfos ))
469- copyLayerHelper := func (index int , srcLayer types.BlobInfo , toEncrypt bool , pool * mpb.Progress , srcRef reference.Named ) {
490+ copyLayerHelper := func (index int , srcLayer types.BlobInfo , toEncrypt bool , pool * mpb.Progress , srcRef reference.Named , deltaLayers []types. BlobInfo ) {
470491 defer ic .c .concurrentBlobCopiesSemaphore .Release (1 )
471492 defer copyGroup .Done ()
472493 cld := copyLayerData {}
@@ -481,7 +502,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) ([]compressiontypes.Algor
481502 logrus .Debugf ("Skipping foreign layer %q copy to %s" , cld .destInfo .Digest , ic .c .dest .Reference ().Transport ().Name ())
482503 }
483504 } else {
484- cld .destInfo , cld .diffID , cld .err = ic .copyLayer (ctx , srcLayer , toEncrypt , pool , index , srcRef , manifestLayerInfos [index ].EmptyLayer )
505+ cld .destInfo , cld .diffID , cld .err = ic .copyLayer (ctx , index , srcLayer , toEncrypt , pool , srcRef , manifestLayerInfos [index ].EmptyLayer , deltaLayers )
485506 }
486507 data [index ] = cld
487508 }
@@ -521,7 +542,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) ([]compressiontypes.Algor
521542 return fmt .Errorf ("copying layer: %w" , err )
522543 }
523544 copyGroup .Add (1 )
524- go copyLayerHelper (i , srcLayer , layersToEncrypt .Contains (i ), progressPool , ic .c .rawSource .Reference ().DockerReference ())
545+ go copyLayerHelper (i , srcLayer , layersToEncrypt .Contains (i ), progressPool , ic .c .rawSource .Reference ().DockerReference (), deltaLayers )
525546 }
526547
527548 // A call to copyGroup.Wait() is done at this point by the defer above.
@@ -690,10 +711,85 @@ func compressionEditsFromBlobInfo(srcInfo types.BlobInfo) (types.LayerCompressio
690711 }
691712}
692713
714+ // getMatchingDeltaLayers gets all the deltas that apply to this layer
715+ func (ic * imageCopier ) getMatchingDeltaLayers (ctx context.Context , srcIndex int , deltaLayers []types.BlobInfo ) (digest.Digest , []* types.BlobInfo ) {
716+ if deltaLayers == nil {
717+ return "" , nil
718+ }
719+ config , _ := ic .src .OCIConfig (ctx )
720+ if config == nil || config .RootFS .DiffIDs == nil || len (config .RootFS .DiffIDs ) <= srcIndex {
721+ return "" , nil
722+ }
723+
724+ layerDiffID := config .RootFS .DiffIDs [srcIndex ]
725+
726+ var matchingLayers []* types.BlobInfo
727+ for i := range deltaLayers {
728+ deltaLayer := & deltaLayers [i ]
729+ to := deltaLayer .Annotations ["io.github.containers.delta.to" ]
730+ if to == layerDiffID .String () {
731+ matchingLayers = append (matchingLayers , deltaLayer )
732+ }
733+ }
734+
735+ return layerDiffID , matchingLayers
736+ }
737+
738+ // resolveDeltaLayer looks at which of the matching delta froms have locally available data and picks the best one
739+ func (ic * imageCopier ) resolveDeltaLayer (ctx context.Context , matchingDeltas []* types.BlobInfo ) (io.ReadCloser , tarpatch.DataSource , types.BlobInfo , error ) {
740+ // Sort smallest deltas so we favour the smallest useable one
741+ sort .Slice (matchingDeltas , func (i , j int ) bool {
742+ return matchingDeltas [i ].Size < matchingDeltas [j ].Size
743+ })
744+
745+ for i := range matchingDeltas {
746+ matchingDelta := matchingDeltas [i ]
747+ from := matchingDelta .Annotations ["io.github.containers.delta.from" ]
748+ fromDigest , err := digest .Parse (from )
749+ if err != nil {
750+ continue // Silently ignore if server specified a weird format
751+ }
752+
753+ dataSource , err := types .ImageDestinationGetLayerDeltaData (ic .c .dest , ctx , fromDigest )
754+ if err != nil {
755+ return nil , nil , types.BlobInfo {}, err // Internal error
756+ }
757+ if dataSource == nil {
758+ continue // from layer doesn't exist
759+ }
760+
761+ logrus .Debugf ("Using delta %v for DiffID %v" , matchingDelta .Digest , fromDigest )
762+
763+ deltaStream , _ , err := ic .c .rawSource .GetBlob (ctx , * matchingDelta , ic .c .blobInfoCache )
764+ if err != nil {
765+ return nil , nil , types.BlobInfo {}, fmt .Errorf ("reading delta blob %s: %w" , matchingDelta .Digest , err )
766+ }
767+ return deltaStream , dataSource , * matchingDelta , nil
768+ }
769+ return nil , nil , types.BlobInfo {}, nil
770+ }
771+
772+ // canUseDeltas checks if deltas can be used for this layer
773+ func (ic * imageCopier ) canUseDeltas (srcInfo types.BlobInfo ) (bool , string ) {
774+ // Deltas rewrite the manifest to refer to the uncompressed digest, so we must be able to substitute blobs
775+ if ! ic .canSubstituteBlobs {
776+ return false , ""
777+ }
778+
779+ switch srcInfo .MediaType {
780+ case manifest .DockerV2Schema2LayerMediaType , manifest .DockerV2SchemaLayerMediaTypeUncompressed :
781+ return true , manifest .DockerV2SchemaLayerMediaTypeUncompressed
782+ case imgspecv1 .MediaTypeImageLayer , imgspecv1 .MediaTypeImageLayerGzip , imgspecv1 .MediaTypeImageLayerZstd :
783+ return true , imgspecv1 .MediaTypeImageLayer
784+ }
785+
786+ return false , ""
787+ }
788+
693789// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps (de/re/)compressing it,
694790// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
695791// srcRef can be used as an additional hint to the destination during checking whether a layer can be reused but srcRef can be nil.
696- func (ic * imageCopier ) copyLayer (ctx context.Context , srcInfo types.BlobInfo , toEncrypt bool , pool * mpb.Progress , layerIndex int , srcRef reference.Named , emptyLayer bool ) (types.BlobInfo , digest.Digest , error ) {
792+ func (ic * imageCopier ) copyLayer (ctx context.Context , srcIndex int , srcInfo types.BlobInfo , toEncrypt bool , pool * mpb.Progress , srcRef reference.Named , emptyLayer bool , deltaLayers []types. BlobInfo ) (types.BlobInfo , digest.Digest , error ) {
697793 // If the srcInfo doesn't contain compression information, try to compute it from the
698794 // MediaType, which was either read from a manifest by way of LayerInfos() or constructed
699795 // by LayerInfosForCopy(), if it was supplied at all. If we succeed in copying the blob,
@@ -712,6 +808,59 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
712808
713809 ic .c .printCopyInfo ("blob" , srcInfo )
714810
811+ // First look for a delta that matches this layer and substitute the result of that
812+ if ok , deltaResultMediaType := ic .canUseDeltas (srcInfo ); ok {
813+ // Get deltas going TO this layer
814+ deltaDiffID , matchingDeltas := ic .getMatchingDeltaLayers (ctx , srcIndex , deltaLayers )
815+ // Get best possible FROM delta
816+ deltaStream , deltaDataSource , matchingDelta , err := ic .resolveDeltaLayer (ctx , matchingDeltas )
817+ if err != nil {
818+ return types.BlobInfo {}, "" , err
819+ }
820+ if deltaStream != nil {
821+ logrus .Debugf ("Applying delta for layer %s (delta size: %.1f MB)" , deltaDiffID , float64 (matchingDelta .Size )/ (1024.0 * 1024.0 ))
822+ bar , err := ic .c .createProgressBar (pool , false , matchingDelta , "delta" , "done" )
823+ if err != nil {
824+ return types.BlobInfo {}, "" , err
825+ }
826+
827+ wrappedDeltaStream := bar .ProxyReader (deltaStream )
828+
829+ // Convert deltaStream to uncompressed tar layer stream
830+ pr , pw := io .Pipe ()
831+ go func () {
832+ if err := tarpatch .Apply (wrappedDeltaStream , deltaDataSource , pw ); err != nil {
833+ // We will notice this error when failing to verify the digest, so leave it be
834+ logrus .Infof ("Failed to apply layer delta: %v" , err )
835+ }
836+ deltaDataSource .Close ()
837+ deltaStream .Close ()
838+ wrappedDeltaStream .Close ()
839+ pw .Close ()
840+ }()
841+ defer pr .Close ()
842+
843+ // Copy uncompressed tar layer to destination, verifying the diffID
844+ blobInfo , diffIDChan , err := ic .copyLayerFromStream (ctx , pr , types.BlobInfo {Digest : deltaDiffID , Size : - 1 , MediaType : deltaResultMediaType , Annotations : srcInfo .Annotations }, true , toEncrypt , bar , srcIndex , emptyLayer )
845+ if err != nil {
846+ return types.BlobInfo {}, "" , err
847+ }
848+
849+ // Wait for diffID verification
850+ diffIDResult := <- diffIDChan
851+ if diffIDResult .err != nil {
852+ return types.BlobInfo {}, "" , diffIDResult .err
853+ }
854+
855+ bar .SetTotal (matchingDelta .Size , true )
856+
857+ // Record the fact that this blob is uncompressed
858+ ic .c .blobInfoCache .RecordDigestUncompressedPair (diffIDResult .digest , diffIDResult .digest )
859+
860+ return blobInfo , diffIDResult .digest , nil
861+ }
862+ }
863+
715864 diffIDIsNeeded := false
716865 var cachedDiffID digest.Digest = ""
717866 if ic .diffIDsAreNeeded {
@@ -751,7 +900,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
751900 Cache : ic .c .blobInfoCache ,
752901 CanSubstitute : canSubstitute ,
753902 EmptyLayer : emptyLayer ,
754- LayerIndex : & layerIndex ,
903+ LayerIndex : & srcIndex ,
755904 SrcRef : srcRef ,
756905 PossibleManifestFormats : append ([]string {ic .manifestConversionPlan .preferredMIMEType }, ic .manifestConversionPlan .otherMIMETypeCandidates ... ),
757906 RequiredCompression : requiredCompression ,
@@ -813,7 +962,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
813962 uploadedBlob , err := ic .c .dest .PutBlobPartial (ctx , & proxy , srcInfo , private.PutBlobPartialOptions {
814963 Cache : ic .c .blobInfoCache ,
815964 EmptyLayer : emptyLayer ,
816- LayerIndex : layerIndex ,
965+ LayerIndex : srcIndex ,
817966 })
818967 if err == nil {
819968 if srcInfo .Size != - 1 {
@@ -856,7 +1005,9 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
8561005 }
8571006 defer srcStream .Close ()
8581007
859- blobInfo , diffIDChan , err := ic .copyLayerFromStream (ctx , srcStream , types.BlobInfo {Digest : srcInfo .Digest , Size : srcBlobSize , MediaType : srcInfo .MediaType , Annotations : srcInfo .Annotations }, diffIDIsNeeded , toEncrypt , bar , layerIndex , emptyLayer )
1008+ logrus .Debugf ("Downloading layer %s (blob size: %s)" , srcInfo .Digest , formatSize (srcBlobSize ))
1009+
1010+ blobInfo , diffIDChan , err := ic .copyLayerFromStream (ctx , srcStream , types.BlobInfo {Digest : srcInfo .Digest , Size : srcBlobSize , MediaType : srcInfo .MediaType , Annotations : srcInfo .Annotations }, diffIDIsNeeded , toEncrypt , bar , srcIndex , emptyLayer )
8601011 if err != nil {
8611012 return types.BlobInfo {}, "" , err
8621013 }
0 commit comments