Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func toRepoOnly(in string) (string, error) {
}

type (
EvaluateFunc func(ctx context.Context, name string, c gateway.Client, res *gateway.Result) error
EvaluateFunc func(ctx context.Context, name string, c gateway.Client, res *gateway.Result, opt Options) error
Handler struct {
Evaluate EvaluateFunc
}
Expand Down Expand Up @@ -525,7 +525,7 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opts map[

// invoke custom evaluate handler if it is present
if bh != nil && bh.Evaluate != nil {
if err := bh.Evaluate(ctx, k, c, res); err != nil {
if err := bh.Evaluate(ctx, k, c, res, opt); err != nil {
return nil, err
}
} else if forceEval {
Expand Down
215 changes: 115 additions & 100 deletions dap/adapter.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package dap

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"path"
"sync"
"sync/atomic"

"github.com/docker/buildx/build"
"github.com/google/go-dap"
Expand All @@ -28,6 +31,9 @@ type Adapter[T any] struct {
threads map[int]*thread
threadsMu sync.RWMutex
nextThreadID int

sourceMap sourceMap
idPool *idPool
}

func New[T any](cfg *build.InvokeConfig) *Adapter[T] {
Expand All @@ -38,6 +44,7 @@ func New[T any](cfg *build.InvokeConfig) *Adapter[T] {
evaluateReqCh: make(chan *evaluateRequest),
threads: make(map[int]*thread),
nextThreadID: 1,
idPool: new(idPool),
}
if cfg != nil {
d.cfg = *cfg
Expand Down Expand Up @@ -131,7 +138,16 @@ func (d *Adapter[T]) Continue(c Context, req *dap.ContinueRequest, resp *dap.Con
t := d.threads[req.Arguments.ThreadId]
d.threadsMu.RUnlock()

t.Resume(c)
t.Continue()
return nil
}

func (d *Adapter[T]) Next(c Context, req *dap.NextRequest, resp *dap.NextResponse) error {
d.threadsMu.RLock()
t := d.threads[req.Arguments.ThreadId]
d.threadsMu.RUnlock()

t.Next()
return nil
}

Expand Down Expand Up @@ -182,7 +198,7 @@ func (d *Adapter[T]) launch(c Context) {
started := c.Go(func(c Context) {
defer d.deleteThread(c, t)
defer close(req.errCh)
req.errCh <- t.Evaluate(c, req.c, req.ref, req.meta, d.cfg)
req.errCh <- t.Evaluate(c, req.c, req.ref, req.meta, req.inputs)
})

if !started {
Expand All @@ -197,8 +213,10 @@ func (d *Adapter[T]) newThread(ctx Context, name string) (t *thread) {
d.threadsMu.Lock()
id := d.nextThreadID
t = &thread{
id: id,
name: name,
id: id,
name: name,
sourceMap: &d.sourceMap,
idPool: d.idPool,
}
d.threads[t.id] = t
d.nextThreadID++
Expand Down Expand Up @@ -236,41 +254,43 @@ func (d *Adapter[T]) deleteThread(ctx Context, t *thread) {
}

type evaluateRequest struct {
name string
c gateway.Client
ref gateway.Reference
meta map[string][]byte
errCh chan<- error
name string
c gateway.Client
ref gateway.Reference
meta map[string][]byte
inputs build.Inputs
errCh chan<- error
}

func (d *Adapter[T]) EvaluateResult(ctx context.Context, name string, c gateway.Client, res *gateway.Result) error {
func (d *Adapter[T]) EvaluateResult(ctx context.Context, name string, c gateway.Client, res *gateway.Result, inputs build.Inputs) error {
eg, _ := errgroup.WithContext(ctx)
if res.Ref != nil {
eg.Go(func() error {
return d.evaluateRef(ctx, name, c, res.Ref, res.Metadata)
return d.evaluateRef(ctx, name, c, res.Ref, res.Metadata, inputs)
})
}

for k, ref := range res.Refs {
refName := fmt.Sprintf("%s (%s)", name, k)
eg.Go(func() error {
return d.evaluateRef(ctx, refName, c, ref, res.Metadata)
return d.evaluateRef(ctx, refName, c, ref, res.Metadata, inputs)
})
}
return eg.Wait()
}

func (d *Adapter[T]) evaluateRef(ctx context.Context, name string, c gateway.Client, ref gateway.Reference, meta map[string][]byte) error {
func (d *Adapter[T]) evaluateRef(ctx context.Context, name string, c gateway.Client, ref gateway.Reference, meta map[string][]byte, inputs build.Inputs) error {
errCh := make(chan error, 1)

// Send a solve request to the launch routine
// which will perform the solve in the context of the server.
ereq := &evaluateRequest{
name: name,
c: c,
ref: ref,
meta: meta,
errCh: errCh,
name: name,
c: c,
ref: ref,
meta: meta,
inputs: inputs,
errCh: errCh,
}
select {
case d.evaluateReqCh <- ereq:
Expand Down Expand Up @@ -307,16 +327,28 @@ func (d *Adapter[T]) StackTrace(c Context, req *dap.StackTraceRequest, resp *dap
return errors.Errorf("no such thread: %d", req.Arguments.ThreadId)
}

resp.Body.StackFrames = t.StackFrames()
resp.Body.StackFrames = t.StackTrace()
return nil
}

func (d *Adapter[T]) Source(c Context, req *dap.SourceRequest, resp *dap.SourceResponse) error {
fname := req.Arguments.Source.Path

dt, ok := d.sourceMap.Get(fname)
if !ok {
return errors.Errorf("file not found: %s", fname)
}

resp.Body.Content = string(dt)
return nil
}

func (d *Adapter[T]) evaluate(ctx context.Context, name string, c gateway.Client, res *gateway.Result) error {
func (d *Adapter[T]) evaluate(ctx context.Context, name string, c gateway.Client, res *gateway.Result, opt build.Options) error {
errCh := make(chan error, 1)

started := d.srv.Go(func(ctx Context) {
defer close(errCh)
errCh <- d.EvaluateResult(ctx, name, c, res)
errCh <- d.EvaluateResult(ctx, name, c, res, opt.Inputs)
})
if !started {
return context.Canceled
Expand All @@ -341,93 +373,16 @@ func (d *Adapter[T]) dapHandler() Handler {
Initialize: d.Initialize,
Launch: d.Launch,
Continue: d.Continue,
Next: d.Next,
SetBreakpoints: d.SetBreakpoints,
ConfigurationDone: d.ConfigurationDone,
Disconnect: d.Disconnect,
Threads: d.Threads,
StackTrace: d.StackTrace,
Source: d.Source,
}
}

type thread struct {
id int
name string

paused chan struct{}
rCtx *build.ResultHandle
mu sync.Mutex
}

func (t *thread) Evaluate(ctx Context, c gateway.Client, ref gateway.Reference, meta map[string][]byte, cfg build.InvokeConfig) error {
err := ref.Evaluate(ctx)
if reason, desc := t.needsDebug(cfg, err); reason != "" {
rCtx := build.NewResultHandle(ctx, c, ref, meta, err)

select {
case <-t.pause(ctx, rCtx, reason, desc):
case <-ctx.Done():
t.Resume(ctx)
return context.Cause(ctx)
}
}
return err
}

func (t *thread) needsDebug(cfg build.InvokeConfig, err error) (reason, desc string) {
if !cfg.NeedsDebug(err) {
return
}

if err != nil {
reason = "exception"
desc = "Encountered an error during result evaluation"
} else {
reason = "pause"
desc = "Result evaluation completed"
}
return
}

func (t *thread) pause(c Context, rCtx *build.ResultHandle, reason, desc string) <-chan struct{} {
if t.paused == nil {
t.paused = make(chan struct{})
}
t.rCtx = rCtx

c.C() <- &dap.StoppedEvent{
Event: dap.Event{Event: "stopped"},
Body: dap.StoppedEventBody{
Reason: reason,
Description: desc,
ThreadId: t.id,
},
}
return t.paused
}

func (t *thread) Resume(c Context) {
t.mu.Lock()
defer t.mu.Unlock()

if t.paused == nil {
return
}

if t.rCtx != nil {
t.rCtx.Done()
t.rCtx = nil
}

close(t.paused)
t.paused = nil
}

// TODO: return a suitable stack frame for the thread.
// For now, just returns nothing.
func (t *thread) StackFrames() []dap.StackFrame {
return []dap.StackFrame{}
}

func (d *Adapter[T]) Out() io.Writer {
return &adapterWriter[T]{d}
}
Expand All @@ -454,3 +409,63 @@ func (d *adapterWriter[T]) Write(p []byte) (n int, err error) {
}
return n, nil
}

type idPool struct {
next atomic.Int64
}

func (p *idPool) Get() int64 {
return p.next.Add(1)
}

func (p *idPool) Put(x int64) {
// noop
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It's unlikely happen but it's safe to return an error or panic if the id is exhausted.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly just adding this struct in case we ever wanted to reuse ids for some reason. That's the main reason why this function exists. I can also just remove this entirely and we can add that later if it's ever used.

}

type sourceMap struct {
m sync.Map
}

func (s *sourceMap) Put(c Context, fname string, dt []byte) {
for {
old, loaded := s.m.LoadOrStore(fname, dt)
if !loaded {
c.C() <- &dap.LoadedSourceEvent{
Event: dap.Event{Event: "loadedSource"},
Body: dap.LoadedSourceEventBody{
Reason: "new",
Source: dap.Source{
Name: path.Base(fname),
Path: fname,
},
},
}
}

if bytes.Equal(old.([]byte), dt) {
// Nothing to do.
return
}

if s.m.CompareAndSwap(fname, old, dt) {
c.C() <- &dap.LoadedSourceEvent{
Event: dap.Event{Event: "loadedSource"},
Body: dap.LoadedSourceEventBody{
Reason: "changed",
Source: dap.Source{
Name: path.Base(fname),
Path: fname,
},
},
}
}
}
}

func (s *sourceMap) Get(fname string) ([]byte, bool) {
v, ok := s.m.Load(fname)
if !ok {
return nil, false
}
return v.([]byte), true
}
2 changes: 2 additions & 0 deletions dap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ type Handler struct {
Disconnect HandlerFunc[*dap.DisconnectRequest, *dap.DisconnectResponse]
Terminate HandlerFunc[*dap.TerminateRequest, *dap.TerminateResponse]
Continue HandlerFunc[*dap.ContinueRequest, *dap.ContinueResponse]
Next HandlerFunc[*dap.NextRequest, *dap.NextResponse]
Restart HandlerFunc[*dap.RestartRequest, *dap.RestartResponse]
Threads HandlerFunc[*dap.ThreadsRequest, *dap.ThreadsResponse]
StackTrace HandlerFunc[*dap.StackTraceRequest, *dap.StackTraceResponse]
Evaluate HandlerFunc[*dap.EvaluateRequest, *dap.EvaluateResponse]
Source HandlerFunc[*dap.SourceRequest, *dap.SourceResponse]
}
4 changes: 4 additions & 0 deletions dap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func (s *Server) handleMessage(c Context, m dap.Message) (dap.ResponseMessage, e
return s.h.Terminate.Do(c, req)
case *dap.ContinueRequest:
return s.h.Continue.Do(c, req)
case *dap.NextRequest:
return s.h.Next.Do(c, req)
case *dap.RestartRequest:
return s.h.Restart.Do(c, req)
case *dap.ThreadsRequest:
Expand All @@ -125,6 +127,8 @@ func (s *Server) handleMessage(c Context, m dap.Message) (dap.ResponseMessage, e
return s.h.StackTrace.Do(c, req)
case *dap.EvaluateRequest:
return s.h.Evaluate.Do(c, req)
case *dap.SourceRequest:
return s.h.Source.Do(c, req)
default:
return nil, errors.New("not implemented")
}
Expand Down
Loading
Loading