Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/distribution/distribution/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (

// Client provides model distribution functionality
type Client struct {
store *store.LocalStore
log *logrus.Entry
registry *registry.Client
store *store.LocalStore
log *logrus.Entry
registry *registry.Client
resumableTransport *store.ResumableTransport
}

// GetStorePath returns the root path where models are stored
Expand Down Expand Up @@ -117,9 +118,15 @@ func NewClient(opts ...Option) (*Client, error) {
return nil, fmt.Errorf("initializing store: %w", err)
}

// Wrap the transport with resumable transport to support resuming downloads
resumableTransport := store.NewResumableTransport(options.transport, s)

// Set the resumable transport in the store so it can use it
s.SetResumableTransport(resumableTransport)

// Create registry client options
registryOpts := []registry.ClientOption{
registry.WithTransport(options.transport),
registry.WithTransport(resumableTransport),
registry.WithUserAgent(options.userAgent),
}

Expand All @@ -130,9 +137,10 @@ func NewClient(opts ...Option) (*Client, error) {

options.logger.Infoln("Successfully initialized store")
return &Client{
store: s,
log: options.logger,
registry: registry.NewClient(registryOpts...),
store: s,
log: options.logger,
registry: registry.NewClient(registryOpts...),
resumableTransport: resumableTransport,
}, nil
}

Expand Down
74 changes: 73 additions & 1 deletion pkg/distribution/internal/store/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,91 @@ func (s *LocalStore) writeLayer(layer blob, updates chan<- v1.Update) (bool, v1.
return false, hash, nil
}

// Check if there's a partial download and set resume offset in transport
resumeOffset := int64(0)
if s.resumableTransport != nil {
path, err := s.blobPath(hash)
if err == nil {
incompletePath := incompletePath(path)
if stat, statErr := os.Stat(incompletePath); statErr == nil && stat.Size() > 0 {
resumeOffset = stat.Size()
s.resumableTransport.SetResumeOffset(hash, resumeOffset)
// Clear it after use
defer s.resumableTransport.ClearResumeOffset(hash)
}
}
}

lr, err := layer.Uncompressed()
if err != nil {
return false, v1.Hash{}, fmt.Errorf("get blob contents: %w", err)
}
defer lr.Close()
r := progress.NewReader(lr, updates)

if err := s.WriteBlob(hash, r); err != nil {
// Check if resume actually worked
actualResumeOffset := resumeOffset
if s.resumableTransport != nil && resumeOffset > 0 {
if !s.resumableTransport.DidResume(hash) {
// Resume didn't work, server doesn't support it
// The reader will have full content, so write from scratch
actualResumeOffset = 0
}
}

if err := s.writeBlobResumable(hash, r, actualResumeOffset); err != nil {
return false, hash, err
}
return true, hash, nil
}

// writeBlobResumable writes the blob to the store with resume support
func (s *LocalStore) writeBlobResumable(diffID v1.Hash, r io.Reader, resumeOffset int64) error {
hasBlob, err := s.hasBlob(diffID)
if err != nil {
return fmt.Errorf("check blob existence: %w", err)
}
if hasBlob {
return nil
}

path, err := s.blobPath(diffID)
if err != nil {
return fmt.Errorf("get blob path: %w", err)
}

incompletePath := incompletePath(path)

// Open file for writing (append if resuming, create otherwise)
var f *os.File
if resumeOffset > 0 {
// Resuming, open for append
f, err = os.OpenFile(incompletePath, os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("open incomplete blob file for resume: %w", err)
}
} else {
// Not resuming or resume failed, start from scratch
f, err = createFile(incompletePath)
if err != nil {
return fmt.Errorf("create blob file: %w", err)
}
}

defer os.Remove(incompletePath)
defer f.Close()

if _, err := io.Copy(f, r); err != nil {
return fmt.Errorf("copy blob %q to store: %w", diffID.String(), err)
}

f.Close() // Rename will fail on Windows if the file is still open.
if err := os.Rename(incompletePath, path); err != nil {
return fmt.Errorf("rename blob file: %w", err)
}
return nil
}

// WriteBlob writes the blob to the store, reporting progress to the given channel.
// If the blob is already in the store, it is a no-op and the blob is not consumed from the reader.
func (s *LocalStore) WriteBlob(diffID v1.Hash, r io.Reader) error {
Expand Down
140 changes: 140 additions & 0 deletions pkg/distribution/internal/store/resume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package store

import (
"fmt"
"net/http"
"os"
"strings"
"sync"

v1 "github.com/docker/model-runner/pkg/go-containerregistry/pkg/v1"
)

// ResumableTransport wraps an HTTP transport and adds Range header support for resuming downloads
type ResumableTransport struct {
base http.RoundTripper
store *LocalStore
offsetsMu sync.RWMutex
offsets map[string]int64 // Maps blob digest to resume offset
resumedMu sync.RWMutex
resumed map[string]bool // Tracks which blobs actually resumed successfully
}

// NewResumableTransport creates a new resumable transport
func NewResumableTransport(base http.RoundTripper, store *LocalStore) *ResumableTransport {
return &ResumableTransport{
base: base,
store: store,
offsets: make(map[string]int64),
resumed: make(map[string]bool),
}
}

// SetResumeOffset sets the resume offset for a blob
func (rt *ResumableTransport) SetResumeOffset(digest v1.Hash, offset int64) {
rt.offsetsMu.Lock()
defer rt.offsetsMu.Unlock()
rt.offsets[digest.String()] = offset
// Reset resumed status
rt.resumedMu.Lock()
rt.resumed[digest.String()] = false
rt.resumedMu.Unlock()
}

// GetResumeOffset gets the resume offset for a blob
func (rt *ResumableTransport) GetResumeOffset(digest v1.Hash) int64 {
rt.offsetsMu.RLock()
defer rt.offsetsMu.RUnlock()
return rt.offsets[digest.String()]
}

// DidResume returns true if the blob was actually resumed (server returned 206)
func (rt *ResumableTransport) DidResume(digest v1.Hash) bool {
rt.resumedMu.RLock()
defer rt.resumedMu.RUnlock()
return rt.resumed[digest.String()]
}

// ClearResumeOffset clears the resume offset for a blob
func (rt *ResumableTransport) ClearResumeOffset(digest v1.Hash) {
rt.offsetsMu.Lock()
defer rt.offsetsMu.Unlock()
delete(rt.offsets, digest.String())
rt.resumedMu.Lock()
delete(rt.resumed, digest.String())
rt.resumedMu.Unlock()
}

// RoundTrip implements http.RoundTripper
func (rt *ResumableTransport) RoundTrip(req *http.Request) (*http.Response, error) {
// Check if this is a blob request that should be resumed
// Blob requests look like: /v2/{name}/blobs/{digest}
if req.Method == http.MethodGet && strings.Contains(req.URL.Path, "/blobs/sha256:") {
// Extract digest from URL
parts := strings.Split(req.URL.Path, "/blobs/")
if len(parts) == 2 {
digestStr := parts[1]
// Remove any query parameters
if idx := strings.Index(digestStr, "?"); idx != -1 {
digestStr = digestStr[:idx]
}

// Check if we have a resume offset for this digest
if strings.HasPrefix(digestStr, "sha256:") {
hash, err := v1.NewHash(digestStr)
if err == nil {
offset := rt.GetResumeOffset(hash)
if offset > 0 {
// Add Range header to resume download
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))

// Make the request
resp, err := rt.base.RoundTrip(req)
if err != nil {
return resp, err
}

// Check if server supported the range request
if resp.StatusCode == http.StatusPartialContent {
// Success! Mark as resumed
rt.resumedMu.Lock()
rt.resumed[hash.String()] = true
rt.resumedMu.Unlock()
return resp, nil
} else if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable || resp.StatusCode == http.StatusOK {
// Server doesn't support range or returned full content
// Close this response and retry without Range header
resp.Body.Close()
req.Header.Del("Range")
// Fall through to regular request below
} else {
// Some other status, return as-is
return resp, nil
}
}
}
}
}
}

return rt.base.RoundTrip(req)
}

