Skip to content

Commit df4d2ae

Browse files
committed
solver: add ResolverCache support
The new ResolverCache interface in JobContext allows build jobs to memoize and synchronize accesses to mutable remote resources. This ensures that when multiple parts of the same build job — or a build job and the source metadata resolver — access the same remote resource, it remains the same for the duration of a single build request, even if the data changes on the remote side. This fixes such a possible case in the HTTP source: even if the server now returns completely different data, once a URL has been accessed for the ongoing build, the initial contents are used until the build completes. Signed-off-by: Tonis Tiigi <[email protected]>
1 parent bc3666b commit df4d2ae

File tree

8 files changed

+701
-18
lines changed

8 files changed

+701
-18
lines changed

client/client_test.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,8 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){
246246
testHTTPResolveSourceMetadata,
247247
testHTTPPruneAfterCacheKey,
248248
testHTTPPruneAfterResolveMeta,
249+
testHTTPResolveMetaReuse,
250+
testHTTPResolveMultiBuild,
249251
}
250252

251253
func TestIntegration(t *testing.T) {
@@ -12270,6 +12272,147 @@ func testHTTPPruneAfterResolveMeta(t *testing.T, sb integration.Sandbox) {
1227012272
checkAllReleasable(t, c, sb, false)
1227112273
}
1227212274

12275+
func testHTTPResolveMetaReuse(t *testing.T, sb integration.Sandbox) {
12276+
// the difference with testHTTPPruneAfterResolveMeta is that here we change content with the etag on the server
12277+
// but because the URL was already resolved once, the new content should not be seen
12278+
ctx := sb.Context()
12279+
c, err := New(ctx, sb.Address())
12280+
require.NoError(t, err)
12281+
defer c.Close()
12282+
12283+
resp := &httpserver.Response{
12284+
Etag: identity.NewID(),
12285+
Content: []byte("content1"),
12286+
}
12287+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
12288+
"/foo": resp,
12289+
})
12290+
defer server.Close()
12291+
12292+
dest := t.TempDir()
12293+
_, err = c.Build(ctx, SolveOpt{
12294+
Exports: []ExportEntry{
12295+
{
12296+
Type: ExporterLocal,
12297+
OutputDir: dest,
12298+
},
12299+
},
12300+
}, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) {
12301+
id := server.URL + "/foo"
12302+
md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{
12303+
Identifier: id,
12304+
}, sourceresolver.Opt{})
12305+
if err != nil {
12306+
return nil, err
12307+
}
12308+
require.NotNil(t, md.HTTP)
12309+
12310+
resp.Etag = identity.NewID()
12311+
resp.Content = []byte("content2") // etag changed so new content would be returned if re-resolving
12312+
12313+
st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar"))
12314+
def, err := st.Marshal(sb.Context())
12315+
if err != nil {
12316+
return nil, err
12317+
}
12318+
return gc.Solve(ctx, gateway.SolveRequest{
12319+
Definition: def.ToPB(),
12320+
})
12321+
}, nil)
12322+
require.NoError(t, err)
12323+
12324+
dt, err := os.ReadFile(filepath.Join(dest, "bar"))
12325+
require.NoError(t, err)
12326+
require.Equal(t, "content1", string(dt))
12327+
}
12328+
12329+
// testHTTPResolveMultiBuild is a negative test for testHTTPResolveMetaReuse to ensure that
// URLs are resolved in between separate builds
func testHTTPResolveMultiBuild(t *testing.T, sb integration.Sandbox) {
	ctx := sb.Context()
	c, err := New(ctx, sb.Address())
	require.NoError(t, err)
	defer c.Close()

	resp := &httpserver.Response{
		Etag:    identity.NewID(),
		Content: []byte("content1"),
	}
	server := httpserver.NewTestServer(map[string]*httpserver.Response{
		"/foo": resp,
	})
	defer server.Close()

	dest := t.TempDir()
	// first build: resolves the URL and exports the copied file; content1 expected
	_, err = c.Build(ctx, SolveOpt{
		Exports: []ExportEntry{
			{
				Type:      ExporterLocal,
				OutputDir: dest,
			},
		},
	}, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) {
		id := server.URL + "/foo"
		md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{
			Identifier: id,
		}, sourceresolver.Opt{})
		if err != nil {
			return nil, err
		}
		require.NotNil(t, md.HTTP)
		require.Equal(t, digest.FromBytes(resp.Content), md.HTTP.Digest)

		st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar"))
		def, err := st.Marshal(sb.Context())
		if err != nil {
			return nil, err
		}
		return gc.Solve(ctx, gateway.SolveRequest{
			Definition: def.ToPB(),
		})
	}, nil)
	require.NoError(t, err)

	dt, err := os.ReadFile(filepath.Join(dest, "bar"))
	require.NoError(t, err)
	require.Equal(t, "content1", string(dt))

	// mutate both etag and content on the server between the two builds
	resp.Etag = identity.NewID()
	resp.Content = []byte("content2")

	// second build: a separate build request must re-resolve the URL and
	// therefore observe the new content
	_, err = c.Build(ctx, SolveOpt{
		Exports: []ExportEntry{
			{
				Type:      ExporterLocal,
				OutputDir: dest,
			},
		},
	}, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) {
		id := server.URL + "/foo"
		md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{
			Identifier: id,
		}, sourceresolver.Opt{})
		if err != nil {
			return nil, err
		}
		require.NotNil(t, md.HTTP)
		require.Equal(t, digest.FromBytes(resp.Content), md.HTTP.Digest)
		st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar"))
		def, err := st.Marshal(sb.Context())
		if err != nil {
			return nil, err
		}
		return gc.Solve(ctx, gateway.SolveRequest{
			Definition: def.ToPB(),
		})
	}, nil)
	require.NoError(t, err)

	dt, err = os.ReadFile(filepath.Join(dest, "bar"))
	require.NoError(t, err)
	require.Equal(t, "content2", string(dt))
}
12415+
1227312416
func runInDir(dir string, cmds ...string) error {
1227412417
for _, args := range cmds {
1227512418
var cmd *exec.Cmd

solver/jobs.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,21 @@ func (s *state) Cleanup(fn func() error) error {
8585
return nil
8686
}
8787

88+
// ResolverCache returns the ResolverCache scoped to this shared vertex
// state; the state itself implements the interface (see Lock).
func (s *state) ResolverCache() ResolverCache {
	return s
}
91+
92+
// Lock implements ResolverCache for a shared vertex state by combining
// the per-job resolver caches of every job currently attached to this
// state, so a value published by one job is also visible to the others.
func (s *state) Lock(key any) (values []any, release func(any) error, err error) {
	var rcs []ResolverCache
	// snapshot the attached jobs' caches under the state mutex; the
	// combined Lock below runs without holding s.mu
	s.mu.Lock()
	for j := range s.jobs {
		rcs = append(rcs, j.resolverCache)
	}
	s.mu.Unlock()

	return combinedResolverCache(rcs).Lock(key)
}
102+
88103
func (s *state) SessionIterator() session.Iterator {
89104
return s.sessionIterator()
90105
}
@@ -329,6 +344,7 @@ type Job struct {
329344
startedTime time.Time
330345
completedTime time.Time
331346
releasers []func() error
347+
resolverCache *resolverCache
332348

333349
progressCloser func(error)
334350
SessionID string
@@ -645,6 +661,7 @@ func (jl *Solver) NewJob(id string) (*Job, error) {
645661
id: id,
646662
startedTime: time.Now(),
647663
uniqueID: identity.NewID(),
664+
resolverCache: newResolverCache(),
648665
}
649666
jl.jobs[id] = j
650667

@@ -862,6 +879,10 @@ func (j *Job) Cleanup(fn func() error) error {
862879
return nil
863880
}
864881

882+
// ResolverCache returns the job-scoped ResolverCache created when the
// job was constructed in NewJob.
func (j *Job) ResolverCache() ResolverCache {
	return j.resolverCache
}
885+
865886
func (j *Job) SetValue(key string, v any) {
866887
j.values.Store(key, v)
867888
}

solver/llbsolver/ops/exec_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,3 +307,7 @@ func (j *jobCtx) Session() session.Group {
307307
func (j *jobCtx) Cleanup(f func() error) error {
308308
return errors.Errorf("cleanup not implemented for %T", j)
309309
}
310+
311+
// ResolverCache is a test stub; it returns nil, so code under test must
// tolerate a missing resolver cache.
func (j *jobCtx) ResolverCache() solver.ResolverCache {
	return nil
}

solver/resolvercache.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package solver
2+
3+
import (
4+
"slices"
5+
"sync"
6+
)
7+
8+
// resolverCache is a per-job implementation of ResolverCache. It holds a
// mutex-like lock per key and memoizes values published on release, so
// repeated accesses to the same mutable remote resource stay consistent
// for the duration of a build.
type resolverCache struct {
	mu    sync.Mutex
	locks map[any]*entry
}

var _ ResolverCache = &resolverCache{}

// entry tracks the lock state for a single key.
type entry struct {
	waiting []chan struct{} // channels closed on release to wake waiters
	values  []any           // values published by previous lock holders
	locked  bool            // true while some caller holds the key's lock
}

// newResolverCache returns an empty resolverCache ready for use.
func newResolverCache() *resolverCache {
	return &resolverCache{locks: make(map[any]*entry)}
}
24+
25+
// Lock acquires the lock for key and returns the values previously
// published for it. The returned release must be called when the caller
// is done; passing a non-nil value to release publishes it to future
// (and currently waiting) callers of Lock for the same key.
//
// NOTE(review): callers must tolerate a nil release — the slow path
// below returns (nil, nil, nil) when the key was deleted while waiting.
func (r *resolverCache) Lock(key any) (values []any, release func(any) error, err error) {
	r.mu.Lock()
	e, ok := r.locks[key]
	if !ok {
		// first caller for this key
		e = &entry{}
		r.locks[key] = e
	}
	if !e.locked {
		// fast path: lock is free, take it immediately
		e.locked = true
		values = slices.Clone(e.values) // clone so callers can't mutate shared state
		r.mu.Unlock()
		return values, func(v any) error {
			r.mu.Lock()
			defer r.mu.Unlock()
			if v != nil {
				e.values = append(e.values, v)
			}
			// wake every waiter; each re-checks state under the mutex
			for _, ch := range e.waiting {
				close(ch)
			}
			e.waiting = nil
			e.locked = false
			if len(e.values) == 0 {
				// nothing published: drop the entry so the map does not grow
				delete(r.locks, key)
			}
			return nil
		}, nil
	}

	// slow path: lock is held; register a wake-up channel and wait
	ch := make(chan struct{})
	e.waiting = append(e.waiting, ch)
	r.mu.Unlock()

	<-ch // wait for unlock

	r.mu.Lock()
	defer r.mu.Unlock()
	e2, ok := r.locks[key]
	if !ok {
		return nil, nil, nil // key deleted
	}
	values = slices.Clone(e2.values)
	if e2.locked {
		// NOTE(review): release closes all waiting channels at once, so
		// several waiters can wake together and race for the mutex; the
		// losers land here and get the cached values with a no-op
		// release rather than the lock — this branch is reachable, not
		// merely a guard against logic errors.
		return values, func(any) error { return nil }, nil
	}
	e2.locked = true
	return values, func(v any) error {
		r.mu.Lock()
		defer r.mu.Unlock()
		if v != nil {
			e2.values = append(e2.values, v)
		}
		for _, ch := range e2.waiting {
			close(ch)
		}
		e2.waiting = nil
		e2.locked = false
		if len(e2.values) == 0 {
			delete(r.locks, key)
		}
		return nil
	}, nil
}
89+
90+
// combinedResolverCache returns a ResolverCache that wraps multiple caches.
91+
// Lock() calls each underlying cache in parallel, merges their values, and
92+
// returns a combined release that releases all sublocks.
93+
func combinedResolverCache(rcs []ResolverCache) ResolverCache {
94+
return &combinedCache{rcs: rcs}
95+
}
96+
97+
type combinedCache struct {
98+
rcs []ResolverCache
99+
}
100+
101+
func (c *combinedCache) Lock(key any) (values []any, release func(any) error, err error) {
102+
if len(c.rcs) == 0 {
103+
return nil, func(any) error { return nil }, nil
104+
}
105+
106+
var (
107+
mu sync.Mutex
108+
wg sync.WaitGroup
109+
valuesAll []any
110+
releasers []func(any) error
111+
firstErr error
112+
)
113+
114+
wg.Add(len(c.rcs))
115+
for _, rc := range c.rcs {
116+
go func(rc ResolverCache) {
117+
defer wg.Done()
118+
vals, rel, e := rc.Lock(key)
119+
if e != nil {
120+
mu.Lock()
121+
if firstErr == nil {
122+
firstErr = e
123+
}
124+
mu.Unlock()
125+
return
126+
}
127+
128+
mu.Lock()
129+
valuesAll = append(valuesAll, vals...)
130+
releasers = append(releasers, rel)
131+
mu.Unlock()
132+
}(rc)
133+
}
134+
135+
wg.Wait()
136+
137+
if firstErr != nil {
138+
// rollback all acquired locks
139+
for _, r := range releasers {
140+
_ = r(nil)
141+
}
142+
return nil, nil, firstErr
143+
}
144+
145+
release = func(v any) error {
146+
var errOnce error
147+
for _, r := range releasers {
148+
if e := r(v); e != nil && errOnce == nil {
149+
errOnce = e
150+
}
151+
}
152+
return errOnce
153+
}
154+
155+
return valuesAll, release, nil
156+
}

0 commit comments

Comments
 (0)