Skip to content

Commit bac335d

Browse files
committed
dap: support evaluate request to invoke a container
Supports using the `evaluate` request in REPL mode to start a container with the `exec` command. Presently doesn't support any arguments. This improves the dap server so it is capable of sending reverse requests and receiving the response. It also adds a hidden command `dap attach` that attaches to the socket created by `evaluate`. This requires the client to support `runInTerminal`. Likely needs some additional work to make sure resources are cleaned up cleanly especially when the build is unpaused or terminated, but it should work as a decent base. Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
1 parent b160d3d commit bac335d

File tree

5 files changed

+252
-7
lines changed

5 files changed

+252
-7
lines changed

commands/dap.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ package commands
33
import (
44
"context"
55
"io"
6+
"net"
7+
"os"
68

79
"github.com/containerd/console"
810
"github.com/docker/buildx/dap"
911
"github.com/docker/buildx/util/cobrautil"
1012
"github.com/docker/buildx/util/ioset"
1113
"github.com/docker/buildx/util/progress"
14+
"github.com/docker/cli/cli"
1215
"github.com/docker/cli/cli/command"
1316
"github.com/pkg/errors"
1417
"github.com/spf13/cobra"
@@ -28,6 +31,7 @@ func dapCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command {
2831
cobrautil.MarkFlagsExperimental(flags, "on")
2932

3033
cmd.AddCommand(buildCmd(dockerCli, rootOpts, &options))
34+
cmd.AddCommand(dapAttachCmd())
3135
return cmd
3236
}
3337

@@ -92,3 +96,39 @@ func (fakeConsole) Fd() uintptr {
9296
func (fakeConsole) Name() string {
9397
return ""
9498
}
99+
100+
func dapAttachCmd() *cobra.Command {
101+
cmd := &cobra.Command{
102+
Use: "attach PATH",
103+
Short: "Attach to a container created by the dap evaluate request",
104+
Args: cli.ExactArgs(1),
105+
Hidden: true,
106+
RunE: func(cmd *cobra.Command, args []string) error {
107+
c, err := console.ConsoleFromFile(os.Stdout)
108+
if err != nil {
109+
return err
110+
}
111+
112+
if err := c.SetRaw(); err != nil {
113+
return err
114+
}
115+
116+
conn, err := net.Dial("unix", args[0])
117+
if err != nil {
118+
return err
119+
}
120+
121+
fwd := ioset.NewSingleForwarder()
122+
fwd.SetReader(os.Stdin)
123+
fwd.SetWriter(conn, func() io.WriteCloser {
124+
return conn
125+
})
126+
127+
if _, err := io.Copy(os.Stdout, conn); err != nil && !errors.Is(err, io.EOF) {
128+
return err
129+
}
130+
return nil
131+
},
132+
}
133+
return cmd
134+
}

dap/adapter.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type Adapter struct {
2727
threads map[int]*thread
2828
threadsMu sync.RWMutex
2929
nextThreadID int
30+
31+
supportsExec bool
3032
}
3133

3234
func New(cfg *build.InvokeConfig) *Adapter {
@@ -86,6 +88,9 @@ func (d *Adapter) Stop() error {
8688
func (d *Adapter) Initialize(c Context, req *dap.InitializeRequest, resp *dap.InitializeResponse) error {
8789
close(d.initialized)
8890

91+
// Set parameters based on passed client capabilities.
92+
d.supportsExec = req.Arguments.SupportsRunInTerminalRequest
93+
8994
// Set capabilities.
9095
resp.Body.SupportsConfigurationDoneRequest = true
9196
return nil
@@ -202,6 +207,20 @@ func (d *Adapter) getThread(id int) (t *thread) {
202207
return t
203208
}
204209

210+
func (d *Adapter) getCurrentThread() (t *thread) {
211+
d.threadsMu.Lock()
212+
defer d.threadsMu.Unlock()
213+
214+
for _, thread := range d.threads {
215+
if thread.isPaused() {
216+
if t == nil || thread.id < t.id {
217+
t = thread
218+
}
219+
}
220+
}
221+
return t
222+
}
223+
205224
func (d *Adapter) deleteThread(ctx Context, t *thread) {
206225
d.threadsMu.Lock()
207226
delete(d.threads, t.id)
@@ -325,6 +344,7 @@ func (d *Adapter) dapHandler() Handler {
325344
ConfigurationDone: d.ConfigurationDone,
326345
Threads: d.Threads,
327346
StackTrace: d.StackTrace,
347+
Evaluate: d.Evaluate,
328348
}
329349
}
330350

@@ -379,6 +399,13 @@ func (t *thread) pause(c Context, rCtx *build.ResultHandle, reason, desc string)
379399
return t.paused
380400
}
381401

402+
func (t *thread) isPaused() bool {
403+
t.mu.Lock()
404+
defer t.mu.Unlock()
405+
406+
return t.paused != nil
407+
}
408+
382409
func (t *thread) Resume(c Context) {
383410
t.mu.Lock()
384411
defer t.mu.Unlock()

dap/eval.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package dap
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"os"
8+
"path/filepath"
9+
10+
"github.com/docker/buildx/build"
11+
"github.com/docker/cli/cli-plugins/metadata"
12+
"github.com/google/go-dap"
13+
"github.com/google/shlex"
14+
"github.com/pkg/errors"
15+
)
16+
17+
func (d *Adapter) Evaluate(ctx Context, req *dap.EvaluateRequest, resp *dap.EvaluateResponse) error {
18+
if req.Arguments.Context != "repl" {
19+
return errors.Errorf("unsupported evaluate context: %s", req.Arguments.Context)
20+
}
21+
22+
args, err := shlex.Split(req.Arguments.Expression)
23+
if err != nil {
24+
return errors.Wrapf(err, "cannot parse expression")
25+
}
26+
27+
if len(args) == 0 {
28+
return nil
29+
}
30+
31+
switch arg0 := args[0]; arg0 {
32+
case "exec":
33+
if !d.supportsExec {
34+
return errors.New("dap client does not support runInTerminalRequest")
35+
}
36+
37+
t := d.getCurrentThread()
38+
if t == nil {
39+
return errors.New("no paused thread for exec command")
40+
}
41+
42+
argv := args[1:]
43+
if err := t.Exec(ctx, argv, resp); err != nil {
44+
return err
45+
}
46+
return nil
47+
default:
48+
return errors.Errorf("unknown evalute command: %q", arg0)
49+
}
50+
}
51+
52+
func (t *thread) Exec(ctx Context, args []string, eresp *dap.EvaluateResponse) (retErr error) {
53+
cfg := &build.InvokeConfig{Tty: true}
54+
if len(cfg.Entrypoint) == 0 && len(cfg.Cmd) == 0 {
55+
cfg.Entrypoint = []string{"/bin/sh"} // launch shell by default
56+
cfg.Cmd = []string{}
57+
cfg.NoCmd = false
58+
}
59+
60+
ctr, err := build.NewContainer(ctx, t.rCtx, cfg)
61+
if err != nil {
62+
return err
63+
}
64+
defer func() {
65+
if retErr != nil {
66+
ctr.Cancel()
67+
}
68+
}()
69+
70+
dir, err := os.MkdirTemp("", "buildx-dap-exec")
71+
if err != nil {
72+
return err
73+
}
74+
defer func() {
75+
if retErr != nil {
76+
os.RemoveAll(dir)
77+
}
78+
}()
79+
80+
socketPath := filepath.Join(dir, "s.sock")
81+
l, err := net.Listen("unix", socketPath)
82+
if err != nil {
83+
return err
84+
}
85+
86+
go func() {
87+
defer os.RemoveAll(dir)
88+
t.runExec(l, ctr, cfg)
89+
}()
90+
91+
// TODO: this should work in standalone mode too.
92+
docker := os.Getenv(metadata.ReexecEnvvar)
93+
req := &dap.RunInTerminalRequest{
94+
Request: dap.Request{
95+
Command: "runInTerminal",
96+
},
97+
Arguments: dap.RunInTerminalRequestArguments{
98+
Kind: "integrated",
99+
Args: []string{docker, "buildx", "dap", "attach", socketPath},
100+
Env: map[string]any{
101+
"BUILDX_EXPERIMENTAL": "1",
102+
},
103+
},
104+
}
105+
106+
resp := ctx.Request(req)
107+
if !resp.GetResponse().Success {
108+
return errors.New(resp.GetResponse().Message)
109+
}
110+
111+
eresp.Body.Result = fmt.Sprintf("Started process attached to %s.", socketPath)
112+
return nil
113+
}
114+
115+
func (t *thread) runExec(l net.Listener, ctr *build.Container, cfg *build.InvokeConfig) {
116+
defer l.Close()
117+
defer ctr.Cancel()
118+
119+
conn, err := l.Accept()
120+
if err != nil {
121+
return
122+
}
123+
defer conn.Close()
124+
125+
// start a background goroutine to politely refuse any subsequent connections.
126+
go func() {
127+
for {
128+
conn, err := l.Accept()
129+
if err != nil {
130+
return
131+
}
132+
fmt.Fprint(conn, "Error: Already connected to exec instance.")
133+
conn.Close()
134+
}
135+
}()
136+
ctr.Exec(context.Background(), cfg, conn, conn, conn)
137+
}

dap/handler.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Context interface {
1212
context.Context
1313
C() chan<- dap.Message
1414
Go(f func(c Context)) bool
15+
Request(req dap.RequestMessage) dap.ResponseMessage
1516
}
1617

1718
type dispatchContext struct {
@@ -28,6 +29,14 @@ func (c *dispatchContext) Go(f func(c Context)) bool {
2829
return c.srv.Go(f)
2930
}
3031

32+
func (c *dispatchContext) Request(req dap.RequestMessage) dap.ResponseMessage {
33+
respCh := make(chan dap.ResponseMessage, 1)
34+
c.srv.doRequest(c, req, func(c Context, resp dap.ResponseMessage) {
35+
respCh <- resp
36+
})
37+
return <-respCh
38+
}
39+
3140
type HandlerFunc[Req dap.RequestMessage, Resp dap.ResponseMessage] func(c Context, req Req, resp Resp) error
3241

3342
func (h HandlerFunc[Req, Resp]) Do(c Context, req Req) (resp Resp, err error) {

dap/server.go

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ package dap
33
import (
44
"context"
55
"sync"
6+
"sync/atomic"
67

78
"github.com/docker/buildx/build"
89
"github.com/google/go-dap"
910
"github.com/pkg/errors"
1011
"golang.org/x/sync/errgroup"
1112
)
1213

14+
type RequestCallback func(c Context, resp dap.ResponseMessage)
15+
1316
type Server struct {
1417
h Handler
1518

@@ -20,6 +23,8 @@ type Server struct {
2023
ctx context.Context
2124
cancel context.CancelCauseFunc
2225

26+
seq atomic.Int64
27+
requests sync.Map
2328
initialized bool
2429
}
2530

@@ -72,14 +77,18 @@ func (s *Server) readLoop(conn Conn) error {
7277

7378
switch m := m.(type) {
7479
case dap.RequestMessage:
75-
if ok := s.dispatch(m); !ok {
80+
if ok := s.dispatchRequest(m); !ok {
81+
return nil
82+
}
83+
case dap.ResponseMessage:
84+
if ok := s.dispatchResponse(m); !ok {
7685
return nil
7786
}
7887
}
7988
}
8089
}
8190

82-
func (s *Server) dispatch(m dap.RequestMessage) bool {
91+
func (s *Server) dispatchRequest(m dap.RequestMessage) bool {
8392
fn := func(c Context) {
8493
rmsg, err := s.handleMessage(c, m)
8594
if err != nil {
@@ -94,6 +103,19 @@ func (s *Server) dispatch(m dap.RequestMessage) bool {
94103
return s.Go(fn)
95104
}
96105

106+
func (s *Server) dispatchResponse(m dap.ResponseMessage) bool {
107+
fn := func(c Context) {
108+
reqID := m.GetResponse().RequestSeq
109+
if v, loaded := s.requests.LoadAndDelete(reqID); loaded {
110+
callback := v.(RequestCallback)
111+
s.Go(func(c Context) {
112+
callback(c, m)
113+
})
114+
}
115+
}
116+
return s.Go(fn)
117+
}
118+
97119
func (s *Server) handleMessage(c Context, m dap.Message) (dap.ResponseMessage, error) {
98120
switch req := m.(type) {
99121
case *dap.InitializeRequest:
@@ -143,20 +165,24 @@ func (s *Server) handleInitialize(c Context, req *dap.InitializeRequest) (*dap.I
143165
}
144166

145167
func (s *Server) writeLoop(conn Conn, respCh <-chan dap.Message) error {
146-
var seq int
147168
for m := range respCh {
148169
switch m := m.(type) {
149170
case dap.RequestMessage:
150-
m.GetRequest().Seq = seq
171+
if req := m.GetRequest(); req.Seq == 0 {
172+
req.Seq = int(s.seq.Add(1))
173+
}
151174
m.GetRequest().Type = "request"
152175
case dap.EventMessage:
153-
m.GetEvent().Seq = seq
176+
if event := m.GetEvent(); event.Seq == 0 {
177+
event.Seq = int(s.seq.Add(1))
178+
}
154179
m.GetEvent().Type = "event"
155180
case dap.ResponseMessage:
156-
m.GetResponse().Seq = seq
181+
if resp := m.GetResponse(); resp.Seq == 0 {
182+
resp.Seq = int(s.seq.Add(1))
183+
}
157184
m.GetResponse().Type = "response"
158185
}
159-
seq++
160186

161187
if err := conn.SendMsg(m); err != nil {
162188
return err
@@ -196,6 +222,12 @@ func (s *Server) Go(fn func(c Context)) bool {
196222
return <-started
197223
}
198224

225+
func (s *Server) doRequest(c Context, req dap.RequestMessage, callback RequestCallback) {
226+
req.GetRequest().Seq = int(s.seq.Add(1))
227+
s.requests.Store(req.GetRequest().Seq, callback)
228+
c.C() <- req
229+
}
230+
199231
func (s *Server) Stop() {
200232
s.mu.Lock()
201233
s.ch = nil

0 commit comments

Comments
 (0)