// getIncompleteFileSize returns the size of the incomplete file for a given hash, or 0 if it doesn't exist
func (s *LocalStore) getIncompleteFileSize(hash v1.Hash) (int64, error) {
path, err := s.blobPath(hash)
if err != nil {
return 0, err
}

incompletePath := incompletePath(path)
stat, err := os.Stat(incompletePath)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, err
}

return stat.Size(), nil
}
8 changes: 7 additions & 1 deletion pkg/distribution/internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ const (

// LocalStore implements the Store interface for local storage
type LocalStore struct {
rootPath string
rootPath string
resumableTransport *ResumableTransport
}

// SetResumableTransport sets the resumable transport for this store
func (s *LocalStore) SetResumableTransport(rt *ResumableTransport) {
s.resumableTransport = rt
}

// RootPath returns the root path of the store
Expand Down
17 changes: 0 additions & 17 deletions pkg/go-containerregistry/cmd/krane/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ replace github.com/google/go-containerregistry => ../../
require (
github.com/awslabs/amazon-ecr-credential-helper/ecr-login v0.11.0
github.com/chrismellard/docker-credential-acr-env v0.0.0-20230304212654-82a0ddb27589
github.com/google/go-containerregistry v0.20.3
)

require (
cloud.google.com/go/compute/metadata v0.7.0 // indirect
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.30 // indirect
Expand All @@ -36,26 +34,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.39.1 // indirect
github.com/aws/smithy-go v1.23.2 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.16.3 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/docker/cli v28.2.2+incompatible // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker-credential-helpers v0.9.4 // indirect
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.9.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/vbatts/tar-split v0.12.1 // indirect
golang.org/x/crypto v0.38.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.33.0 // indirect
gotest.tools/v3 v3.1.0 // indirect
)
Loading