Skip to content

Commit dff4d49

Browse files
committed
introduce MutateServices as a reusable concurrent service mutator
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent ef843ed commit dff4d49

File tree

2 files changed

+89
-58
lines changed

2 files changed

+89
-58
lines changed

types/project.go

Lines changed: 64 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package types
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"encoding/json"
2223
"fmt"
2324
"os"
@@ -536,65 +537,29 @@ func (p *Project) WithServicesDisabled(names ...string) *Project {
536537
// WithImagesResolved updates services images to include digest computed by a resolver function
537538
// It returns a new Project instance with the changes and keep the original Project unchanged
538539
func (p *Project) WithImagesResolved(resolver func(named reference.Named) (godigest.Digest, error)) (*Project, error) {
539-
servicesWithImage := p.Services.Filter(func(service ServiceConfig) bool {
540-
return service.Image != ""
541-
})
542-
if len(servicesWithImage) == 0 {
543-
return p, nil
544-
}
545-
546-
type result struct {
547-
service string
548-
digest string
549-
}
550-
resultCh := make(chan result)
551-
newProject := p.deepCopy()
552-
553-
eg := errgroup.Group{}
554-
eg.Go(func() error {
555-
expect := len(servicesWithImage)
556-
for ; expect > 0; {
557-
r, ok := <-resultCh
558-
if !ok {
559-
// interrupted
560-
return nil
561-
}
562-
service := newProject.Services[r.service]
563-
service.Image = r.digest
564-
newProject.Services[r.service] = service
565-
expect--
540+
return p.WithServicesTransform(func(name string, service ServiceConfig) (ServiceConfig, error) {
541+
if service.Image == "" {
542+
return service, nil
543+
}
544+
named, err := reference.ParseDockerRef(service.Image)
545+
if err != nil {
546+
return service, err
566547
}
567-
return nil
568-
})
569-
for n, s := range servicesWithImage {
570-
name := n
571-
service := s
572-
eg.Go(func() error {
573-
named, err := reference.ParseDockerRef(service.Image)
574-
if err != nil {
575-
return err
576-
}
577548

578-
if _, ok := named.(reference.Canonical); !ok {
579-
// image is named but not digested reference
580-
digest, err := resolver(named)
581-
if err != nil {
582-
return err
583-
}
584-
named, err = reference.WithDigest(named, digest)
585-
if err != nil {
586-
return err
587-
}
549+
if _, ok := named.(reference.Canonical); !ok {
550+
// image is named but not digested reference
551+
digest, err := resolver(named)
552+
if err != nil {
553+
return service, err
588554
}
589-
590-
resultCh <- result{
591-
service: name,
592-
digest: named.String(),
555+
named, err = reference.WithDigest(named, digest)
556+
if err != nil {
557+
return service, err
593558
}
594-
return nil
595-
})
596-
}
597-
return newProject, eg.Wait()
559+
}
560+
service.Image = named.String()
561+
return service, nil
562+
})
598563
}
599564

600565
// MarshalYAML marshal Project into a yaml tree
@@ -688,3 +653,47 @@ func (p *Project) deepCopy() *Project {
688653
}
689654
return instance.(*Project)
690655
}
656+
657+
// WithServicesTransform applies a transformation to project services and return a new project with transformation results
658+
func (p *Project) WithServicesTransform(fn func(name string, s ServiceConfig) (ServiceConfig, error)) (*Project, error) {
659+
type result struct {
660+
name string
661+
service ServiceConfig
662+
}
663+
resultCh := make(chan result)
664+
newProject := p.deepCopy()
665+
666+
eg, ctx := errgroup.WithContext(context.Background())
667+
eg.Go(func() error {
668+
expect := len(newProject.Services)
669+
s := Services{}
670+
for expect > 0 {
671+
select {
672+
case <-ctx.Done():
673+
// interrupted as some goroutine returned an error
674+
return nil
675+
case r := <-resultCh:
676+
s[r.name] = r.service
677+
expect--
678+
}
679+
}
680+
newProject.Services = s
681+
return nil
682+
})
683+
for n, s := range newProject.Services {
684+
name := n
685+
service := s
686+
eg.Go(func() error {
687+
updated, err := fn(name, service)
688+
if err != nil {
689+
return err
690+
}
691+
resultCh <- result{
692+
name: name,
693+
service: updated,
694+
}
695+
return nil
696+
})
697+
}
698+
return newProject, eg.Wait()
699+
}

types/project_test.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package types
1818

1919
import (
2020
_ "crypto/sha256"
21+
"errors"
2122
"fmt"
2223
"testing"
2324

@@ -208,19 +209,40 @@ func Test_ResolveImages(t *testing.T) {
208209
}
209210

210211
func Test_ResolveImages_concurrent(t *testing.T) {
212+
const garfield = "sha256:1234567890123456789012345678901234567890123456789012345678901234"
211213
resolver := func(named reference.Named) (digest.Digest, error) {
212-
return "sha256:1234567890123456789012345678901234567890123456789012345678901234", nil
214+
return garfield, nil
213215
}
214-
p := Project{
216+
p := &Project{
215217
Services: Services{},
216218
}
217219
for i := 0; i < 1000; i++ {
218220
p.Services[fmt.Sprintf("service_%d", i)] = ServiceConfig{
219221
Image: fmt.Sprintf("image_%d", i),
220222
}
221223
}
222-
_, err := p.WithImagesResolved(resolver)
224+
p, err := p.WithImagesResolved(resolver)
223225
assert.NilError(t, err)
226+
for i := 0; i < 1000; i++ {
227+
assert.Equal(t, p.Services[fmt.Sprintf("service_%d", i)].Image,
228+
fmt.Sprintf("docker.io/library/image_%d:latest@%s", i, garfield))
229+
}
230+
}
231+
232+
func Test_ResolveImages_concurrent_interrupted(t *testing.T) {
233+
resolver := func(named reference.Named) (digest.Digest, error) {
234+
return "", errors.New("something went wrong")
235+
}
236+
p := Project{
237+
Services: Services{},
238+
}
239+
for i := 0; i < 10; i++ {
240+
p.Services[fmt.Sprintf("service_%d", i)] = ServiceConfig{
241+
Image: fmt.Sprintf("image_%d", i),
242+
}
243+
}
244+
_, err := p.WithImagesResolved(resolver)
245+
assert.Error(t, err, "something went wrong")
224246
}
225247

226248
func TestWithServices(t *testing.T) {

0 commit comments

Comments
 (0)