Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 79 additions & 15 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,17 @@ func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {
}, nil
}

// commitSubjectReferrers is responsible for updating the fallback tag manifest to track descriptors referring to a subject for registries that don't yet support the Referrers API.
// TODO: use conditional requests to avoid race conditions
// maxFallbackTagRetries is the maximum number of times to retry a fallback
// tag update when a conditional request returns 412 Precondition Failed.
const maxFallbackTagRetries = 3

// commitSubjectReferrers is responsible for updating the fallback tag manifest
// to track descriptors referring to a subject for registries that don't yet
// support the Referrers API.
//
// It uses conditional requests (ETag / If-Match / If-None-Match) to prevent
// lost updates when multiple writers attach referrers to the same subject
// concurrently.
func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, add v1.Descriptor) error {
// Check if the registry supports Referrers API.
// TODO: This should be done once per registry, not once per subject.
Expand All @@ -486,24 +495,51 @@ func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, ad
return nil
}

// The registry doesn't support Referrers API, we need to update the manifest tagged with the fallback tag.
// Make the request to GET the current manifest.
t := fallbackTag(sub)
u = w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), t.Identifier()))
req, err = http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {

for attempt := 0; attempt < maxFallbackTagRetries; attempt++ {
etag, err := w.tryCommitFallbackTag(ctx, t, add)
if err == nil {
return nil
}
// If we got a 412, the tag was updated by another writer between
// our GET and PUT. Re-read and retry.
var terr *transport.Error
if errors.As(err, &terr) && terr.StatusCode == http.StatusPreconditionFailed {
logs.Progress.Printf("fallback tag %s was updated concurrently (attempt %d/%d, etag=%s), retrying",
t.Identifier(), attempt+1, maxFallbackTagRetries, etag)
continue
}
return err
}

return fmt.Errorf("failed to update fallback tag %s after %d attempts due to concurrent updates",
t.Identifier(), maxFallbackTagRetries)
}

// tryCommitFallbackTag performs a single GET-modify-PUT cycle for the fallback
// tag with conditional request headers. It returns the ETag used (for logging)
// and any error. A 412 error signals a concurrent update.
func (w *writer) tryCommitFallbackTag(ctx context.Context, t name.Tag, add v1.Descriptor) (string, error) {
// GET the current manifest.
u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), t.Identifier()))
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return "", err
}
req.Header.Set("Accept", string(types.OCIImageIndex))
resp, err = w.client.Do(req.WithContext(ctx))
resp, err := w.client.Do(req.WithContext(ctx))
if err != nil {
return err
return "", err
}
defer resp.Body.Close()

var im v1.IndexManifest
etag := resp.Header.Get("ETag")
existed := false

if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
return err
return etag, err
} else if resp.StatusCode == http.StatusNotFound {
// Not found just means there are no attachments. Start with an empty index.
im = v1.IndexManifest{
Expand All @@ -512,20 +548,21 @@ func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, ad
Manifests: []v1.Descriptor{add},
}
} else {
existed = true
if err := json.NewDecoder(resp.Body).Decode(&im); err != nil {
return err
return etag, err
}
if im.SchemaVersion != 2 {
return fmt.Errorf("fallback tag manifest is not a schema version 2: %d", im.SchemaVersion)
return etag, fmt.Errorf("fallback tag manifest is not a schema version 2: %d", im.SchemaVersion)
}
if im.MediaType != types.OCIImageIndex {
return fmt.Errorf("fallback tag manifest is not an OCI image index: %s", im.MediaType)
return etag, fmt.Errorf("fallback tag manifest is not an OCI image index: %s", im.MediaType)
}
for _, desc := range im.Manifests {
if desc.Digest == add.Digest {
// The digest is already attached, nothing to do.
logs.Progress.Printf("fallback tag %s already had referrer", t.Identifier())
return nil
return etag, nil
}
}
// Append the new descriptor to the index.
Expand All @@ -536,8 +573,35 @@ func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, ad
sort.Slice(im.Manifests, func(i, j int) bool {
return im.Manifests[i].Digest.String() < im.Manifests[j].Digest.String()
})

raw, err := json.Marshal(im)
if err != nil {
return etag, err
}

// PUT with conditional headers.
req, err = http.NewRequest(http.MethodPut, u.String(), bytes.NewReader(raw))
if err != nil {
return etag, err
}
req.Header.Set("Content-Type", string(types.OCIImageIndex))
if existed && etag != "" {
req.Header.Set("If-Match", etag)
} else if !existed {
req.Header.Set("If-None-Match", "*")
}

logs.Progress.Printf("updating fallback tag %s with new referrer", t.Identifier())
return w.commitManifest(ctx, fallbackTaggable{im}, t)
resp, err = w.client.Do(req.WithContext(ctx))
if err != nil {
return etag, err
}
defer resp.Body.Close()

if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
return etag, err
}
return etag, nil
}

type fallbackTaggable struct {
Expand Down