Skip to content

Commit 01d7739

Browse files
authored
Merge pull request moby#5022 from tonistiigi/flightcontrol-cachedgroup
flightcontrol: add cachedgroup struct
2 parents 5f130fa + 9f66e2a commit 01d7739

File tree

5 files changed

+186
-59
lines changed

5 files changed

+186
-59
lines changed

client/llb/async.go

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,76 +6,46 @@ import (
66
"github.com/moby/buildkit/solver/pb"
77
"github.com/moby/buildkit/util/flightcontrol"
88
digest "github.com/opencontainers/go-digest"
9-
"github.com/pkg/errors"
109
)
1110

1211
type asyncState struct {
13-
f func(context.Context, State, *Constraints) (State, error)
14-
prev State
15-
target State
16-
set bool
17-
err error
18-
g flightcontrol.Group[State]
12+
f func(context.Context, State, *Constraints) (State, error)
13+
prev State
14+
g flightcontrol.CachedGroup[State]
1915
}
2016

2117
func (as *asyncState) Output() Output {
2218
return as
2319
}
2420

2521
func (as *asyncState) Vertex(ctx context.Context, c *Constraints) Vertex {
26-
err := as.Do(ctx, c)
22+
target, err := as.Do(ctx, c)
2723
if err != nil {
2824
return &errVertex{err}
2925
}
30-
if as.set {
31-
out := as.target.Output()
32-
if out == nil {
33-
return nil
34-
}
35-
return out.Vertex(ctx, c)
26+
out := target.Output()
27+
if out == nil {
28+
return nil
3629
}
37-
return nil
30+
return out.Vertex(ctx, c)
3831
}
3932

4033
func (as *asyncState) ToInput(ctx context.Context, c *Constraints) (*pb.Input, error) {
41-
err := as.Do(ctx, c)
34+
target, err := as.Do(ctx, c)
4235
if err != nil {
4336
return nil, err
4437
}
45-
if as.set {
46-
out := as.target.Output()
47-
if out == nil {
48-
return nil, nil
49-
}
50-
return out.ToInput(ctx, c)
38+
out := target.Output()
39+
if out == nil {
40+
return nil, nil
5141
}
52-
return nil, nil
42+
return out.ToInput(ctx, c)
5343
}
5444

55-
func (as *asyncState) Do(ctx context.Context, c *Constraints) error {
56-
_, err := as.g.Do(ctx, "", func(ctx context.Context) (State, error) {
57-
if as.set {
58-
return as.target, as.err
59-
}
60-
res, err := as.f(ctx, as.prev, c)
61-
if err != nil {
62-
select {
63-
case <-ctx.Done():
64-
if errors.Is(err, context.Cause(ctx)) {
65-
return res, err
66-
}
67-
default:
68-
}
69-
}
70-
as.target = res
71-
as.err = err
72-
as.set = true
73-
return res, err
45+
func (as *asyncState) Do(ctx context.Context, c *Constraints) (State, error) {
46+
return as.g.Do(ctx, "", func(ctx context.Context) (State, error) {
47+
return as.f(ctx, as.prev, c)
7448
})
75-
if err != nil {
76-
return err
77-
}
78-
return as.err
7949
}
8050

8151
type errVertex struct {

client/llb/state.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,11 @@ func (s State) getValue(k interface{}) func(context.Context, *Constraints) (inte
105105
}
106106
if s.async != nil {
107107
return func(ctx context.Context, c *Constraints) (interface{}, error) {
108-
err := s.async.Do(ctx, c)
108+
target, err := s.async.Do(ctx, c)
109109
if err != nil {
110110
return nil, err
111111
}
112-
return s.async.target.getValue(k)(ctx, c)
112+
return target.getValue(k)(ctx, c)
113113
}
114114
}
115115
if s.prev == nil {
@@ -119,8 +119,13 @@ func (s State) getValue(k interface{}) func(context.Context, *Constraints) (inte
119119
}
120120

121121
func (s State) Async(f func(context.Context, State, *Constraints) (State, error)) State {
122+
as := &asyncState{
123+
f: f,
124+
prev: s,
125+
}
126+
as.g.CacheError = true
122127
s2 := State{
123-
async: &asyncState{f: f, prev: s},
128+
async: as,
124129
}
125130
return s2
126131
}

frontend/dockerui/config.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ type Client struct {
7878
Config
7979
client client.Client
8080
ignoreCache []string
81-
bctx *buildContext
82-
g flightcontrol.Group[*buildContext]
81+
g flightcontrol.CachedGroup[*buildContext]
8382
bopts client.BuildOpts
8483

8584
dockerignore []byte
@@ -288,14 +287,7 @@ func (bc *Client) init() error {
288287

289288
func (bc *Client) buildContext(ctx context.Context) (*buildContext, error) {
290289
return bc.g.Do(ctx, "initcontext", func(ctx context.Context) (*buildContext, error) {
291-
if bc.bctx != nil {
292-
return bc.bctx, nil
293-
}
294-
bctx, err := bc.initContext(ctx)
295-
if err == nil {
296-
bc.bctx = bctx
297-
}
298-
return bctx, err
290+
return bc.initContext(ctx)
299291
})
300292
}
301293

util/flightcontrol/cached.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package flightcontrol
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/pkg/errors"
8+
)
9+
10+
// Group is a flightcontrol synchronization group that memoizes the results of a function
11+
// and returns the cached result if the function is called with the same key.
12+
// Don't use with long-running groups as the results are cached indefinitely.
13+
type CachedGroup[T any] struct {
14+
// CacheError defines if error results should also be cached.
15+
// It is not safe to change this value after the first call to Do.
16+
// Context cancellation errors are never cached.
17+
CacheError bool
18+
g Group[T]
19+
mu sync.Mutex
20+
cache map[string]result[T]
21+
}
22+
23+
type result[T any] struct {
24+
v T
25+
err error
26+
}
27+
28+
// Do executes a context function syncronized by the key or returns the cached result for the key.
29+
func (g *CachedGroup[T]) Do(ctx context.Context, key string, fn func(ctx context.Context) (T, error)) (T, error) {
30+
return g.g.Do(ctx, key, func(ctx context.Context) (T, error) {
31+
g.mu.Lock()
32+
if v, ok := g.cache[key]; ok {
33+
g.mu.Unlock()
34+
if v.err != nil {
35+
if g.CacheError {
36+
return v.v, v.err
37+
}
38+
} else {
39+
return v.v, nil
40+
}
41+
}
42+
g.mu.Unlock()
43+
v, err := fn(ctx)
44+
if err != nil {
45+
select {
46+
case <-ctx.Done():
47+
if errors.Is(err, context.Cause(ctx)) {
48+
return v, err
49+
}
50+
default:
51+
}
52+
}
53+
if err == nil || g.CacheError {
54+
g.mu.Lock()
55+
if g.cache == nil {
56+
g.cache = make(map[string]result[T])
57+
}
58+
g.cache[key] = result[T]{v: v, err: err}
59+
g.mu.Unlock()
60+
}
61+
return v, err
62+
})
63+
}

util/flightcontrol/cached_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package flightcontrol
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/pkg/errors"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestCached(t *testing.T) {
13+
var g CachedGroup[int]
14+
15+
ctx := context.TODO()
16+
17+
v, err := g.Do(ctx, "11", func(ctx context.Context) (int, error) {
18+
return 1, nil
19+
})
20+
require.NoError(t, err)
21+
require.Equal(t, 1, v)
22+
23+
v, err = g.Do(ctx, "22", func(ctx context.Context) (int, error) {
24+
return 2, nil
25+
})
26+
require.NoError(t, err)
27+
require.Equal(t, 2, v)
28+
29+
didCall := false
30+
v, err = g.Do(ctx, "11", func(ctx context.Context) (int, error) {
31+
didCall = true
32+
return 3, nil
33+
})
34+
require.NoError(t, err)
35+
require.Equal(t, 1, v)
36+
require.Equal(t, false, didCall)
37+
38+
// by default, errors are not cached
39+
_, err = g.Do(ctx, "33", func(ctx context.Context) (int, error) {
40+
return 0, errors.Errorf("some error")
41+
})
42+
43+
require.Error(t, err)
44+
require.ErrorContains(t, err, "some error")
45+
46+
v, err = g.Do(ctx, "33", func(ctx context.Context) (int, error) {
47+
return 3, nil
48+
})
49+
50+
require.NoError(t, err)
51+
require.Equal(t, 3, v)
52+
}
53+
54+
func TestCachedError(t *testing.T) {
55+
var g CachedGroup[string]
56+
g.CacheError = true
57+
58+
ctx := context.TODO()
59+
60+
_, err := g.Do(ctx, "11", func(ctx context.Context) (string, error) {
61+
return "", errors.Errorf("first error")
62+
})
63+
require.Error(t, err)
64+
require.ErrorContains(t, err, "first error")
65+
66+
_, err = g.Do(ctx, "11", func(ctx context.Context) (string, error) {
67+
return "never-ran", nil
68+
})
69+
require.Error(t, err)
70+
require.ErrorContains(t, err, "first error")
71+
72+
// context errors are never cached
73+
ctx, cancel := context.WithTimeoutCause(context.TODO(), 10*time.Millisecond, nil)
74+
defer cancel()
75+
_, err = g.Do(ctx, "22", func(ctx context.Context) (string, error) {
76+
select {
77+
case <-ctx.Done():
78+
return "", context.Cause(ctx)
79+
case <-time.After(10 * time.Second):
80+
return "", errors.Errorf("unexpected error")
81+
}
82+
})
83+
require.Error(t, err)
84+
require.ErrorContains(t, err, "context deadline exceeded")
85+
86+
select {
87+
case <-ctx.Done():
88+
default:
89+
require.Fail(t, "expected context to be done")
90+
}
91+
92+
v, err := g.Do(ctx, "22", func(ctx context.Context) (string, error) {
93+
return "did-run", nil
94+
})
95+
require.NoError(t, err)
96+
require.Equal(t, "did-run", v)
97+
}

0 commit comments

Comments
 (0)