Skip to content

Commit b357fd6

Browse files
authored
feat: add concurrency support for pull and push operations (#45)
Signed-off-by: chlins <[email protected]>
1 parent 44c0c19 commit b357fd6

File tree

8 files changed

+93
-33
lines changed

8 files changed

+93
-33
lines changed

cmd/login.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,8 @@ func runLogin(ctx context.Context, registry string) error {
8989

9090
fmt.Println("\nLogging In...")
9191

92-
opts := []backend.Option{}
93-
if loginConfig.PlainHTTP {
94-
opts = append(opts, backend.WithPlainHTTP())
92+
opts := []backend.Option{
93+
backend.WithPlainHTTP(loginConfig.PlainHTTP),
9594
}
9695

9796
if err := b.Login(ctx, registry, loginConfig.Username, loginConfig.Password, opts...); err != nil {

cmd/pull.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,18 @@ var pullCmd = &cobra.Command{
3838
SilenceUsage: true,
3939
FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true},
4040
RunE: func(cmd *cobra.Command, args []string) error {
41+
if err := pullConfig.Validate(); err != nil {
42+
return err
43+
}
44+
4145
return runPull(context.Background(), args[0])
4246
},
4347
}
4448

4549
// init initializes pull command.
4650
func init() {
4751
flags := pullCmd.Flags()
52+
flags.IntVar(&pullConfig.Concurrency, "concurrency", pullConfig.Concurrency, "specify the number of concurrent pull operations (default: 3)")
4853
flags.BoolVar(&pullConfig.PlainHTTP, "plain-http", false, "use plain HTTP instead of HTTPS")
4954
flags.BoolVar(&pullConfig.Insecure, "insecure", false, "use insecure connection for the pull operation and skip TLS verification")
5055
flags.StringVar(&pullConfig.Proxy, "proxy", "", "use proxy for the pull operation")
@@ -66,9 +71,10 @@ func runPull(ctx context.Context, target string) error {
6671
return fmt.Errorf("target is required")
6772
}
6873

69-
opts := []backend.Option{backend.WithInsecure(pullConfig.Insecure)}
70-
if pullConfig.PlainHTTP {
71-
opts = append(opts, backend.WithPlainHTTP())
74+
opts := []backend.Option{
75+
backend.WithInsecure(pullConfig.Insecure),
76+
backend.WithPlainHTTP(pullConfig.PlainHTTP),
77+
backend.WithConcurrency(pullConfig.Concurrency),
7278
}
7379

7480
if pullConfig.Proxy != "" {

cmd/push.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,18 @@ var pushCmd = &cobra.Command{
3838
SilenceUsage: true,
3939
FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true},
4040
RunE: func(cmd *cobra.Command, args []string) error {
41+
if err := pushConfig.Validate(); err != nil {
42+
return err
43+
}
44+
4145
return runPush(context.Background(), args[0])
4246
},
4347
}
4448

4549
// init initializes push command.
4650
func init() {
4751
flags := pushCmd.Flags()
52+
flags.IntVar(&pushConfig.Concurrency, "concurrency", pushConfig.Concurrency, "specify the number of concurrent push operations (default: 3)")
4853
flags.BoolVar(&pushConfig.PlainHTTP, "plain-http", false, "use plain HTTP instead of HTTPS")
4954

5055
if err := viper.BindPFlags(flags); err != nil {
@@ -59,9 +64,9 @@ func runPush(ctx context.Context, target string) error {
5964
return err
6065
}
6166

62-
opts := []backend.Option{}
63-
if pushConfig.PlainHTTP {
64-
opts = append(opts, backend.WithPlainHTTP())
67+
opts := []backend.Option{
68+
backend.WithPlainHTTP(pushConfig.PlainHTTP),
69+
backend.WithConcurrency(pushConfig.Concurrency),
6570
}
6671

6772
if err := b.Push(ctx, target, opts...); err != nil {

pkg/backend/options.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,24 @@ package backend
1919
type Option func(*Options)
2020

2121
type Options struct {
22-
plainHTTP bool
23-
proxy string
24-
insecure bool
25-
output string
22+
concurrency int
23+
plainHTTP bool
24+
proxy string
25+
insecure bool
26+
output string
27+
}
28+
29+
// WithConcurrency sets the concurrency option.
30+
func WithConcurrency(concurrency int) Option {
31+
return func(opts *Options) {
32+
opts.concurrency = concurrency
33+
}
2634
}
2735

2836
// WithPlainHTTP sets the plain HTTP option.
29-
func WithPlainHTTP() Option {
37+
func WithPlainHTTP(plainHTTP bool) Option {
3038
return func(opts *Options) {
31-
opts.plainHTTP = true
39+
opts.plainHTTP = plainHTTP
3240
}
3341
}
3442

pkg/backend/pull.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/CloudNativeAI/modctl/pkg/storage"
2929

3030
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
31+
"golang.org/x/sync/errgroup"
3132
"oras.land/oras-go/v2/registry/remote"
3233
"oras.land/oras-go/v2/registry/remote/auth"
3334
"oras.land/oras-go/v2/registry/remote/credentials"
@@ -110,12 +111,15 @@ func (b *backend) Pull(ctx context.Context, target string, opts ...Option) error
110111
// note: the order is important, manifest should be pushed at last.
111112

112113
// copy the layers.
113-
// TODO: parallelize the layer copy.
114114
dst := b.store
115+
g := &errgroup.Group{}
116+
g.SetLimit(options.concurrency)
115117
for _, layer := range manifest.Layers {
116-
if err := pullIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag); err != nil {
117-
return fmt.Errorf("failed to pull blob to local: %w", err)
118-
}
118+
g.Go(func() error { return pullIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag) })
119+
}
120+
121+
if err := g.Wait(); err != nil {
122+
return fmt.Errorf("failed to pull blob to local: %w", err)
119123
}
120124

121125
// copy the config.

pkg/backend/push.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
godigest "github.com/opencontainers/go-digest"
2828
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
29+
"golang.org/x/sync/errgroup"
2930
"oras.land/oras-go/v2/registry/remote"
3031
"oras.land/oras-go/v2/registry/remote/auth"
3132
"oras.land/oras-go/v2/registry/remote/credentials"
@@ -91,11 +92,14 @@ func (b *backend) Push(ctx context.Context, target string, opts ...Option) error
9192
// note: the order is important, manifest should be pushed at last.
9293

9394
// copy the layers.
94-
// TODO: parallelize the layer copy.
95+
g := &errgroup.Group{}
96+
g.SetLimit(options.concurrency)
9597
for _, layer := range manifest.Layers {
96-
if err := pushIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag); err != nil {
97-
return fmt.Errorf("failed to push blob to remote: %w", err)
98-
}
98+
g.Go(func() error { return pushIfNotExist(ctx, pb, promptCopyingBlob, src, dst, layer, repo, tag) })
99+
}
100+
101+
if err := g.Wait(); err != nil {
102+
return fmt.Errorf("failed to push blob to remote: %w", err)
99103
}
100104

101105
// copy the config.

pkg/config/pull.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,35 @@
1616

1717
package config
1818

19+
import "fmt"
20+
21+
const (
22+
// defaultPullConcurrency is the default number of concurrent pull operations.
23+
defaultPullConcurrency = 3
24+
)
25+
1926
type Pull struct {
20-
PlainHTTP bool
21-
Proxy string
22-
Insecure bool
23-
ExtractDir string
27+
Concurrency int
28+
PlainHTTP bool
29+
Proxy string
30+
Insecure bool
31+
ExtractDir string
2432
}
2533

2634
func NewPull() *Pull {
2735
return &Pull{
28-
PlainHTTP: false,
29-
Proxy: "",
30-
Insecure: false,
31-
ExtractDir: "",
36+
Concurrency: defaultPullConcurrency,
37+
PlainHTTP: false,
38+
Proxy: "",
39+
Insecure: false,
40+
ExtractDir: "",
41+
}
42+
}
43+
44+
func (p *Pull) Validate() error {
45+
if p.Concurrency < 1 {
46+
return fmt.Errorf("invalid concurrency: %d", p.Concurrency)
3247
}
48+
49+
return nil
3350
}

pkg/config/push.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,29 @@
1616

1717
package config
1818

19+
import "fmt"
20+
21+
const (
22+
// defaultPushConcurrency is the default number of concurrent push operations.
23+
defaultPushConcurrency = 3
24+
)
25+
1926
type Push struct {
20-
PlainHTTP bool
27+
Concurrency int
28+
PlainHTTP bool
2129
}
2230

2331
func NewPush() *Pull {
2432
return &Pull{
25-
PlainHTTP: false,
33+
Concurrency: defaultPushConcurrency,
34+
PlainHTTP: false,
35+
}
36+
}
37+
38+
func (p *Push) Validate() error {
39+
if p.Concurrency < 1 {
40+
return fmt.Errorf("invalid concurrency: %d", p.Concurrency)
2641
}
42+
43+
return nil
2744
}

0 commit comments

Comments
 (0)