Skip to content

Commit fcb3247

Browse files
authored
feat: support concurrent for build operation(5 concurrency by default) (#98)
1 parent 6d49d3d commit fcb3247

File tree

17 files changed

+213
-47
lines changed

17 files changed

+213
-47
lines changed

cmd/build.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var buildCmd = &cobra.Command{
4949
// init initializes build command.
5050
func init() {
5151
flags := buildCmd.Flags()
52+
flags.IntVarP(&buildConfig.Concurrency, "concurrency", "c", buildConfig.Concurrency, "specify the number of concurrent build operations")
5253
flags.StringVarP(&buildConfig.Target, "target", "t", "", "target model artifact name")
5354
flags.StringVarP(&buildConfig.Modelfile, "modelfile", "f", "Modelfile", "model file path")
5455

@@ -64,7 +65,11 @@ func runBuild(ctx context.Context, workDir string) error {
6465
return err
6566
}
6667

67-
if err := b.Build(ctx, buildConfig.Modelfile, workDir, buildConfig.Target); err != nil {
68+
opts := []backend.Option{
69+
backend.WithConcurrency(buildConfig.Concurrency),
70+
}
71+
72+
if err := b.Build(ctx, buildConfig.Modelfile, workDir, buildConfig.Target, opts...); err != nil {
6873
return err
6974
}
7075

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.3
44

55
require (
66
github.com/CloudNativeAI/model-spec v0.0.2
7+
github.com/chelnak/ysmrr v0.6.0
78
github.com/distribution/distribution/v3 v3.0.0-rc.3
89
github.com/distribution/reference v0.6.0
910
github.com/dustin/go-humanize v1.0.1
@@ -28,6 +29,7 @@ require (
2829
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2930
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3031
github.com/docker/go-metrics v0.0.1 // indirect
32+
github.com/fatih/color v1.18.0 // indirect
3133
github.com/fsnotify/fsnotify v1.7.0 // indirect
3234
github.com/go-logr/logr v1.4.2 // indirect
3335
github.com/go-logr/stdr v1.2.2 // indirect
@@ -39,6 +41,8 @@ require (
3941
github.com/inconshreveable/mousetrap v1.1.0 // indirect
4042
github.com/klauspost/compress v1.17.11 // indirect
4143
github.com/magiconair/properties v1.8.7 // indirect
44+
github.com/mattn/go-colorable v0.1.14 // indirect
45+
github.com/mattn/go-isatty v0.0.20 // indirect
4246
github.com/mattn/go-runewidth v0.0.16 // indirect
4347
github.com/mitchellh/mapstructure v1.5.0 // indirect
4448
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect

go.sum

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
1414
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
1515
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
1616
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
17+
github.com/chelnak/ysmrr v0.6.0 h1:kMhO0oI02tl/9szvxrOE0yeImtrK4KQhER0oXu1K/iM=
18+
github.com/chelnak/ysmrr v0.6.0/go.mod h1:56JSrmQgb7/7xoMvuD87h3PE/qW6K1+BQcrgWtVLTUo=
1719
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
1820
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1921
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -29,6 +31,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
2931
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
3032
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
3133
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
34+
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
35+
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
3236
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
3337
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
3438
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
@@ -82,6 +86,10 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
8286
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
8387
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
8488
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
89+
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
90+
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
91+
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
92+
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
8593
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
8694
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
8795
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
@@ -231,6 +239,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h
231239
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
232240
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
233241
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
242+
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
234243
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
235244
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
236245
golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU=

pkg/backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type Backend interface {
3131
Logout(ctx context.Context, registry string) error
3232

3333
// Build builds the user materials into the OCI image which follows the Model Spec.
34-
Build(ctx context.Context, modelfilePath, workDir, target string) error
34+
Build(ctx context.Context, modelfilePath, workDir, target string, opts ...Option) error
3535

3636
// Pull pulls an artifact from a registry.
3737
Pull(ctx context.Context, target string, opts ...Option) error

pkg/backend/build.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@ import (
3030
)
3131

3232
// Build builds the user materials into the OCI image which follows the Model Spec.
33-
func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target string) error {
33+
func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target string, opts ...Option) error {
34+
// apply options.
35+
options := &Options{}
36+
for _, opt := range opts {
37+
opt(options)
38+
}
39+
3440
// parse the repo name and tag name from target.
3541
ref, err := ParseReference(target)
3642
if err != nil {
@@ -48,7 +54,7 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
4854
}
4955

5056
layers := []ocispec.Descriptor{}
51-
layerDescs, err := b.process(ctx, workDir, repo, b.getProcessors(modelfile)...)
57+
layerDescs, err := b.process(ctx, workDir, repo, options, b.getProcessors(modelfile)...)
5258
if err != nil {
5359
return fmt.Errorf("failed to process files: %w", err)
5460
}
@@ -61,15 +67,15 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
6167
return fmt.Errorf("failed to build image config: %w", err)
6268
}
6369

64-
fmt.Printf("%-15s => %s (%s)\n", "Built config", configDesc.Digest, humanize.IBytes(uint64(configDesc.Size)))
70+
fmt.Printf("%s => %s (%s)\n", "Built config", configDesc.Digest, humanize.IBytes(uint64(configDesc.Size)))
6571

6672
// build the image manifest.
6773
manifestDesc, err := build.BuildManifest(ctx, b.store, repo, tag, layers, configDesc, manifestAnnotation())
6874
if err != nil {
6975
return fmt.Errorf("failed to build image manifest: %w", err)
7076
}
7177

72-
fmt.Printf("%-15s => %s (%s)\n", "Built manifest", manifestDesc.Digest, humanize.IBytes(uint64(manifestDesc.Size)))
78+
fmt.Printf("%s => %s (%s)\n", "Built manifest", manifestDesc.Digest, humanize.IBytes(uint64(manifestDesc.Size)))
7379
return nil
7480
}
7581

@@ -96,10 +102,10 @@ func (b *backend) getProcessors(modelfile modelfile.Modelfile) []processor.Proce
96102
}
97103

98104
// process walks the user work directory and process the identified files.
99-
func (b *backend) process(ctx context.Context, workDir string, repo string, processors ...processor.Processor) ([]ocispec.Descriptor, error) {
105+
func (b *backend) process(ctx context.Context, workDir string, repo string, opts *Options, processors ...processor.Processor) ([]ocispec.Descriptor, error) {
100106
descriptors := []ocispec.Descriptor{}
101107
for _, p := range processors {
102-
descs, err := p.Process(ctx, workDir, repo)
108+
descs, err := p.Process(ctx, workDir, repo, processor.WithConcurrency(opts.concurrency))
103109
if err != nil {
104110
return nil, err
105111
}

pkg/backend/build_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestGetProcessors(t *testing.T) {
3535
processors := b.getProcessors(modelfile)
3636

3737
assert.Len(t, processors, 4)
38-
assert.Equal(t, "model_config", processors[0].Name())
38+
assert.Equal(t, "config", processors[0].Name())
3939
assert.Equal(t, "model", processors[1].Name())
4040
assert.Equal(t, "code", processors[2].Name())
4141
assert.Equal(t, "doc", processors[3].Name())

pkg/backend/processor/base.go

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,21 @@ import (
2121
"fmt"
2222
"path/filepath"
2323
"sort"
24+
"sync"
25+
"sync/atomic"
2426

2527
"github.com/CloudNativeAI/modctl/pkg/backend/build"
2628
"github.com/CloudNativeAI/modctl/pkg/storage"
2729

30+
"github.com/chelnak/ysmrr"
2831
humanize "github.com/dustin/go-humanize"
2932
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
33+
"golang.org/x/sync/errgroup"
3034
)
3135

3236
type base struct {
37+
// name is the name of the processor.
38+
name string
3339
// store is the underlying storage backend.
3440
store storage.Storage
3541
// mediaType is the media type of the processed content.
@@ -39,7 +45,12 @@ type base struct {
3945
}
4046

4147
// Process implements the Processor interface, which can be reused by other processors.
42-
func (b *base) Process(ctx context.Context, workDir, repo string) ([]ocispec.Descriptor, error) {
48+
func (b *base) Process(ctx context.Context, workDir, repo string, opts ...Option) ([]ocispec.Descriptor, error) {
49+
baseOpts := &options{}
50+
for _, opt := range opts {
51+
opt(baseOpts)
52+
}
53+
4354
absWorkDir, err := filepath.Abs(workDir)
4455
if err != nil {
4556
return nil, err
@@ -57,16 +68,55 @@ func (b *base) Process(ctx context.Context, workDir, repo string) ([]ocispec.Des
5768

5869
sort.Strings(matchedPaths)
5970

60-
var descriptors []ocispec.Descriptor
71+
var (
72+
idx atomic.Int64
73+
mu sync.Mutex
74+
eg errgroup.Group
75+
descriptors []ocispec.Descriptor
76+
)
77+
78+
// Set default concurrency limit to 1 if not specified.
79+
if baseOpts.concurrency > 0 {
80+
eg.SetLimit(baseOpts.concurrency)
81+
} else {
82+
eg.SetLimit(1)
83+
}
84+
85+
total := int64(len(matchedPaths))
86+
sm := ysmrr.NewSpinnerManager()
87+
sm.Start()
88+
6189
for _, path := range matchedPaths {
62-
desc, err := build.BuildLayer(ctx, b.store, b.mediaType, workDir, repo, path)
63-
if err != nil {
64-
return nil, err
65-
}
90+
eg.Go(func() error {
91+
relPath, err := filepath.Rel(absWorkDir, path)
92+
if err != nil {
93+
return err
94+
}
6695

67-
fmt.Printf("%-15s => %s (%s)\n", "Built blob", desc.Digest, humanize.IBytes(uint64(desc.Size)))
68-
descriptors = append(descriptors, desc)
96+
blobMsg := fmt.Sprintf("blob [%s %d/%d]", b.name, idx.Add(1), total)
97+
sp := sm.AddSpinner(fmt.Sprintf("Building %s => %s", blobMsg, relPath))
98+
99+
desc, err := build.BuildLayer(ctx, b.store, b.mediaType, workDir, repo, path)
100+
if err != nil {
101+
sp.ErrorWithMessagef("Failed to build blob %s: %v", path, relPath)
102+
return err
103+
}
104+
105+
sp.CompleteWithMessagef("%s => %s (%s)", fmt.Sprintf("Built %s", blobMsg), desc.Digest, humanize.IBytes(uint64(desc.Size)))
106+
107+
mu.Lock()
108+
descriptors = append(descriptors, desc)
109+
mu.Unlock()
110+
111+
return nil
112+
})
69113
}
70114

115+
if err := eg.Wait(); err != nil {
116+
return nil, err
117+
}
118+
119+
sm.Stop()
120+
71121
return descriptors, nil
72122
}

pkg/backend/processor/code.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,15 @@ import (
2424
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2525
)
2626

27+
const (
28+
codeProcessorName = "code"
29+
)
30+
2731
// NewCodeProcessor creates a new code processor.
2832
func NewCodeProcessor(store storage.Storage, mediaType string, patterns []string) Processor {
2933
return &codeProcessor{
3034
base: &base{
35+
name: codeProcessorName,
3136
store: store,
3237
mediaType: mediaType,
3338
patterns: patterns,
@@ -41,9 +46,9 @@ type codeProcessor struct {
4146
}
4247

4348
func (p *codeProcessor) Name() string {
44-
return "code"
49+
return codeProcessorName
4550
}
4651

47-
func (p *codeProcessor) Process(ctx context.Context, workDir, repo string) ([]ocispec.Descriptor, error) {
48-
return p.base.Process(ctx, workDir, repo)
52+
func (p *codeProcessor) Process(ctx context.Context, workDir, repo string, opts ...Option) ([]ocispec.Descriptor, error) {
53+
return p.base.Process(ctx, workDir, repo, opts...)
4954
}

pkg/backend/processor/doc.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,19 @@ import (
2020
"context"
2121

2222
"github.com/CloudNativeAI/modctl/pkg/storage"
23+
2324
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2425
)
2526

27+
const (
28+
docProcessorName = "doc"
29+
)
30+
2631
// NewDocProcessor creates a new doc processor.
2732
func NewDocProcessor(store storage.Storage, mediaType string, patterns []string) Processor {
2833
return &docProcessor{
2934
base: &base{
35+
name: docProcessorName,
3036
store: store,
3137
mediaType: mediaType,
3238
patterns: patterns,
@@ -40,9 +46,9 @@ type docProcessor struct {
4046
}
4147

4248
func (p *docProcessor) Name() string {
43-
return "doc"
49+
return docProcessorName
4450
}
4551

46-
func (p *docProcessor) Process(ctx context.Context, workDir, repo string) ([]ocispec.Descriptor, error) {
47-
return p.base.Process(ctx, workDir, repo)
52+
func (p *docProcessor) Process(ctx context.Context, workDir, repo string, opts ...Option) ([]ocispec.Descriptor, error) {
53+
return p.base.Process(ctx, workDir, repo, opts...)
4854
}

pkg/backend/processor/model.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,15 @@ import (
2424
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2525
)
2626

27+
const (
28+
modelProcessorName = "model"
29+
)
30+
2731
// NewModelProcessor creates a new model processor.
2832
func NewModelProcessor(store storage.Storage, mediaType string, patterns []string) Processor {
2933
return &modelProcessor{
3034
base: &base{
35+
name: modelProcessorName,
3136
store: store,
3237
mediaType: mediaType,
3338
patterns: patterns,
@@ -41,9 +46,9 @@ type modelProcessor struct {
4146
}
4247

4348
func (p *modelProcessor) Name() string {
44-
return "model"
49+
return modelProcessorName
4550
}
4651

47-
func (p *modelProcessor) Process(ctx context.Context, workDir, repo string) ([]ocispec.Descriptor, error) {
48-
return p.base.Process(ctx, workDir, repo)
52+
func (p *modelProcessor) Process(ctx context.Context, workDir, repo string, opts ...Option) ([]ocispec.Descriptor, error) {
53+
return p.base.Process(ctx, workDir, repo, opts...)
4954
}

0 commit comments

Comments
 (0)