Skip to content

Commit 23a0dd7

Browse files
committed
solver: add jobCtx support to metadata resolver
Fixes issue where HTTP result can be released after metadata resolve and rest of the build might pull new (potentially different) data in again. Signed-off-by: Tonis Tiigi <[email protected]>
1 parent 80766cd commit 23a0dd7

File tree

5 files changed

+103
-27
lines changed

5 files changed

+103
-27
lines changed

client/client_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){
245245
testGitResolveSourceMetadata,
246246
testHTTPResolveSourceMetadata,
247247
testHTTPPruneAfterCacheKey,
248+
testHTTPPruneAfterResolveMeta,
248249
}
249250

250251
func TestIntegration(t *testing.T) {
@@ -12208,6 +12209,67 @@ func testHTTPPruneAfterCacheKey(t *testing.T, sb integration.Sandbox) {
1220812209
<-done
1220912210
}
1221012211

12212+
// testHTTPPruneAfterResolveMeta ensures that pruning after ResolveSourceMetadata
12213+
// doesn't pull in new data for same build. Once URL has been resolved once for a specific
12214+
// build, the data should be considered immutable and remote changes don't affect ongoing build.
12215+
func testHTTPPruneAfterResolveMeta(t *testing.T, sb integration.Sandbox) {
12216+
ctx := sb.Context()
12217+
c, err := New(ctx, sb.Address())
12218+
require.NoError(t, err)
12219+
defer c.Close()
12220+
12221+
resp := &httpserver.Response{
12222+
Etag: identity.NewID(),
12223+
Content: []byte("content1"),
12224+
}
12225+
server := httpserver.NewTestServer(map[string]*httpserver.Response{
12226+
"/foo": resp,
12227+
})
12228+
defer server.Close()
12229+
12230+
dest := t.TempDir()
12231+
12232+
_, err = c.Build(ctx, SolveOpt{
12233+
Exports: []ExportEntry{
12234+
{
12235+
Type: ExporterLocal,
12236+
OutputDir: dest,
12237+
},
12238+
},
12239+
}, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) {
12240+
id := server.URL + "/foo"
12241+
md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{
12242+
Identifier: id,
12243+
}, sourceresolver.Opt{})
12244+
if err != nil {
12245+
return nil, err
12246+
}
12247+
require.NotNil(t, md.HTTP)
12248+
12249+
// prune all
12250+
err = c.Prune(ctx, nil)
12251+
require.NoError(t, err)
12252+
12253+
resp.Content = []byte("content2") // etag is same so should hit cache if record not pruned
12254+
12255+
st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar"))
12256+
def, err := st.Marshal(sb.Context())
12257+
if err != nil {
12258+
return nil, err
12259+
}
12260+
return gc.Solve(ctx, gateway.SolveRequest{
12261+
Definition: def.ToPB(),
12262+
})
12263+
}, nil)
12264+
require.NoError(t, err)
12265+
12266+
dt, err := os.ReadFile(filepath.Join(dest, "bar"))
12267+
require.NoError(t, err)
12268+
require.Equal(t, "content1", string(dt))
12269+
12270+
checkAllReleasable(t, c, sb, false)
12271+
}
12272+
1221112273
func runInDir(dir string, cmds ...string) error {
1221212274
for _, args := range cmds {
1221312275
var cmd *exec.Cmd

solver/jobs.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type ResolveOpFunc func(Vertex, Builder) (Op, error)
2828

2929
type Builder interface {
3030
Build(ctx context.Context, e Edge) (CachedResultWithProvenance, error)
31-
InContext(ctx context.Context, f func(ctx context.Context, g session.Group) error) error
31+
InContext(ctx context.Context, f func(ctx context.Context, jobCtx JobContext) error) error
3232
EachValue(ctx context.Context, key string, fn func(any) error) error
3333
}
3434

@@ -299,7 +299,7 @@ func (sb *subBuilder) Build(ctx context.Context, e Edge) (CachedResultWithProven
299299
return &withProvenance{CachedResult: res}, nil
300300
}
301301

302-
func (sb *subBuilder) InContext(ctx context.Context, f func(context.Context, session.Group) error) error {
302+
func (sb *subBuilder) InContext(ctx context.Context, f func(context.Context, JobContext) error) error {
303303
ctx = progress.WithProgress(ctx, sb.mpw)
304304
if sb.mspan.Span != nil {
305305
ctx = trace.ContextWithSpan(ctx, sb.mspan)
@@ -328,6 +328,7 @@ type Job struct {
328328
id string
329329
startedTime time.Time
330330
completedTime time.Time
331+
releasers []func() error
331332

332333
progressCloser func(error)
333334
SessionID string
@@ -812,6 +813,13 @@ func (j *Job) Discard() error {
812813
st.mu.Unlock()
813814
}
814815

816+
for _, r := range j.releasers {
817+
if err := r(); err != nil {
818+
bklog.G(context.TODO()).WithError(err).Error("failed to cleanup job resources")
819+
}
820+
}
821+
j.releasers = nil
822+
815823
go func() {
816824
// don't clear job right away. there might still be a status request coming to read progress
817825
time.Sleep(10 * time.Second)
@@ -839,8 +847,19 @@ func (j *Job) UniqueID() string {
839847
return j.uniqueID
840848
}
841849

842-
func (j *Job) InContext(ctx context.Context, f func(context.Context, session.Group) error) error {
843-
return f(progress.WithProgress(ctx, j.pw), session.NewGroup(j.SessionID))
850+
func (j *Job) InContext(ctx context.Context, f func(context.Context, JobContext) error) error {
851+
return f(progress.WithProgress(ctx, j.pw), j)
852+
}
853+
854+
func (j *Job) Session() session.Group {
855+
return session.NewGroup(j.SessionID)
856+
}
857+
858+
func (j *Job) Cleanup(fn func() error) error {
859+
j.mu.Lock()
860+
j.releasers = append(j.releasers, fn)
861+
j.mu.Unlock()
862+
return nil
844863
}
845864

846865
func (j *Job) SetValue(key string, v any) {

solver/llbsolver/bridge.go

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type llbBridge struct {
4949
}
5050

5151
func (b *llbBridge) Warn(ctx context.Context, dgst digest.Digest, msg string, opts frontend.WarnOpts) error {
52-
return b.builder.InContext(ctx, func(ctx context.Context, g session.Group) error {
52+
return b.builder.InContext(ctx, func(ctx context.Context, _ solver.JobContext) error {
5353
pw, ok, _ := progress.NewFromContext(ctx, progress.WithMetadata("vertex", dgst))
5454
if !ok {
5555
return nil
@@ -111,11 +111,15 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp
111111
func(cmID string, im gw.CacheOptionsEntry) {
112112
cm = newLazyCacheManager(cmID, func() (solver.CacheManager, error) {
113113
var cmNew solver.CacheManager
114-
if err := inBuilderContext(context.TODO(), b.builder, "importing cache manifest from "+cmID, "", func(ctx context.Context, g session.Group) error {
114+
if err := inBuilderContext(context.TODO(), b.builder, "importing cache manifest from "+cmID, "", func(ctx context.Context, jobCtx solver.JobContext) error {
115115
resolveCI, ok := b.resolveCacheImporterFuncs[im.Type]
116116
if !ok {
117117
return errors.Errorf("unknown cache importer: %s", im.Type)
118118
}
119+
var g session.Group
120+
if jobCtx != nil {
121+
g = jobCtx.Session()
122+
}
119123
ci, desc, err := resolveCI(ctx, g, im.Attrs)
120124
if err != nil {
121125
return errors.Wrapf(err, "failed to configure %v cache importer", im.Type)
@@ -373,8 +377,8 @@ func (b *llbBridge) ResolveSourceMetadata(ctx context.Context, op *pb.SourceOp,
373377
// policy is evaluated, so we can remove it from the options
374378
opt.SourcePolicies = nil
375379

376-
err = inBuilderContext(ctx, b.builder, opt.LogName, id, func(ctx context.Context, g session.Group) error {
377-
resp, err = w.ResolveSourceMetadata(ctx, op, opt, b.sm, dummyJobContext{g: g})
380+
err = inBuilderContext(ctx, b.builder, opt.LogName, id, func(ctx context.Context, jobCtx solver.JobContext) error {
381+
resp, err = w.ResolveSourceMetadata(ctx, op, opt, b.sm, jobCtx)
378382
return err
379383
})
380384
if err != nil {
@@ -383,18 +387,6 @@ func (b *llbBridge) ResolveSourceMetadata(ctx context.Context, op *pb.SourceOp,
383387
return resp, nil
384388
}
385389

386-
type dummyJobContext struct {
387-
g session.Group
388-
}
389-
390-
func (d dummyJobContext) Session() session.Group {
391-
return d.g
392-
}
393-
394-
func (d dummyJobContext) Cleanup(fn func() error) error {
395-
return errors.Errorf("cleanup not implemented for %T", d)
396-
}
397-
398390
type lazyCacheManager struct {
399391
id string
400392
main solver.CacheManager

solver/llbsolver/solver.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,7 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *
776776
i, exp := i, exp
777777
eg.Go(func() (err error) {
778778
id := fmt.Sprint(j.SessionID, "-cache-", i)
779-
err = inBuilderContext(ctx, j, exp.Name(), id, func(ctx context.Context, _ session.Group) error {
779+
err = inBuilderContext(ctx, j, exp.Name(), id, func(ctx context.Context, _ solver.JobContext) error {
780780
prepareDone := progress.OneOff(ctx, "preparing build cache for export")
781781
if err := result.EachRef(cached, inp, func(res solver.CachedResult, ref cache.ImmutableRef) error {
782782
ctx := withDescHandlerCacheOpts(ctx, ref)
@@ -852,7 +852,7 @@ func (s *Solver) runExporters(ctx context.Context, exporters []exporter.Exporter
852852
i, exp := i, exp
853853
eg.Go(func() error {
854854
id := fmt.Sprint(job.SessionID, "-export-", i)
855-
return inBuilderContext(ctx, job, exp.Name(), id, func(ctx context.Context, _ session.Group) error {
855+
return inBuilderContext(ctx, job, exp.Name(), id, func(ctx context.Context, _ solver.JobContext) error {
856856
span, ctx := tracing.StartSpan(ctx, exp.Name())
857857
defer span.End()
858858

@@ -884,7 +884,7 @@ func (s *Solver) runExporters(ctx context.Context, exporters []exporter.Exporter
884884
}
885885

886886
if len(exporters) == 0 && len(warnings) > 0 {
887-
err := inBuilderContext(ctx, job, "Verifying build result", identity.NewID(), func(ctx context.Context, _ session.Group) error {
887+
err := inBuilderContext(ctx, job, "Verifying build result", identity.NewID(), func(ctx context.Context, _ solver.JobContext) error {
888888
pw, _, _ := progress.NewFromContext(ctx)
889889
for _, w := range warnings {
890890
pw.Write(identity.NewID(), w)
@@ -1152,19 +1152,19 @@ func allWorkers(wc *worker.Controller) func(func(w worker.Worker) error) error {
11521152
}
11531153
}
11541154

1155-
func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f func(ctx context.Context, g session.Group) error) error {
1155+
func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f func(ctx context.Context, jobCtx solver.JobContext) error) error {
11561156
if id == "" {
11571157
id = name
11581158
}
11591159
v := client.Vertex{
11601160
Digest: digest.FromBytes([]byte(id)),
11611161
Name: name,
11621162
}
1163-
return b.InContext(ctx, func(ctx context.Context, g session.Group) error {
1163+
return b.InContext(ctx, func(ctx context.Context, jobCtx solver.JobContext) error {
11641164
pw, _, ctx := progress.NewFromContext(ctx, progress.WithMetadata("vertex", v.Digest))
11651165
notifyCompleted := notifyStarted(ctx, &v)
11661166
defer pw.Close()
1167-
err := f(ctx, g)
1167+
err := f(ctx, jobCtx)
11681168
notifyCompleted(err)
11691169
return err
11701170
})

source/http/source.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,10 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
386386
return ref.Release(context.TODO())
387387
}
388388
if jobCtx != nil {
389-
jobCtx.Cleanup(cleanup)
389+
if err := jobCtx.Cleanup(cleanup); err != nil {
390+
_ = cleanup()
391+
return nil, err
392+
}
390393
} else {
391394
cleanup()
392395
}

0 commit comments

Comments
 (0)