diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go
index 94d207de1..8b009f91a 100644
--- a/pkg/v1/remote/write.go
+++ b/pkg/v1/remote/write.go
@@ -461,8 +461,17 @@ func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {
 	}, nil
 }
 
-// commitSubjectReferrers is responsible for updating the fallback tag manifest to track descriptors referring to a subject for registries that don't yet support the Referrers API.
-// TODO: use conditional requests to avoid race conditions
+// maxFallbackTagRetries is the maximum number of attempts (the first try plus
+// retries) made to update a fallback tag when the conditional PUT returns 412.
+const maxFallbackTagRetries = 3
+
+// commitSubjectReferrers is responsible for updating the fallback tag manifest
+// to track descriptors referring to a subject for registries that don't yet
+// support the Referrers API.
+//
+// It uses conditional requests (ETag / If-Match / If-None-Match) to prevent
+// lost updates when multiple writers attach referrers to the same subject
+// concurrently.
 func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, add v1.Descriptor) error {
 	// Check if the registry supports Referrers API.
 	// TODO: This should be done once per registry, not once per subject.
@@ -486,24 +495,59 @@ func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, ad
 		return nil
 	}
 
-	// The registry doesn't support Referrers API, we need to update the manifest tagged with the fallback tag.
-	// Make the request to GET the current manifest.
 	t := fallbackTag(sub)
-	u = w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), t.Identifier()))
-	req, err = http.NewRequest(http.MethodGet, u.String(), nil)
-	if err != nil {
+
+	// lastErr keeps the most recent 412 so that it can be wrapped into the
+	// final error if every attempt loses the race.
+	var lastErr error
+	for attempt := 0; attempt < maxFallbackTagRetries; attempt++ {
+		etag, err := w.tryCommitFallbackTag(ctx, t, add)
+		if err == nil {
+			return nil
+		}
+		// If we got a 412, the tag was updated by another writer between
+		// our GET and PUT. Re-read and retry.
+		var terr *transport.Error
+		if errors.As(err, &terr) && terr.StatusCode == http.StatusPreconditionFailed {
+			logs.Progress.Printf("fallback tag %s was updated concurrently (attempt %d/%d, etag=%s), retrying",
+				t.Identifier(), attempt+1, maxFallbackTagRetries, etag)
+			lastErr = err
+			continue
+		}
 		return err
 	}
+
+	return fmt.Errorf("failed to update fallback tag %s after %d attempts due to concurrent updates: %w",
+		t.Identifier(), maxFallbackTagRetries, lastErr)
+}
+
+// tryCommitFallbackTag performs a single GET-modify-PUT cycle for the fallback
+// tag t, adding the descriptor add to its index. The PUT carries If-None-Match
+// when the tag did not exist and If-Match when the GET returned a strong ETag,
+// so a concurrent update is expected to come back as a 412 error.
+//
+// It returns the ETag observed on the GET (for logging only; it is returned
+// even alongside a non-nil error) and any error.
+func (w *writer) tryCommitFallbackTag(ctx context.Context, t name.Tag, add v1.Descriptor) (string, error) {
+	// GET the current manifest.
+	u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), t.Identifier()))
+	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
+	if err != nil {
+		return "", err
+	}
 	req.Header.Set("Accept", string(types.OCIImageIndex))
-	resp, err = w.client.Do(req.WithContext(ctx))
+	resp, err := w.client.Do(req.WithContext(ctx))
 	if err != nil {
-		return err
+		return "", err
 	}
 	defer resp.Body.Close()
 
 	var im v1.IndexManifest
+	etag := resp.Header.Get("ETag")
+	existed := false
+
 	if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
-		return err
+		return etag, err
 	} else if resp.StatusCode == http.StatusNotFound {
 		// Not found just means there are no attachments. Start with an empty index.
 		im = v1.IndexManifest{
@@ -512,20 +556,21 @@ func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, ad
 			Manifests:     []v1.Descriptor{add},
 		}
 	} else {
+		existed = true
 		if err := json.NewDecoder(resp.Body).Decode(&im); err != nil {
-			return err
+			return etag, err
 		}
 		if im.SchemaVersion != 2 {
-			return fmt.Errorf("fallback tag manifest is not a schema version 2: %d", im.SchemaVersion)
+			return etag, fmt.Errorf("fallback tag manifest is not a schema version 2: %d", im.SchemaVersion)
 		}
 		if im.MediaType != types.OCIImageIndex {
-			return fmt.Errorf("fallback tag manifest is not an OCI image index: %s", im.MediaType)
+			return etag, fmt.Errorf("fallback tag manifest is not an OCI image index: %s", im.MediaType)
 		}
 		for _, desc := range im.Manifests {
 			if desc.Digest == add.Digest {
 				// The digest is already attached, nothing to do.
 				logs.Progress.Printf("fallback tag %s already had referrer", t.Identifier())
-				return nil
+				return etag, nil
 			}
 		}
 		// Append the new descriptor to the index.
@@ -536,8 +581,40 @@ func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, ad
 	sort.Slice(im.Manifests, func(i, j int) bool {
 		return im.Manifests[i].Digest.String() < im.Manifests[j].Digest.String()
 	})
+
+	raw, err := json.Marshal(im)
+	if err != nil {
+		return etag, err
+	}
+
+	// PUT with conditional headers.
+	req, err = http.NewRequest(http.MethodPut, u.String(), bytes.NewReader(raw))
+	if err != nil {
+		return etag, err
+	}
+	req.Header.Set("Content-Type", string(types.OCIImageIndex))
+	// If-Match is evaluated with strong comparison, so a weak validator
+	// ("W/...") could never match and every attempt would end in 412. In that
+	// case, or when no ETag was returned, the PUT is sent unconditionally.
+	weak := len(etag) >= 2 && etag[:2] == "W/"
+	switch {
+	case !existed:
+		req.Header.Set("If-None-Match", "*")
+	case etag != "" && !weak:
+		req.Header.Set("If-Match", etag)
+	}
+
 	logs.Progress.Printf("updating fallback tag %s with new referrer", t.Identifier())
-	return w.commitManifest(ctx, fallbackTaggable{im}, t)
+	resp, err = w.client.Do(req.WithContext(ctx))
+	if err != nil {
+		return etag, err
+	}
+	defer resp.Body.Close()
+
+	if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
+		return etag, err
+	}
+	return etag, nil
 }
 
 type fallbackTaggable struct {