Skip to content

Commit 8b918c0

Browse files
authored
feat: disable retries for oras, self-control retry at the layer level (#167)
Signed-off-by: chlins <[email protected]>
1 parent bb291a5 commit 8b918c0

File tree

10 files changed

+166
-78
lines changed

10 files changed

+166
-78
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.24.1
44

55
require (
66
github.com/CloudNativeAI/model-spec v0.0.3
7+
github.com/avast/retry-go/v4 v4.6.1
78
github.com/briandowns/spinner v1.23.2
89
github.com/distribution/distribution/v3 v3.0.0
910
github.com/distribution/reference v0.6.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFI
1717
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
1818
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
1919
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
20+
github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk=
21+
github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA=
2022
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
2123
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
2224
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

internal/pb/pb.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ func (p *ProgressBar) Add(prompt, name string, size int64, reader io.Reader) io.
5959
oldBar := p.bars[name]
6060
p.mu.RUnlock()
6161

62+
// If the bar exists, drop and remove it.
6263
if oldBar != nil {
63-
return reader
64+
oldBar.Abort(true)
6465
}
6566

6667
newBar := &progressBar{size: size, msg: fmt.Sprintf("%s %s", prompt, name)}
@@ -101,6 +102,18 @@ func (p *ProgressBar) Complete(name string, msg string) {
101102
}
102103
}
103104

105+
// Abort aborts the progress bar.
106+
func (p *ProgressBar) Abort(name string, err error) {
107+
p.mu.RLock()
108+
bar, ok := p.bars[name]
109+
p.mu.RUnlock()
110+
111+
if ok {
112+
// TODO: Log error message.
113+
bar.Abort(true)
114+
}
115+
}
116+
104117
// Start starts the progress bar.
105118
func (p *ProgressBar) Start() {}
106119

pkg/backend/build.go

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import (
2323
"os"
2424
"path/filepath"
2525

26-
"github.com/CloudNativeAI/modctl/internal/pb"
26+
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
27+
retry "github.com/avast/retry-go/v4"
28+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
29+
2730
internalpb "github.com/CloudNativeAI/modctl/internal/pb"
2831
"github.com/CloudNativeAI/modctl/pkg/backend/build"
2932
buildconfig "github.com/CloudNativeAI/modctl/pkg/backend/build/config"
@@ -33,9 +36,6 @@ import (
3336
"github.com/CloudNativeAI/modctl/pkg/config"
3437
"github.com/CloudNativeAI/modctl/pkg/modelfile"
3538
"github.com/CloudNativeAI/modctl/pkg/source"
36-
37-
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
38-
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3939
)
4040

4141
const (
@@ -113,34 +113,41 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
113113
SourceURL: sourceInfo.URL,
114114
SourceRevision: revision,
115115
}
116-
configDesc, err := builder.BuildConfig(ctx, layers, modelConfig, hooks.NewHooks(
117-
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
118-
return pb.Add(internalpb.NormalizePrompt("Building config"), name, size, reader)
119-
}),
120-
hooks.WithOnError(func(name string, err error) {
121-
pb.Complete(name, fmt.Sprintf("Failed to build config: %v", err))
122-
}),
123-
hooks.WithOnComplete(func(name string, desc ocispec.Descriptor) {
124-
pb.Complete(name, fmt.Sprintf("%s %s", internalpb.NormalizePrompt("Built config"), desc.Digest))
125-
}),
126-
))
127-
if err != nil {
116+
117+
var configDesc ocispec.Descriptor
118+
// Build the model config.
119+
if err := retry.Do(func() error {
120+
configDesc, err = builder.BuildConfig(ctx, layers, modelConfig, hooks.NewHooks(
121+
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
122+
return pb.Add(internalpb.NormalizePrompt("Building config"), name, size, reader)
123+
}),
124+
hooks.WithOnError(func(name string, err error) {
125+
pb.Abort(name, fmt.Errorf("failed to build config: %w", err))
126+
}),
127+
hooks.WithOnComplete(func(name string, desc ocispec.Descriptor) {
128+
pb.Complete(name, fmt.Sprintf("%s %s", internalpb.NormalizePrompt("Built config"), desc.Digest))
129+
}),
130+
))
131+
return err
132+
}, retryOpts...); err != nil {
128133
return fmt.Errorf("failed to build model config: %w", err)
129134
}
130135

