Skip to content

Commit 0e1ac96

Browse files
authored
Merge pull request #1 from SamuelTissot/concurrency_fetch
concurrent fetch secrets
2 parents 8ab2f22 + 1191ca4 commit 0e1ac96

File tree

2 files changed

+97
-19
lines changed

2 files changed

+97
-19
lines changed

builder.go

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,30 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"github.com/BurntSushi/toml"
87
"io"
98
"reflect"
109
"strings"
10+
"sync"
11+
"sync/atomic"
12+
13+
"github.com/BurntSushi/toml"
14+
"golang.org/x/sync/errgroup"
1115
)
1216

17+
var builderWorkers atomic.Int32
18+
19+
func init() {
20+
builderWorkers.Store(10)
21+
}
22+
23+
// SetBuilderWorkers set the amount of concurrent request to the secret manager
24+
func SetBuilderWorkers(n int32) {
25+
if n < 1 {
26+
n = 1
27+
}
28+
builderWorkers.Store(n)
29+
}
30+
1331
type Builder struct {
1432
sources []*StaticSource
1533
managers []SecretManager
@@ -59,12 +77,20 @@ func buildFromSources(config any, sources []*StaticSource) error {
5977
}
6078

6179
if err != nil {
62-
return fmt.Errorf("failed to process source: %s, %w", source.name, err)
80+
return fmt.Errorf(
81+
"failed to process source: %s, %w",
82+
source.name,
83+
err,
84+
)
6385
}
6486

6587
err = source.reader.Close()
6688
if err != nil {
67-
return fmt.Errorf("failed to close source: %s, %s", source.name, err)
89+
return fmt.Errorf(
90+
"failed to close source: %s, %s",
91+
source.name,
92+
err,
93+
)
6894
}
6995
}
7096

@@ -80,26 +106,66 @@ func parseJSON(config any, r io.Reader) error {
80106
return json.NewDecoder(r).Decode(config)
81107
}
82108

83-
func resolveSecrets(ctx context.Context, config any, managers []SecretManager) error {
109+
type replacement struct {
110+
old string
111+
new string
112+
}
113+
114+
func resolveSecrets(
115+
ctx context.Context,
116+
config any,
117+
managers []SecretManager,
118+
) error {
119+
g, ctx := errgroup.WithContext(ctx)
120+
g.SetLimit(int(builderWorkers.Load()))
121+
122+
var mu sync.Mutex
123+
84124
strConfig, err := configToJSON(config)
85125
if err != nil {
86126
return err
87127
}
88128

89129
subs := findSubstitutions(strConfig)
130+
replacements := make([]replacement, 0, len(subs))
90131
for _, sub := range subs {
132+
sub := sub
91133
manager, err := getManagerForPrefix(sub.managerPrefix, managers)
92134
if err != nil {
93135
return err
94136
}
95137

96-
secret, err := manager.Secret(ctx, sub.managerKey)
97-
if err != nil {
98-
return err
99-
}
138+
g.Go(func() error {
139+
select {
140+
case <-ctx.Done():
141+
return nil
142+
default:
143+
}
144+
145+
secret, err := manager.Secret(ctx, sub.managerKey)
146+
if err != nil {
147+
return err
148+
}
149+
150+
escapedSecret := escape(secret)
151+
152+
mu.Lock()
153+
replacements = append(
154+
replacements,
155+
replacement{old: sub.value, new: escapedSecret},
156+
)
157+
mu.Unlock()
158+
return nil
159+
})
160+
}
161+
162+
if err := g.Wait(); err != nil {
163+
return err
164+
}
100165

101-
escapedSecret := escape(secret)
102-
strConfig = strings.Replace(strConfig, sub.value, escapedSecret, 1)
166+
// actually do the replacement
167+
for _, r := range replacements {
168+
strConfig = strings.Replace(strConfig, r.old, r.new, 1)
103169
}
104170

105171
return parseJSON(config, strings.NewReader(strConfig))
@@ -151,7 +217,10 @@ func findSubstitutions(str string) []substitutions {
151217
return out
152218
}
153219

154-
func getManagerForPrefix(prefix string, managers []SecretManager) (SecretManager, error) {
220+
func getManagerForPrefix(
221+
prefix string,
222+
managers []SecretManager,
223+
) (SecretManager, error) {
155224
for _, manager := range managers {
156225
if manager.Prefix() == prefix {
157226
return manager, nil

builder_test.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package flowconf_test
22

33
import (
4-
"context"
4+
"testing"
5+
"time"
6+
57
"github.com/SamuelTissot/flowconf"
68
"github.com/SamuelTissot/flowconf/test"
79
"github.com/stretchr/testify/assert"
8-
"testing"
9-
"time"
10+
"github.com/stretchr/testify/mock"
1011
)
1112

1213
func TestBuilder_Build_fromStaticSource(t *testing.T) {
@@ -20,7 +21,10 @@ func TestBuilder_Build_fromStaticSource(t *testing.T) {
2021
{
2122
name: "one source toml",
2223
sources: func() []*flowconf.StaticSource {
23-
sources, err := flowconf.NewSourcesFromEmbeddedFileSystem(test.FileSystem, "data/config.toml")
24+
sources, err := flowconf.NewSourcesFromEmbeddedFileSystem(
25+
test.FileSystem,
26+
"data/config.toml",
27+
)
2428
if err != nil {
2529
t.Fatalf("failed to load sources from embedded filesystem")
2630
}
@@ -43,7 +47,10 @@ func TestBuilder_Build_fromStaticSource(t *testing.T) {
4347
{
4448
name: "one source json",
4549
sources: func() []*flowconf.StaticSource {
46-
sources, err := flowconf.NewSourcesFromEmbeddedFileSystem(test.FileSystem, "data/config.json")
50+
sources, err := flowconf.NewSourcesFromEmbeddedFileSystem(
51+
test.FileSystem,
52+
"data/config.json",
53+
)
4754
if err != nil {
4855
t.Fatalf("failed to load sources from embedded filesystem")
4956
}
@@ -132,7 +139,6 @@ func TestBuilder_Build_fromStaticSource(t *testing.T) {
132139
func TestBuilder_Build_fetchesSecretsFromSecretsManagers(t *testing.T) {
133140
// /////////////////////// GIVEN ///////////////////////
134141
var (
135-
ctx = context.Background()
136142
configSourceFile = "data/config.toml"
137143
prefix = "managerprefix" // this prefix needs to be the same as in the config file [[test/data/config.toml]]
138144
secretKey = "projects/id/secrets/name-of-secret" // needs to be the same as in [[test/data/config.toml]]
@@ -152,12 +158,15 @@ func TestBuilder_Build_fetchesSecretsFromSecretsManagers(t *testing.T) {
152158
)
153159

154160
// setup sources
155-
sources, err := flowconf.NewSourcesFromEmbeddedFileSystem(test.FileSystem, configSourceFile)
161+
sources, err := flowconf.NewSourcesFromEmbeddedFileSystem(
162+
test.FileSystem,
163+
configSourceFile,
164+
)
156165
assert.NoError(t, err)
157166

158167
// setup mock
159168
managerMock.On("Prefix").Return(prefix).Once()
160-
managerMock.On("Secret", ctx, secretKey).Return(secret, nil)
169+
managerMock.On("Secret", mock.Anything, secretKey).Return(secret, nil)
161170

162171
// setup builder
163172
builder := flowconf.NewBuilder(sources...)

0 commit comments

Comments
 (0)