Skip to content

Commit 9213176

Browse files
authored
fix(build-index): Add retry logic for manifest download during dependency resolution (#502)
* fix(build-index): Add retry logic for manifest download during dependency resolution
* Fix
* Fix
* Fix
* Revert log
* Test retries
* Increase timeout, fix
1 parent 0485100 commit 9213176

File tree

6 files changed

+309
-23
lines changed

6 files changed

+309
-23
lines changed

build-index/tagserver/server.go

Lines changed: 7 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -180,15 +180,15 @@ func (s *Server) putTagHandler(w http.ResponseWriter, r *http.Request) error {
180180
}
181181
replicate, err := strconv.ParseBool(httputil.GetQueryArg(r, "replicate", "false"))
182182
if err != nil {
183-
return handler.Errorf("parse query arg `replicate`: %s", err)
183+
return fmt.Errorf("parse query arg `replicate`: %w", err)
184184
}
185185

186186
log.With("tag", tag, "digest", d.String(), "replicate", replicate).Info("Putting tag")
187187

188188
deps, err := s.depResolver.Resolve(tag, d)
189189
if err != nil {
190190
log.With("tag", tag, "digest", d.String(), "error", err).Error("Failed to resolve dependencies")
191-
return fmt.Errorf("resolve dependencies: %s", err)
191+
return fmt.Errorf("resolve dependencies: %w", err)
192192
}
193193

194194
log.With("tag", tag, "digest", d.String(), "dependency_count", len(deps)).Debug("Resolved dependencies")
@@ -252,7 +252,6 @@ func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
252252
d, err := s.store.Get(tag)
253253
if err != nil {
254254
if err == tagstore.ErrTagNotFound {
255-
log.With("tag", tag).Debug("Tag not found")
256255
return handler.ErrorStatus(http.StatusNotFound)
257256
}
258257
log.With("tag", tag).Errorf("Failed to get tag from storage: %s", err)
@@ -465,21 +464,18 @@ func (s *Server) getOriginHandler(w http.ResponseWriter, r *http.Request) error
465464
func (s *Server) putTag(tag string, d core.Digest, deps core.DigestList) error {
466465
log.With("tag", tag, "digest", d.String(), "dependency_count", len(deps)).Debug("Validating tag dependencies")
467466

468-
for i, dep := range deps {
467+
for _, dep := range deps {
469468
if _, err := s.localOriginClient.Stat(tag, dep); err == blobclient.ErrBlobNotFound {
470-
log.With("tag", tag, "digest", d.String(), "missing_dependency", dep.String(), "dependency_index", i).Error("Missing dependency blob")
471-
return handler.Errorf("cannot upload tag, missing dependency %s", dep)
469+
return fmt.Errorf("cannot upload tag, missing dependency %s", dep)
472470
} else if err != nil {
473-
log.With("tag", tag, "digest", d.String(), "dependency", dep.String(), "dependency_index", i).Errorf("Failed to check dependency blob: %s", err)
474-
return handler.Errorf("check blob: %s", err)
471+
return fmt.Errorf("check blob: %w", err)
475472
}
476473
}
477474

478475
log.With("tag", tag, "digest", d.String()).Debug("All dependencies validated successfully")
479476

480477
if err := s.store.Put(tag, d, 0); err != nil {
481-
log.With("tag", tag, "digest", d.String(), "error", err).Error("Failed to store tag")
482-
return handler.Errorf("storage: %s", err)
478+
return fmt.Errorf("storage: %w", err)
483479
}
484480

485481
log.With("tag", tag, "digest", d.String()).Info("Tag stored locally")
@@ -526,8 +522,7 @@ func (s *Server) replicateTag(tag string, d core.Digest, deps core.DigestList) e
526522
for _, dest := range destinations {
527523
task := tagreplication.NewTask(tag, d, deps, dest, 0)
528524
if err := s.tagReplicationManager.Add(task); err != nil {
529-
log.With("tag", tag, "digest", d.String(), "destination", dest).Errorf("Failed to add remote replication task: %s", err)
530-
return handler.Errorf("add replicate task: %s", err)
525+
return fmt.Errorf("add replicate task: %w", err)
531526
}
532527
log.With("tag", tag, "digest", d.String(), "destination", dest).Debug("Added remote replication task")
533528
}

build-index/tagtype/docker_resolver.go

Lines changed: 35 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -17,37 +17,66 @@ import (
1717
"bytes"
1818
"fmt"
1919

20+
"github.com/cenkalti/backoff"
2021
"github.com/docker/distribution"
22+
2123
"github.com/uber/kraken/core"
2224
"github.com/uber/kraken/origin/blobclient"
2325
"github.com/uber/kraken/utils/dockerutil"
26+
"github.com/uber/kraken/utils/httputil"
27+
"github.com/uber/kraken/utils/log"
2428
)
2529

2630
type dockerResolver struct {
27-
originClient blobclient.ClusterClient
31+
originClient blobclient.ClusterClient
32+
backoffConfig httputil.ExponentialBackOffConfig
2833
}
2934

3035
// Resolve returns all layers + manifest of given tag as its dependencies.
3136
func (r *dockerResolver) Resolve(tag string, d core.Digest) (core.DigestList, error) {
3237
m, err := r.downloadManifest(tag, d)
3338
if err != nil {
34-
return nil, err
39+
return nil, fmt.Errorf("download manifest: %w", err)
3540
}
3641
deps, err := dockerutil.GetManifestReferences(m)
3742
if err != nil {
38-
return nil, fmt.Errorf("get manifest references: %s", err)
43+
return nil, fmt.Errorf("get manifest references: %w", err)
3944
}
4045
return append(deps, d), nil
4146
}
4247

4348
func (r *dockerResolver) downloadManifest(tag string, d core.Digest) (distribution.Manifest, error) {
4449
buf := &bytes.Buffer{}
45-
if err := r.originClient.DownloadBlob(tag, d, buf); err != nil {
46-
return nil, fmt.Errorf("download blob: %s", err)
50+
attempt := 0
51+
52+
retryFunc := func() error {
53+
attempt++
54+
buf.Reset()
55+
56+
err := r.originClient.DownloadBlob(tag, d, buf)
57+
if err == nil {
58+
return nil
59+
}
60+
61+
if attempt > 1 {
62+
log.With("tag", tag, "digest", d.Hex(), "attempt", attempt, "error", err).
63+
Warn("Manifest download failed, will retry")
64+
}
65+
66+
if err != blobclient.ErrBlobNotFound &&
67+
!httputil.IsNetworkError(err) {
68+
return backoff.Permanent(err)
69+
}
70+
71+
return err
72+
}
73+
74+
if err := backoff.Retry(retryFunc, r.backoffConfig.Build()); err != nil {
75+
return nil, err
4776
}
4877
manifest, _, err := dockerutil.ParseManifest(buf)
4978
if err != nil {
50-
return nil, fmt.Errorf("parse manifest: %s", err)
79+
return nil, fmt.Errorf("parse manifest: %w", err)
5180
}
5281
return manifest, nil
5382
}
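build-index/tagtype/docker_resolver_test.go (path heading missing from the extracted page; inferred from `package tagtype` and the tests targeting dockerResolver — confirm against the commit)

Lines changed: 240 additions & 0 deletions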
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,240 @@
1+
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package tagtype
15+
16+
import (
17+
"io"
18+
"testing"
19+
"time"
20+
21+
"github.com/golang/mock/gomock"
22+
"github.com/stretchr/testify/require"
23+
24+
"github.com/uber/kraken/core"
25+
mockblobclient "github.com/uber/kraken/mocks/origin/blobclient"
26+
"github.com/uber/kraken/origin/blobclient"
27+
"github.com/uber/kraken/utils/dockerutil"
28+
"github.com/uber/kraken/utils/httputil"
29+
"github.com/uber/kraken/utils/mockutil"
30+
)
31+
32+
func newTestDockerResolver(ctrl *gomock.Controller) (*dockerResolver, *mockblobclient.MockClusterClient) {
33+
originClient := mockblobclient.NewMockClusterClient(ctrl)
34+
35+
// Use minimal backoff for fast tests
36+
backoffConfig := httputil.ExponentialBackOffConfig{
37+
Enabled: true,
38+
InitialInterval: 1 * time.Millisecond,
39+
RandomizationFactor: 0,
40+
Multiplier: 2,
41+
MaxInterval: 10 * time.Millisecond,
42+
MaxRetries: 3,
43+
}
44+
45+
resolver := &dockerResolver{
46+
originClient: originClient,
47+
backoffConfig: backoffConfig,
48+
}
49+
50+
return resolver, originClient
51+
}
52+
53+
// setupDockerResolverTest sets up common test dependencies.
54+
// Returns require, ctrl, resolver, and mockOrigin.
55+
func setupDockerResolverTest(t *testing.T) (*require.Assertions, *gomock.Controller, *dockerResolver, *mockblobclient.MockClusterClient) {
56+
require := require.New(t)
57+
ctrl := gomock.NewController(t)
58+
t.Cleanup(ctrl.Finish)
59+
60+
resolver, mockOrigin := newTestDockerResolver(ctrl)
61+
62+
return require, ctrl, resolver, mockOrigin
63+
}
64+
65+
// setupDockerResolverTestWithManifest sets up common test dependencies with manifest fixtures.
66+
// Returns require, ctrl, resolver, mockOrigin, tag, layers, manifest digest, and manifest bytes.
67+
func setupDockerResolverTestWithManifest(t *testing.T) (*require.Assertions, *gomock.Controller, *dockerResolver, *mockblobclient.MockClusterClient, string, core.DigestList, core.Digest, []byte) {
68+
require, ctrl, resolver, mockOrigin := setupDockerResolverTest(t)
69+
70+
tag := "repo/image:v1.0"
71+
layers := core.DigestListFixture(3)
72+
manifest, manifestBytes := dockerutil.ManifestFixture(layers[0], layers[1], layers[2])
73+
74+
return require, ctrl, resolver, mockOrigin, tag, layers, manifest, manifestBytes
75+
}
76+
77+
func TestDockerResolver_DownloadManifest_Success(t *testing.T) {
78+
require, _, resolver, mockOrigin, tag, _, manifest, manifestBytes := setupDockerResolverTestWithManifest(t)
79+
80+
// Expect successful download on first attempt
81+
mockOrigin.EXPECT().
82+
DownloadBlob(tag, manifest, mockutil.MatchWriter(manifestBytes)).
83+
Return(nil)
84+
85+
result, err := resolver.downloadManifest(tag, manifest)
86+
require.NoError(err)
87+
require.NotNil(result)
88+
}
89+
90+
func TestDockerResolver_DownloadManifest_RetryOnBlobNotFound(t *testing.T) {
91+
require, _, resolver, mockOrigin, tag, _, manifest, manifestBytes := setupDockerResolverTestWithManifest(t)
92+
93+
// First attempt fails with blob not found, second succeeds
94+
gomock.InOrder(
95+
mockOrigin.EXPECT().
96+
DownloadBlob(tag, manifest, gomock.Any()).
97+
Return(blobclient.ErrBlobNotFound),
98+
mockOrigin.EXPECT().
99+
DownloadBlob(tag, manifest, mockutil.MatchWriter(manifestBytes)).
100+
Return(nil),
101+
)
102+
103+
result, err := resolver.downloadManifest(tag, manifest)
104+
require.NoError(err)
105+
require.NotNil(result)
106+
}
107+
108+
func TestDockerResolver_DownloadManifest_ExhaustedRetries(t *testing.T) {
109+
require, _, resolver, mockOrigin := setupDockerResolverTest(t)
110+
111+
tag := "repo/image:v1.0"
112+
manifest := core.DigestFixture()
113+
114+
// All attempts fail with blob not found (MaxRetries=3, so 4 total attempts)
115+
mockOrigin.EXPECT().
116+
DownloadBlob(tag, manifest, gomock.Any()).
117+
Return(blobclient.ErrBlobNotFound).
118+
Times(4)
119+
120+
result, err := resolver.downloadManifest(tag, manifest)
121+
require.Error(err)
122+
require.Nil(result)
123+
require.Equal(blobclient.ErrBlobNotFound, err)
124+
}
125+
126+
func TestDockerResolver_DownloadManifest_PermanentError(t *testing.T) {
127+
require, _, resolver, mockOrigin := setupDockerResolverTest(t)
128+
129+
tag := "repo/image:v1.0"
130+
manifest := core.DigestFixture()
131+
132+
// 401 Unauthorized is a permanent error (4xx except 404)
133+
permanentErr := httputil.StatusError{Status: 401}
134+
135+
mockOrigin.EXPECT().
136+
DownloadBlob(tag, manifest, gomock.Any()).
137+
Return(permanentErr).
138+
Times(1) // Should only be called once, no retries
139+
140+
result, err := resolver.downloadManifest(tag, manifest)
141+
require.Error(err)
142+
require.Nil(result)
143+
}
144+
145+
func TestDockerResolver_DownloadManifest_BufferResetBetweenRetries(t *testing.T) {
146+
require, _, resolver, mockOrigin, tag, _, manifest, manifestBytes := setupDockerResolverTestWithManifest(t)
147+
148+
partialData := []byte("partial corrupt data")
149+
150+
// First attempt writes partial data then fails
151+
// Second attempt succeeds with full data
152+
gomock.InOrder(
153+
mockOrigin.EXPECT().
154+
DownloadBlob(tag, manifest, gomock.Any()).
155+
DoAndReturn(func(tag string, d core.Digest, dst io.Writer) error {
156+
dst.Write(partialData)
157+
return blobclient.ErrBlobNotFound
158+
}),
159+
mockOrigin.EXPECT().
160+
DownloadBlob(tag, manifest, mockutil.MatchWriter(manifestBytes)).
161+
Return(nil),
162+
)
163+
164+
result, err := resolver.downloadManifest(tag, manifest)
165+
require.NoError(err)
166+
require.NotNil(result)
167+
// If buffer wasn't reset, parsing would fail due to partial data
168+
}
169+
170+
func TestDockerResolver_DownloadManifest_InvalidManifestFormat(t *testing.T) {
171+
require, _, resolver, mockOrigin := setupDockerResolverTest(t)
172+
173+
tag := "repo/image:v1.0"
174+
manifest := core.DigestFixture()
175+
176+
// Download succeeds but returns invalid manifest data
177+
mockOrigin.EXPECT().
178+
DownloadBlob(tag, manifest, gomock.Any()).
179+
DoAndReturn(func(tag string, d core.Digest, dst io.Writer) error {
180+
dst.Write([]byte("invalid manifest json"))
181+
return nil
182+
})
183+
184+
result, err := resolver.downloadManifest(tag, manifest)
185+
require.Error(err)
186+
require.Nil(result)
187+
require.Contains(err.Error(), "parse manifest")
188+
}
189+
190+
func TestDockerResolver_Resolve_Success(t *testing.T) {
191+
require, _, resolver, mockOrigin, tag, layers, manifest, manifestBytes := setupDockerResolverTestWithManifest(t)
192+
193+
mockOrigin.EXPECT().
194+
DownloadBlob(tag, manifest, mockutil.MatchWriter(manifestBytes)).
195+
Return(nil)
196+
197+
deps, err := resolver.Resolve(tag, manifest)
198+
require.NoError(err)
199+
require.Equal(core.DigestList(append(layers, manifest)), deps)
200+
}
201+
202+
func TestDockerResolver_Resolve_DownloadError(t *testing.T) {
203+
require, _, resolver, mockOrigin := setupDockerResolverTest(t)
204+
205+
tag := "repo/image:v1.0"
206+
manifest := core.DigestFixture()
207+
208+
// All retries exhausted
209+
mockOrigin.EXPECT().
210+
DownloadBlob(tag, manifest, gomock.Any()).
211+
Return(blobclient.ErrBlobNotFound).
212+
Times(4)
213+
214+
deps, err := resolver.Resolve(tag, manifest)
215+
require.Error(err)
216+
require.Nil(deps)
217+
require.Contains(err.Error(), "download manifest")
218+
}
219+
220+
func TestDockerResolver_Resolve_WithRetries(t *testing.T) {
221+
require, _, resolver, mockOrigin, tag, layers, manifest, manifestBytes := setupDockerResolverTestWithManifest(t)
222+
223+
// Fails twice, succeeds on third attempt
224+
gomock.InOrder(
225+
mockOrigin.EXPECT().
226+
DownloadBlob(tag, manifest, gomock.Any()).
227+
Return(blobclient.ErrBlobNotFound),
228+
mockOrigin.EXPECT().
229+
DownloadBlob(tag, manifest, gomock.Any()).
230+
Return(blobclient.ErrBlobNotFound),
231+
mockOrigin.EXPECT().
232+
DownloadBlob(tag, manifest, mockutil.MatchWriter(manifestBytes)).
233+
Return(nil),
234+
)
235+
236+
deps, err := resolver.Resolve(tag, manifest)
237+
require.NoError(err)
238+
require.NotNil(deps)
239+
require.Equal(core.DigestList(append(layers, manifest)), deps)
240+
}

0 commit comments

Comments
 (0)