Skip to content

Commit da0754b

Browse files
committed
feat: support the concurrency for extract
Signed-off-by: chlins <[email protected]>
1 parent cc76147 commit da0754b

File tree

12 files changed

+85
-53
lines changed

12 files changed

+85
-53
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ LOCALBIN ?= $(shell pwd)/bin
9898
$(LOCALBIN):
9999
mkdir -p $(LOCALBIN)
100100

101-
MOCKERY_VERSION=v2.52.1
101+
MOCKERY_VERSION=v2.53.3
102102

103103
.PHONY: gen
104104
gen: gen-mockery## Generate all we need!

cmd/extract.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ var extractCmd = &cobra.Command{
3838
SilenceUsage: true,
3939
FParseErrWhitelist: cobra.FParseErrWhitelist{UnknownFlags: true},
4040
RunE: func(cmd *cobra.Command, args []string) error {
41+
if err := extractConfig.Validate(); err != nil {
42+
return err
43+
}
44+
4145
return runExtract(context.Background(), args[0])
4246
},
4347
}
@@ -46,6 +50,7 @@ var extractCmd = &cobra.Command{
4650
func init() {
4751
flags := extractCmd.Flags()
4852
flags.StringVar(&extractConfig.Output, "output", "", "specify the output for extracting the model artifact")
53+
flags.IntVar(&extractConfig.Concurrency, "concurrency", extractConfig.Concurrency, "specify the concurrency for extracting the model artifact")
4954

5055
if err := viper.BindPFlags(flags); err != nil {
5156
panic(fmt.Errorf("bind cache extract flags to viper: %w", err))
@@ -63,11 +68,7 @@ func runExtract(ctx context.Context, target string) error {
6368
return fmt.Errorf("target is required")
6469
}
6570

66-
if extractConfig.Output == "" {
67-
return fmt.Errorf("output is required")
68-
}
69-
70-
if err := b.Extract(ctx, target, extractConfig.Output); err != nil {
71+
if err := b.Extract(ctx, target, extractConfig); err != nil {
7172
return err
7273
}
7374

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
module github.com/CloudNativeAI/modctl
22

3-
go 1.23.3
4-
toolchain go1.24.1
3+
go 1.24.1
54

65
require (
76
github.com/CloudNativeAI/model-spec v0.0.3

pkg/backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type Backend interface {
5353
Inspect(ctx context.Context, target string) (*InspectedModelArtifact, error)
5454

5555
// Extract extracts the model artifact.
56-
Extract(ctx context.Context, target string, output string) error
56+
Extract(ctx context.Context, target string, cfg *config.Extract) error
5757

5858
// Tag creates a new tag that refers to the source model artifact.
5959
Tag(ctx context.Context, source, target string) error

pkg/backend/extract.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424
"io"
2525

2626
"github.com/CloudNativeAI/modctl/pkg/codec"
27+
"github.com/CloudNativeAI/modctl/pkg/config"
2728
"github.com/CloudNativeAI/modctl/pkg/storage"
2829
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
30+
"golang.org/x/sync/errgroup"
2931

3032
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3133
)
@@ -36,7 +38,7 @@ const (
3638
)
3739

3840
// Extract extracts the model artifact.
39-
func (b *backend) Extract(ctx context.Context, target string, output string) error {
41+
func (b *backend) Extract(ctx context.Context, target string, cfg *config.Extract) error {
4042
// parse the repository and tag from the target.
4143
ref, err := ParseReference(target)
4244
if err != nil {
@@ -55,26 +57,33 @@ func (b *backend) Extract(ctx context.Context, target string, output string) err
5557
return fmt.Errorf("failed to unmarshal the manifest: %w", err)
5658
}
5759

58-
return exportModelArtifact(ctx, b.store, manifest, repo, output)
60+
return exportModelArtifact(ctx, b.store, manifest, repo, cfg)
5961
}
6062

6163
// exportModelArtifact exports the target model artifact to the output directory, which will open the artifact and extract to restore the original repo structure.
62-
func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo, outputDir string) error {
64+
func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo string, cfg *config.Extract) error {
65+
g := &errgroup.Group{}
66+
g.SetLimit(cfg.Concurrency)
67+
6368
for _, layer := range manifest.Layers {
64-
// pull the blob from the storage.
65-
reader, err := store.PullBlob(ctx, repo, layer.Digest.String())
66-
if err != nil {
67-
return fmt.Errorf("failed to pull the blob from storage: %w", err)
68-
}
69-
defer reader.Close()
70-
71-
bufferedReader := bufio.NewReaderSize(reader, defaultBufferSize)
72-
if err := extractLayer(layer, outputDir, bufferedReader); err != nil {
73-
return fmt.Errorf("failed to extract layer %s: %w", layer.Digest.String(), err)
74-
}
69+
g.Go(func() error {
70+
// pull the blob from the storage.
71+
reader, err := store.PullBlob(ctx, repo, layer.Digest.String())
72+
if err != nil {
73+
return fmt.Errorf("failed to pull the blob from storage: %w", err)
74+
}
75+
defer reader.Close()
76+
77+
bufferedReader := bufio.NewReaderSize(reader, defaultBufferSize)
78+
if err := extractLayer(layer, cfg.Output, bufferedReader); err != nil {
79+
return fmt.Errorf("failed to extract layer %s: %w", layer.Digest.String(), err)
80+
}
81+
82+
return nil
83+
})
7584
}
7685

77-
return nil
86+
return g.Wait()
7887
}
7988

8089
// extractLayer extracts the layer to the output directory.

pkg/backend/pull.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err
150150

151151
// export the target model artifact to the output directory if needed.
152152
if cfg.ExtractDir != "" {
153-
if err := exportModelArtifact(ctx, dst, manifest, repo, cfg.ExtractDir); err != nil {
153+
// set the concurrency to 1 because the pull already has concurrency control.
154+
extractCfg := &config.Extract{Concurrency: 1, Output: cfg.ExtractDir}
155+
if err := exportModelArtifact(ctx, dst, manifest, repo, extractCfg); err != nil {
154156
return fmt.Errorf("failed to export the artifact to the output directory: %w", err)
155157
}
156158
}

pkg/config/extract.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,33 @@
1616

1717
package config
1818

19+
import "fmt"
20+
21+
const (
22+
// defaultExtractConcurrency is the default number of concurrent extracts.
23+
defaultExtractConcurrency = 5
24+
)
25+
1926
type Extract struct {
20-
Output string
27+
Output string
28+
Concurrency int
2129
}
2230

2331
func NewExtract() *Extract {
2432
return &Extract{
25-
Output: "",
33+
Output: "",
34+
Concurrency: defaultExtractConcurrency,
35+
}
36+
}
37+
38+
func (e *Extract) Validate() error {
39+
if e.Concurrency <= 0 {
40+
return fmt.Errorf("concurrency must be greater than 0")
41+
}
42+
43+
if e.Output == "" {
44+
return fmt.Errorf("output is required")
2645
}
46+
47+
return nil
2748
}

test/mocks/backend/backend.go

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/mocks/backend/build/builder.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/mocks/backend/build/output_strategy.go

Lines changed: 11 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)