131136
// Build the model manifest.
132-
_, err = builder.BuildManifest(ctx, layers, configDesc, manifestAnnotation(modelfile), hooks.NewHooks(
133-
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
134-
return pb.Add(internalpb.NormalizePrompt("Building manifest"), name, size, reader)
135-
}),
136-
hooks.WithOnError(func(name string, err error) {
137-
pb.Complete(name, fmt.Sprintf("Failed to build manifest: %v", err))
138-
}),
139-
hooks.WithOnComplete(func(name string, desc ocispec.Descriptor) {
140-
pb.Complete(name, fmt.Sprintf("%s %s", internalpb.NormalizePrompt("Built manifest"), desc.Digest))
141-
}),
142-
))
143-
if err != nil {
137+
if err := retry.Do(func() error {
138+
_, err = builder.BuildManifest(ctx, layers, configDesc, manifestAnnotation(modelfile), hooks.NewHooks(
139+
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
140+
return pb.Add(internalpb.NormalizePrompt("Building manifest"), name, size, reader)
141+
}),
142+
hooks.WithOnError(func(name string, err error) {
143+
pb.Abort(name, fmt.Errorf("failed to build manifest: %w", err))
144+
}),
145+
hooks.WithOnComplete(func(name string, desc ocispec.Descriptor) {
146+
pb.Complete(name, fmt.Sprintf("%s %s", internalpb.NormalizePrompt("Built manifest"), desc.Digest))
147+
}),
148+
))
149+
return err
150+
}, retryOpts...); err != nil {
144151
return fmt.Errorf("failed to build model manifest: %w", err)
145152
}
146153

@@ -170,7 +177,7 @@ func (b *backend) getProcessors(modelfile modelfile.Modelfile) []processor.Proce
170177
}
171178

172179
// process walks the user work directory and process the identified files.
173-
func (b *backend) process(ctx context.Context, builder build.Builder, workDir string, pb *pb.ProgressBar, cfg *config.Build, processors ...processor.Processor) ([]ocispec.Descriptor, error) {
180+
func (b *backend) process(ctx context.Context, builder build.Builder, workDir string, pb *internalpb.ProgressBar, cfg *config.Build, processors ...processor.Processor) ([]ocispec.Descriptor, error) {
174181
descriptors := []ocispec.Descriptor{}
175182
for _, p := range processors {
176183
descs, err := p.Process(ctx, builder, workDir, processor.WithConcurrency(cfg.Concurrency), processor.WithProgressTracker(pb))

pkg/backend/processor/base.go

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/CloudNativeAI/modctl/pkg/storage"
3131

3232
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
33+
"github.com/avast/retry-go/v4"
3334
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3435
"golang.org/x/sync/errgroup"
3536
)
@@ -101,27 +102,29 @@ func (b *base) Process(ctx context.Context, builder build.Builder, workDir strin
101102
}
102103

103104
eg.Go(func() error {
104-
desc, err := builder.BuildLayer(ctx, b.mediaType, workDir, path, hooks.NewHooks(
105-
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
106-
return tracker.Add(internalpb.NormalizePrompt("Building layer"), name, size, reader)
107-
}),
108-
hooks.WithOnError(func(name string, err error) {
109-
tracker.Complete(name, fmt.Sprintf("Failed to build layer: %v", err))
110-
}),
111-
hooks.WithOnComplete(func(name string, desc ocispec.Descriptor) {
112-
tracker.Complete(name, fmt.Sprintf("%s %s", internalpb.NormalizePrompt("Built layer"), desc.Digest))
113-
}),
114-
))
115-
if err != nil {
116-
cancel()
117-
return err
118-
}
119-
120-
mu.Lock()
121-
descriptors = append(descriptors, desc)
122-
mu.Unlock()
123-
124-
return nil
105+
return retry.Do(func() error {
106+
desc, err := builder.BuildLayer(ctx, b.mediaType, workDir, path, hooks.NewHooks(
107+
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
108+
return tracker.Add(internalpb.NormalizePrompt("Building layer"), name, size, reader)
109+
}),
110+
hooks.WithOnError(func(name string, err error) {
111+
tracker.Abort(name, fmt.Errorf("failed to build layer: %w", err))
112+
}),
113+
hooks.WithOnComplete(func(name string, desc ocispec.Descriptor) {
114+
tracker.Complete(name, fmt.Sprintf("%s %s", internalpb.NormalizePrompt("Built layer"), desc.Digest))
115+
}),
116+
))
117+
if err != nil {
118+
cancel()
119+
return err
120+
}
121+
122+
mu.Lock()
123+
descriptors = append(descriptors, desc)
124+
mu.Unlock()
125+
126+
return nil
127+
}, retryOpts...)
125128
})
126129
}
127130

pkg/backend/processor/options.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@
1616

1717
package processor
1818

19-
import "github.com/CloudNativeAI/modctl/internal/pb"
19+
import (
20+
"time"
21+
22+
retry "github.com/avast/retry-go/v4"
23+
24+
"github.com/CloudNativeAI/modctl/internal/pb"
25+
)
2026

2127
type ProcessOption func(*processOptions)
2228

