-
Notifications
You must be signed in to change notification settings - Fork 40
extend process spawn to allow allocating a tty, attaching #96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
f0f1f14
7d69152
33e6f74
b382d62
55adbad
b4ff4e0
4f6272a
7d47214
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,8 @@ import ( | |
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "net" | ||
| "net/http" | ||
| "os" | ||
| "os/exec" | ||
| "os/user" | ||
|
|
@@ -19,10 +21,12 @@ import ( | |
| "syscall" | ||
| "time" | ||
|
|
||
| "github.com/creack/pty" | ||
| "github.com/google/uuid" | ||
| openapi_types "github.com/oapi-codegen/runtime/types" | ||
| "github.com/onkernel/kernel-images/server/lib/logger" | ||
| oapi "github.com/onkernel/kernel-images/server/lib/oapi" | ||
| "golang.org/x/sys/unix" | ||
| ) | ||
|
|
||
| type processHandle struct { | ||
|
|
@@ -34,9 +38,13 @@ type processHandle struct { | |
| stdin io.WriteCloser | ||
| stdout io.ReadCloser | ||
| stderr io.ReadCloser | ||
| ptyFile *os.File | ||
| isTTY bool | ||
| outCh chan oapi.ProcessStreamEvent | ||
| doneCh chan struct{} | ||
| mu sync.RWMutex | ||
| // attachActive guards PTY attach sessions; only one client may be attached at a time. | ||
| attachActive bool | ||
| } | ||
|
|
||
| func (h *processHandle) state() string { | ||
|
|
@@ -223,21 +231,64 @@ func (s *ApiService) ProcessSpawn(ctx context.Context, request oapi.ProcessSpawn | |
| return oapi.ProcessSpawn400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: err.Error()}}, nil | ||
| } | ||
|
|
||
| stdout, err := cmd.StdoutPipe() | ||
| if err != nil { | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to open stdout"}}, nil | ||
| } | ||
| stderr, err := cmd.StderrPipe() | ||
| if err != nil { | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to open stderr"}}, nil | ||
| } | ||
| stdin, err := cmd.StdinPipe() | ||
| if err != nil { | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to open stdin"}}, nil | ||
| } | ||
| if err := cmd.Start(); err != nil { | ||
| log.Error("failed to start process", "err", err) | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to start process"}}, nil | ||
| var ( | ||
| stdout io.ReadCloser | ||
| stderr io.ReadCloser | ||
| stdin io.WriteCloser | ||
| ptyFile *os.File | ||
| isTTY bool | ||
| ) | ||
| // PTY mode when requested | ||
| if request.Body.AllocateTty != nil && *request.Body.AllocateTty { | ||
| // Ensure TERM and initial size env | ||
| hasTerm := false | ||
| for _, kv := range cmd.Env { | ||
| if strings.HasPrefix(kv, "TERM=") { | ||
| hasTerm = true | ||
| break | ||
| } | ||
| } | ||
| if !hasTerm { | ||
| cmd.Env = append(cmd.Env, "TERM=xterm-256color") | ||
| } | ||
| // Start with PTY | ||
| var errStart error | ||
| ptyFile, errStart = pty.Start(cmd) | ||
| if errStart != nil { | ||
| log.Error("failed to start PTY process", "err", errStart) | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to start process"}}, nil | ||
| } | ||
| // Set initial size if provided | ||
| var rows, cols uint16 | ||
| if request.Body.Rows != nil && *request.Body.Rows > 0 { | ||
| rows = uint16(*request.Body.Rows) | ||
| } | ||
| if request.Body.Cols != nil && *request.Body.Cols > 0 { | ||
| cols = uint16(*request.Body.Cols) | ||
| } | ||
| if rows > 0 && cols > 0 { | ||
| _ = pty.Setsize(ptyFile, &pty.Winsize{Rows: rows, Cols: cols}) | ||
| } | ||
| stdout = ptyFile | ||
| stdin = ptyFile | ||
| isTTY = true | ||
| } else { | ||
| stdout, err = cmd.StdoutPipe() | ||
| if err != nil { | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to open stdout"}}, nil | ||
| } | ||
| stderr, err = cmd.StderrPipe() | ||
| if err != nil { | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to open stderr"}}, nil | ||
| } | ||
| stdin, err = cmd.StdinPipe() | ||
| if err != nil { | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to open stdin"}}, nil | ||
| } | ||
| if err := cmd.Start(); err != nil { | ||
| log.Error("failed to start process", "err", err) | ||
| return oapi.ProcessSpawn500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to start process"}}, nil | ||
| } | ||
| } | ||
|
|
||
| id := openapi_types.UUID(uuid.New()) | ||
|
|
@@ -249,6 +300,8 @@ func (s *ApiService) ProcessSpawn(ctx context.Context, request oapi.ProcessSpawn | |
| stdin: stdin, | ||
| stdout: stdout, | ||
| stderr: stderr, | ||
| ptyFile: ptyFile, | ||
| isTTY: isTTY, | ||
| outCh: make(chan oapi.ProcessStreamEvent, 256), | ||
| doneCh: make(chan struct{}), | ||
| } | ||
|
|
@@ -262,37 +315,40 @@ func (s *ApiService) ProcessSpawn(ctx context.Context, request oapi.ProcessSpawn | |
| s.procMu.Unlock() | ||
|
|
||
| // Reader goroutines | ||
| go func() { | ||
| reader := bufio.NewReader(stdout) | ||
| buf := make([]byte, 4096) | ||
| for { | ||
| n, err := reader.Read(buf) | ||
| if n > 0 { | ||
| data := base64.StdEncoding.EncodeToString(buf[:n]) | ||
| stream := oapi.ProcessStreamEventStream("stdout") | ||
| h.outCh <- oapi.ProcessStreamEvent{Stream: &stream, DataB64: &data} | ||
| } | ||
| if err != nil { | ||
| break | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| go func() { | ||
| reader := bufio.NewReader(stderr) | ||
| buf := make([]byte, 4096) | ||
| for { | ||
| n, err := reader.Read(buf) | ||
| if n > 0 { | ||
| data := base64.StdEncoding.EncodeToString(buf[:n]) | ||
| stream := oapi.ProcessStreamEventStream("stderr") | ||
| h.outCh <- oapi.ProcessStreamEvent{Stream: &stream, DataB64: &data} | ||
| // In PTY mode, do NOT read from the PTY here to avoid competing with the /attach endpoint. | ||
| // In non‑PTY mode, stdout and stderr are separate pipes, so we run two readers and tag chunks accordingly. | ||
| if !isTTY { | ||
| go func() { | ||
| reader := bufio.NewReader(stdout) | ||
| buf := make([]byte, 4096) | ||
| for { | ||
| n, err := reader.Read(buf) | ||
| if n > 0 { | ||
| data := base64.StdEncoding.EncodeToString(buf[:n]) | ||
| stream := oapi.ProcessStreamEventStream("stdout") | ||
| h.outCh <- oapi.ProcessStreamEvent{Stream: &stream, DataB64: &data} | ||
| } | ||
| if err != nil { | ||
| break | ||
| } | ||
| } | ||
| if err != nil { | ||
| break | ||
| }() | ||
| go func() { | ||
| reader := bufio.NewReader(stderr) | ||
| buf := make([]byte, 4096) | ||
| for { | ||
| n, err := reader.Read(buf) | ||
| if n > 0 { | ||
| data := base64.StdEncoding.EncodeToString(buf[:n]) | ||
| stream := oapi.ProcessStreamEventStream("stderr") | ||
| h.outCh <- oapi.ProcessStreamEvent{Stream: &stream, DataB64: &data} | ||
| } | ||
| if err != nil { | ||
| break | ||
| } | ||
| } | ||
| } | ||
| }() | ||
| }() | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Waiter goroutine | ||
| go func() { | ||
|
|
@@ -490,3 +546,158 @@ func (s *ApiService) ProcessStdoutStream(ctx context.Context, request oapi.Proce | |
| } | ||
|
|
||
| func ptrOf[T any](v T) *T { return &v } | ||
|
|
||
| // Resize PTY-backed process | ||
| // (POST /process/{process_id}/resize) | ||
| func (s *ApiService) ProcessResize(ctx context.Context, request oapi.ProcessResizeRequestObject) (oapi.ProcessResizeResponseObject, error) { | ||
| id := request.ProcessId.String() | ||
| if request.Body == nil { | ||
| return oapi.ProcessResize400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "request body required"}}, nil | ||
| } | ||
| rows := request.Body.Rows | ||
| cols := request.Body.Cols | ||
| if rows <= 0 || cols <= 0 { | ||
| return oapi.ProcessResize400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "rows and cols must be > 0"}}, nil | ||
| } | ||
| s.procMu.RLock() | ||
| h, ok := s.procs[id] | ||
| s.procMu.RUnlock() | ||
| if !ok { | ||
| return oapi.ProcessResize404JSONResponse{NotFoundErrorJSONResponse: oapi.NotFoundErrorJSONResponse{Message: "process not found"}}, nil | ||
| } | ||
| if !h.isTTY || h.ptyFile == nil { | ||
| return oapi.ProcessResize400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "process is not PTY-backed"}}, nil | ||
| } | ||
| ws := &pty.Winsize{Rows: uint16(rows), Cols: uint16(cols)} | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if err := pty.Setsize(h.ptyFile, ws); err != nil { | ||
| return oapi.ProcessResize500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to resize PTY"}}, nil | ||
| } | ||
| return oapi.ProcessResize200JSONResponse(oapi.OkResponse{Ok: true}), nil | ||
| } | ||
|
|
||
| // HandleProcessAttach performs a raw HTTP hijack and shuttles bytes between the client and the PTY. | ||
| // This endpoint is intentionally not defined in OpenAPI. | ||
| func (s *ApiService) HandleProcessAttach(w http.ResponseWriter, r *http.Request, id string) { | ||
| s.procMu.RLock() | ||
| h, ok := s.procs[id] | ||
| s.procMu.RUnlock() | ||
| if !ok { | ||
| http.Error(w, "process not found", http.StatusNotFound) | ||
| return | ||
| } | ||
| if !h.isTTY || h.ptyFile == nil { | ||
| http.Error(w, "process is not PTY-backed", http.StatusBadRequest) | ||
| return | ||
| } | ||
| // Enforce single concurrent attach per PTY-backed process to avoid I/O corruption. | ||
| h.mu.Lock() | ||
| if h.attachActive { | ||
| h.mu.Unlock() | ||
| http.Error(w, "process already has an active attach session", http.StatusConflict) | ||
| return | ||
| } | ||
| h.attachActive = true | ||
| h.mu.Unlock() | ||
| // Ensure the flag is cleared when this handler exits (client disconnects or process ends). | ||
| defer func() { | ||
| h.mu.Lock() | ||
| h.attachActive = false | ||
| h.mu.Unlock() | ||
| }() | ||
| hj, ok := w.(http.Hijacker) | ||
| if !ok { | ||
| http.Error(w, "hijacking not supported", http.StatusInternalServerError) | ||
| return | ||
| } | ||
| conn, buf, err := hj.Hijack() | ||
| if err != nil { | ||
| http.Error(w, "failed to hijack connection", http.StatusInternalServerError) | ||
| return | ||
| } | ||
| // Write minimal HTTP response and switch to raw I/O | ||
| _, _ = buf.WriteString("HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n") | ||
| _ = buf.Flush() | ||
|
|
||
| clientConn := conn | ||
| processRW := h.ptyFile | ||
| // Coordinate shutdown so that both pumps exit when either side closes. | ||
| done := make(chan struct{}) | ||
| var once sync.Once | ||
| shutdown := func() { | ||
| once.Do(func() { | ||
| _ = clientConn.Close() | ||
| close(done) | ||
| }) | ||
| } | ||
|
|
||
| // Pipe: client -> process | ||
| go func() { | ||
| _, _ = io.Copy(processRW, clientConn) | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| shutdown() | ||
|
||
| }() | ||
| // Pipe: process -> client (non-blocking reads to allow early shutdown) | ||
| go func() { | ||
| copyPTYToConn(processRW, clientConn, done) | ||
| shutdown() | ||
| }() | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Close when process exits | ||
| go func() { | ||
| <-h.doneCh | ||
| shutdown() | ||
| }() | ||
|
|
||
| // Keep handler alive until shutdown triggered | ||
| <-done | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // copyPTYToConn copies from a PTY (os.File) to a net.Conn without mutating the | ||
| // PTY's file status flags. It uses readiness polling so we can wake up and exit | ||
| // when stop is closed, avoiding goroutine leaks and preserving blocking mode. | ||
| func copyPTYToConn(ptyFile *os.File, conn net.Conn, stop <-chan struct{}) { | ||
| fd := int(ptyFile.Fd()) | ||
| buf := make([]byte, 32*1024) | ||
| // Poll in short intervals so we can react quickly to stop signal. | ||
| for { | ||
| // Check for stop first to avoid extra reads after shutdown. | ||
| select { | ||
| case <-stop: | ||
| return | ||
| default: | ||
| } | ||
| pfds := []unix.PollFd{ | ||
| {Fd: int32(fd), Events: unix.POLLIN}, | ||
| } | ||
| _, perr := unix.Poll(pfds, 100) // 100ms | ||
| if perr != nil && perr != syscall.EINTR { | ||
| return | ||
| } | ||
| // If readable (or hangup/err), attempt a read. | ||
| if pfds[0].Revents&(unix.POLLIN|unix.POLLHUP|unix.POLLERR) == 0 { | ||
| // Not ready; loop around and re-check stop. | ||
| continue | ||
| } | ||
| n, rerr := ptyFile.Read(buf) | ||
| if n > 0 { | ||
| if _, werr := conn.Write(buf[:n]); werr != nil { | ||
| return | ||
| } | ||
| } | ||
| if rerr != nil { | ||
| if rerr == io.EOF { | ||
| return | ||
| } | ||
| if errno, ok := rerr.(syscall.Errno); ok { | ||
| // EIO is observed on PTY when the slave closes; treat as EOF. | ||
| if errno == syscall.EIO { | ||
| return | ||
| } | ||
| // Spurious would-block after poll; just continue. | ||
| if errno == syscall.EAGAIN || errno == syscall.EWOULDBLOCK { | ||
| continue | ||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect error type assertion fails to handle errnoThe type assertion |
||
| return | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.