Skip to content

Commit 80766cd

Browse files
committed
http: fix release race between cache and snapshot
Signed-off-by: Tonis Tiigi <[email protected]>
1 parent 7bb9231 commit 80766cd

File tree

7 files changed

+282
-47
lines changed

7 files changed

+282
-47
lines changed

client/client_test.go

Lines changed: 107 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ import (
7676
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
7777
"github.com/pkg/errors"
7878
"github.com/spdx/tools-golang/spdx"
79+
"github.com/stretchr/testify/assert"
7980
"github.com/stretchr/testify/require"
8081
"github.com/tonistiigi/fsutil"
8182
fsutiltypes "github.com/tonistiigi/fsutil/types"
@@ -243,6 +244,7 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){
243244
testMetadataOnlyLocal,
244245
testGitResolveSourceMetadata,
245246
testHTTPResolveSourceMetadata,
247+
testHTTPPruneAfterCacheKey,
246248
}
247249

248250
func TestIntegration(t *testing.T) {
@@ -3223,13 +3225,13 @@ func testBuildHTTPSource(t *testing.T, sb integration.Sandbox) {
32233225

32243226
modTime := time.Now().Add(-24 * time.Hour) // avoid falso positive with current time
32253227

3226-
resp := httpserver.Response{
3228+
resp := &httpserver.Response{
32273229
Etag: identity.NewID(),
32283230
Content: []byte("content1"),
32293231
LastModified: &modTime,
32303232
}
32313233

3232-
server := httpserver.NewTestServer(map[string]httpserver.Response{
3234+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
32333235
"/foo": resp,
32343236
})
32353237
defer server.Close()
@@ -3293,7 +3295,7 @@ func testBuildHTTPSource(t *testing.T, sb integration.Sandbox) {
32933295
require.NoError(t, err)
32943296
require.NoError(t, gw.Close())
32953297
gzipBytes := buf.Bytes()
3296-
respGzip := httpserver.Response{
3298+
respGzip := &httpserver.Response{
32973299
Etag: identity.NewID(),
32983300
Content: gzipBytes,
32993301
LastModified: &modTime,
@@ -3373,18 +3375,18 @@ func testBuildHTTPSourceEtagScope(t *testing.T, sb integration.Sandbox) {
33733375
modTime := time.Now().Add(-24 * time.Hour) // avoid falso positive with current time
33743376

33753377
sharedEtag := identity.NewID()
3376-
resp := httpserver.Response{
3378+
resp := &httpserver.Response{
33773379
Etag: sharedEtag,
33783380
Content: []byte("content1"),
33793381
LastModified: &modTime,
33803382
}
3381-
resp2 := httpserver.Response{
3383+
resp2 := &httpserver.Response{
33823384
Etag: sharedEtag,
33833385
Content: []byte("another"),
33843386
LastModified: &modTime,
33853387
}
33863388

3387-
server := httpserver.NewTestServer(map[string]httpserver.Response{
3389+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
33883390
"/one/foo": resp,
33893391
"/two/foo": resp2,
33903392
})
@@ -3468,13 +3470,13 @@ func testBuildHTTPSourceAuthHeaderSecret(t *testing.T, sb integration.Sandbox) {
34683470

34693471
modTime := time.Now().Add(-24 * time.Hour) // avoid false positive with current time
34703472

3471-
resp := httpserver.Response{
3473+
resp := &httpserver.Response{
34723474
Etag: identity.NewID(),
34733475
Content: []byte("content1"),
34743476
LastModified: &modTime,
34753477
}
34763478

3477-
server := httpserver.NewTestServer(map[string]httpserver.Response{
3479+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
34783480
"/foo": resp,
34793481
})
34803482
defer server.Close()
@@ -3509,13 +3511,13 @@ func testBuildHTTPSourceHostTokenSecret(t *testing.T, sb integration.Sandbox) {
35093511

35103512
modTime := time.Now().Add(-24 * time.Hour) // avoid false positive with current time
35113513

3512-
resp := httpserver.Response{
3514+
resp := &httpserver.Response{
35133515
Etag: identity.NewID(),
35143516
Content: []byte("content1"),
35153517
LastModified: &modTime,
35163518
}
35173519

3518-
server := httpserver.NewTestServer(map[string]httpserver.Response{
3520+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
35193521
"/foo": resp,
35203522
})
35213523
defer server.Close()
@@ -3550,13 +3552,13 @@ func testBuildHTTPSourceHeader(t *testing.T, sb integration.Sandbox) {
35503552

35513553
modTime := time.Now().Add(-24 * time.Hour) // avoid falso positive with current time
35523554

3553-
resp := httpserver.Response{
3555+
resp := &httpserver.Response{
35543556
Etag: identity.NewID(),
35553557
Content: []byte("content1"),
35563558
LastModified: &modTime,
35573559
}
35583560

3559-
server := httpserver.NewTestServer(map[string]httpserver.Response{
3561+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
35603562
"/foo": resp,
35613563
})
35623564
defer server.Close()
@@ -12066,19 +12068,19 @@ func testHTTPResolveSourceMetadata(t *testing.T, sb integration.Sandbox) {
1206612068

1206712069
modTime := time.Now().Add(-24 * time.Hour) // avoid falso positive with current time
1206812070

12069-
resp := httpserver.Response{
12071+
resp := &httpserver.Response{
1207012072
Etag: identity.NewID(),
1207112073
Content: []byte("content1"),
1207212074
LastModified: &modTime,
1207312075
}
1207412076

12075-
resp2 := httpserver.Response{
12077+
resp2 := &httpserver.Response{
1207612078
Etag: identity.NewID(),
1207712079
Content: []byte("content2"),
1207812080
ContentDisposition: "attachment; filename=\"my img.jpg\"",
1207912081
}
1208012082

12081-
server := httpserver.NewTestServer(map[string]httpserver.Response{
12083+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
1208212084
"/foo": resp,
1208312085
"/bar": resp2,
1208412086
})
@@ -12116,6 +12118,96 @@ func testHTTPResolveSourceMetadata(t *testing.T, sb integration.Sandbox) {
1211612118
require.NoError(t, err)
1211712119
}
1211812120

12121+
func testHTTPPruneAfterCacheKey(t *testing.T, sb integration.Sandbox) {
12122+
// this test depends on hitting race condition in internal functions.
12123+
// If debugging and expecting failure you can add small sleep in beginning of source/http.Exec() to hit reliably
12124+
ctx := sb.Context()
12125+
c, err := New(ctx, sb.Address())
12126+
require.NoError(t, err)
12127+
defer c.Close()
12128+
12129+
resp := &httpserver.Response{
12130+
Etag: identity.NewID(),
12131+
Content: []byte("content1"),
12132+
}
12133+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
12134+
"/foo": resp,
12135+
})
12136+
defer server.Close()
12137+
12138+
done := make(chan struct{})
12139+
12140+
startScan := make(chan struct{})
12141+
stopScan := make(chan struct{})
12142+
pauseScan := make(chan struct{})
12143+
12144+
go func() {
12145+
// attempt to prune the HTTP record in between cachekey and snapshot
12146+
defer close(done)
12147+
for {
12148+
select {
12149+
case <-startScan:
12150+
scan:
12151+
for {
12152+
select {
12153+
case <-pauseScan:
12154+
break scan
12155+
default:
12156+
du, err := c.DiskUsage(ctx)
12157+
assert.NoError(t, err)
12158+
for _, entry := range du {
12159+
if entry.Description == "http url "+server.URL+"/foo" {
12160+
if !entry.InUse {
12161+
t.Logf("entry no longer in use, pruning")
12162+
err = c.Prune(ctx, nil)
12163+
assert.NoError(t, err)
12164+
12165+
resp.Etag = identity.NewID()
12166+
resp.Content = []byte("content2")
12167+
}
12168+
}
12169+
}
12170+
}
12171+
}
12172+
case <-stopScan:
12173+
return
12174+
}
12175+
}
12176+
}()
12177+
12178+
const iterations = 10
12179+
for range iterations {
12180+
startScan <- struct{}{}
12181+
resp.Etag = identity.NewID()
12182+
resp.Content = []byte("content1")
12183+
_, err = c.Build(ctx, SolveOpt{}, "test", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
12184+
st := llb.Scratch().File(llb.Copy(llb.HTTP(server.URL+"/foo"), "foo", "bar"))
12185+
def, err := st.Marshal(sb.Context())
12186+
if err != nil {
12187+
return nil, err
12188+
}
12189+
resp, err := c.Solve(ctx, gateway.SolveRequest{
12190+
Definition: def.ToPB(),
12191+
})
12192+
if err != nil {
12193+
return nil, err
12194+
}
12195+
12196+
return resp, nil
12197+
}, nil)
12198+
require.NoError(t, err)
12199+
12200+
pauseScan <- struct{}{}
12201+
12202+
err = c.Prune(ctx, nil)
12203+
require.NoError(t, err)
12204+
12205+
checkAllReleasable(t, c, sb, false)
12206+
}
12207+
close(stopScan)
12208+
<-done
12209+
}
12210+
1211912211
func runInDir(dir string, cmds ...string) error {
1212012212
for _, args := range cmds {
1212112213
var cmd *exec.Cmd

frontend/dockerfile/dockerfile_addchecksum_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ func testAddChecksum(t *testing.T, sb integration.Sandbox) {
2828
f := getFrontend(t, sb)
2929
f.RequiresBuildctl(t)
3030

31-
resp := httpserver.Response{
31+
resp := &httpserver.Response{
3232
Etag: identity.NewID(),
3333
Content: []byte("content1"),
3434
}
35-
server := httpserver.NewTestServer(map[string]httpserver.Response{
35+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
3636
"/foo": resp,
3737
})
3838
defer server.Close()

frontend/dockerfile/dockerfile_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2509,12 +2509,12 @@ RUN echo foo-contents> /foo
25092509
err := os.WriteFile(filepath.Join(srcDir, "Dockerfile"), dockerfile, 0600)
25102510
require.NoError(t, err)
25112511

2512-
resp := httpserver.Response{
2512+
resp := &httpserver.Response{
25132513
Etag: identity.NewID(),
25142514
Content: dockerfile,
25152515
}
25162516

2517-
server := httpserver.NewTestServer(map[string]httpserver.Response{
2517+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
25182518
"/df": resp,
25192519
})
25202520
defer server.Close()
@@ -3061,18 +3061,18 @@ func testDockerfileADDFromURL(t *testing.T, sb integration.Sandbox) {
30613061

30623062
modTime := time.Now().Add(-24 * time.Hour) // avoid falso positive with current time
30633063

3064-
resp := httpserver.Response{
3064+
resp := &httpserver.Response{
30653065
Etag: identity.NewID(),
30663066
Content: []byte("content1"),
30673067
}
30683068

3069-
resp2 := httpserver.Response{
3069+
resp2 := &httpserver.Response{
30703070
Etag: identity.NewID(),
30713071
LastModified: &modTime,
30723072
Content: []byte("content2"),
30733073
}
30743074

3075-
server := httpserver.NewTestServer(map[string]httpserver.Response{
3075+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
30763076
"/foo": resp,
30773077
"/": resp2,
30783078
})
@@ -3271,12 +3271,12 @@ COPY t.tar.gz /
32713271
require.Equal(t, buf2.Bytes(), dt)
32723272

32733273
// ADD from URL doesn't extract
3274-
resp := httpserver.Response{
3274+
resp := &httpserver.Response{
32753275
Etag: identity.NewID(),
32763276
Content: buf2.Bytes(),
32773277
}
32783278

3279-
server := httpserver.NewTestServer(map[string]httpserver.Response{
3279+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
32803280
"/t.tar.gz": resp,
32813281
})
32823282
defer server.Close()
@@ -4707,11 +4707,11 @@ func testAddURLChmod(t *testing.T, sb integration.Sandbox) {
47074707
f := getFrontend(t, sb)
47084708
f.RequiresBuildctl(t)
47094709

4710-
resp := httpserver.Response{
4710+
resp := &httpserver.Response{
47114711
Etag: identity.NewID(),
47124712
Content: []byte("content1"),
47134713
}
4714-
server := httpserver.NewTestServer(map[string]httpserver.Response{
4714+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
47154715
"/foo": resp,
47164716
})
47174717
defer server.Close()
@@ -4952,12 +4952,12 @@ COPY foo bar
49524952

49534953
require.NoError(t, w.Flush())
49544954

4955-
resp := httpserver.Response{
4955+
resp := &httpserver.Response{
49564956
Etag: identity.NewID(),
49574957
Content: buf.Bytes(),
49584958
}
49594959

4960-
server := httpserver.NewTestServer(map[string]httpserver.Response{
4960+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
49614961
"/myurl": resp,
49624962
})
49634963
defer server.Close()

source/containerimage/pull.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,18 @@ func mainManifestKey(desc ocispecs.Descriptor, platform ocispecs.Platform, layer
8787
}
8888

8989
func (p *puller) CacheKey(ctx context.Context, jobCtx solver.JobContext, index int) (cacheKey string, imgDigest string, cacheOpts solver.CacheOpts, cacheDone bool, err error) {
90+
var g session.Group
91+
if jobCtx != nil {
92+
g = jobCtx.Session()
93+
}
9094
var getResolver pull.SessionResolver
9195
switch p.ResolverType {
9296
case ResolverTypeRegistry:
93-
resolver := resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, jobCtx.Session()).WithImageStore(p.ImageStore, p.Mode)
97+
resolver := resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, g).WithImageStore(p.ImageStore, p.Mode)
9498
p.Resolver = resolver
9599
getResolver = func(g session.Group) remotes.Resolver { return resolver.WithSession(g) }
96100
case ResolverTypeOCILayout:
97-
resolver := getOCILayoutResolver(p.store, p.SessionManager, jobCtx.Session())
101+
resolver := getOCILayoutResolver(p.store, p.SessionManager, g)
98102
p.Resolver = resolver
99103
// OCILayout has no need for session
100104
getResolver = func(g session.Group) remotes.Resolver { return resolver }
@@ -207,14 +211,18 @@ func (p *puller) CacheKey(ctx context.Context, jobCtx solver.JobContext, index i
207211
}
208212

209213
func (p *puller) Snapshot(ctx context.Context, jobCtx solver.JobContext) (ir cache.ImmutableRef, err error) {
214+
var g session.Group
215+
if jobCtx != nil {
216+
g = jobCtx.Session()
217+
}
210218
var getResolver pull.SessionResolver
211219
switch p.ResolverType {
212220
case ResolverTypeRegistry:
213-
resolver := resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, jobCtx.Session()).WithImageStore(p.ImageStore, p.Mode)
221+
resolver := resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, g).WithImageStore(p.ImageStore, p.Mode)
214222
p.Resolver = resolver
215223
getResolver = func(g session.Group) remotes.Resolver { return resolver.WithSession(g) }
216224
case ResolverTypeOCILayout:
217-
resolver := getOCILayoutResolver(p.store, p.SessionManager, jobCtx.Session())
225+
resolver := getOCILayoutResolver(p.store, p.SessionManager, g)
218226
p.Resolver = resolver
219227
// OCILayout has no need for session
220228
getResolver = func(g session.Group) remotes.Resolver { return resolver }

source/http/source.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,14 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
382382
if err != nil {
383383
return nil, err
384384
}
385-
ref.Release(context.TODO())
385+
cleanup := func() error {
386+
return ref.Release(context.TODO())
387+
}
388+
if jobCtx != nil {
389+
jobCtx.Cleanup(cleanup)
390+
} else {
391+
cleanup()
392+
}
386393

387394
var modTime *time.Time
388395
if modTimeStr := resp.Header.Get("Last-Modified"); modTimeStr != "" {

0 commit comments

Comments
 (0)