@@ -38,3 +44,10 @@ func WithProgressTracker(tracker *pb.ProgressBar) ProcessOption {
3844
o.progressTracker = tracker
3945
}
4046
}
47+
48+
var retryOpts = []retry.Option{
49+
retry.Attempts(3),
50+
retry.DelayType(retry.BackOffDelay),
51+
retry.Delay(1 * time.Second),
52+
retry.MaxDelay(5 * time.Second),
53+
}

pkg/backend/pull.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/CloudNativeAI/modctl/pkg/config"
2828
"github.com/CloudNativeAI/modctl/pkg/storage"
2929

30+
retry "github.com/avast/retry-go/v4"
3031
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3132
"golang.org/x/sync/errgroup"
3233
)
@@ -85,7 +86,7 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err
8586
}
8687

8788
for _, layer := range manifest.Layers {
88-
g.Go(func() error { return fn(layer) })
89+
g.Go(func() error { return retry.Do(func() error { return fn(layer) }, retryOpts...) })
8990
}
9091

9192
if err := g.Wait(); err != nil {
@@ -99,12 +100,16 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err
99100
}
100101

101102
// copy the config.
102-
if err := pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling config"), src, dst, manifest.Config, repo, tag); err != nil {
103+
if err := retry.Do(func() error {
104+
return pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling config"), src, dst, manifest.Config, repo, tag)
105+
}, retryOpts...); err != nil {
103106
return fmt.Errorf("failed to pull config to local: %w", err)
104107
}
105108

106109
// copy the manifest.
107-
if err := pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling manifest"), src, dst, manifestDesc, repo, tag); err != nil {
110+
if err := retry.Do(func() error {
111+
return pullIfNotExist(ctx, pb, internalpb.NormalizePrompt("Pulling manifest"), src, dst, manifestDesc, repo, tag)
112+
}, retryOpts...); err != nil {
108113
return fmt.Errorf("failed to pull manifest to local: %w", err)
109114
}
110115

@@ -138,7 +143,8 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri
138143
// check whether the content exists in the destination storage.
139144
exist, err := dst.StatManifest(ctx, repo, desc.Digest.String())
140145
if err != nil {
141-
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to check manifest %s, err: %v", desc.Digest.String(), err))
146+
err = fmt.Errorf("failed to check manifest %s, err: %w", desc.Digest.String(), err)
147+
pb.Abort(desc.Digest.String(), err)
142148
return err
143149
}
144150

@@ -149,18 +155,21 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri
149155

150156
body, err := io.ReadAll(reader)
151157
if err != nil {
152-
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to read manifest %s, err: %v", desc.Digest.String(), err))
158+
err = fmt.Errorf("failed to read manifest %s, err: %w", desc.Digest.String(), err)
159+
pb.Abort(desc.Digest.String(), err)
153160
return err
154161
}
155162

156163
if _, err := dst.PushManifest(ctx, repo, tag, body); err != nil {
157-
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to store manifest %s, err: %v", desc.Digest.String(), err))
164+
err = fmt.Errorf("failed to store manifest %s, err: %w", desc.Digest.String(), err)
165+
pb.Abort(desc.Digest.String(), err)
158166
return err
159167
}
160168
} else {
161169
exist, err := dst.StatBlob(ctx, repo, desc.Digest.String())
162170
if err != nil {
163-
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to check blob %s, err: %v", desc.Digest.String(), err))
171+
err = fmt.Errorf("failed to check blob %s, err: %w", desc.Digest.String(), err)
172+
pb.Abort(desc.Digest.String(), err)
164173
return err
165174
}
166175

@@ -170,7 +179,8 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri
170179
}
171180

172181
if _, _, err := dst.PushBlob(ctx, repo, reader, desc); err != nil {
173-
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to store blob %s, err: %v", desc.Digest.String(), err))
182+
err = fmt.Errorf("failed to store blob %s, err: %w", desc.Digest.String(), err)
183+
pb.Abort(desc.Digest.String(), err)
174184
return err
175185
}
176186
}
@@ -190,8 +200,9 @@ func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, p
190200

191201
reader := pb.Add(prompt, desc.Digest.String(), desc.Size, content)
192202
if err := extractLayer(desc, outputDir, reader); err != nil {
193-
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to pull and extract blob %s from remote, err: %v", desc.Digest.String(), err))
194-
return fmt.Errorf("failed to extract the blob %s to output directory: %w", desc.Digest.String(), err)
203+
err = fmt.Errorf("failed to extract the blob %s to output directory: %w", desc.Digest.String(), err)
204+
pb.Abort(desc.Digest.String(), err)
205+
return err
195206
}
196207

197208
return nil

0 commit comments

Comments
 (0)