Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/execd-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ jobs:
./tests/smoke.sh

sleep 5
cat execd.log
cat /tmp/jupyter.log
curl -v localhost:44772/ping
python3 tests/smoke_api.py
- name: Show logs
if: always()
run: cat components/execd/execd.log
44 changes: 28 additions & 16 deletions components/execd/pkg/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os/exec"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -59,10 +60,14 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest
cmd.Env = mergeEnvs(os.Environ(), loadExtraEnvFromFile())

done := make(chan struct{}, 1)
var wg sync.WaitGroup
wg.Add(2)
safego.Go(func() {
defer wg.Done()
c.tailStdPipe(stdoutPath, request.Hooks.OnExecuteStdout, done)
})
safego.Go(func() {
defer wg.Done()
c.tailStdPipe(stderrPath, request.Hooks.OnExecuteStderr, done)
})

Expand All @@ -79,11 +84,13 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest
}

kernel := &commandKernel{
pid: cmd.Process.Pid,
stdoutPath: stdoutPath,
stderrPath: stderrPath,
startedAt: startAt,
running: true,
pid: cmd.Process.Pid,
stdoutPath: stdoutPath,
stderrPath: stderrPath,
startedAt: startAt,
running: true,
content: request.Code,
isBackground: false,
}
c.storeCommandKernel(session, kernel)
request.Hooks.OnExecuteInit(session)
Expand All @@ -107,6 +114,7 @@ func (c *Controller) runCommand(ctx context.Context, request *ExecuteCodeRequest

err = cmd.Wait()
close(done)
wg.Wait()
if err != nil {
var eName, eValue string
var eCode int
Expand Down Expand Up @@ -146,12 +154,12 @@ func (c *Controller) runBackgroundCommand(_ context.Context, request *ExecuteCod
session := c.newContextID()
request.Hooks.OnExecuteInit(session)

stdout, stderr, err := c.stdLogDescriptor(session)
pipe, err := c.combinedOutputDescriptor(session)
if err != nil {
return fmt.Errorf("failed to get stdlog descriptor: %w", err)
return fmt.Errorf("failed to get combined output descriptor: %w", err)
}
stdoutPath := c.stdoutFileName(session)
stderrPath := c.stderrFileName(session)
stdoutPath := c.combinedOutputFileName(session)
stderrPath := c.combinedOutputFileName(session)

signals := make(chan os.Signal, 1)
defer close(signals)
Expand All @@ -164,21 +172,25 @@ func (c *Controller) runBackgroundCommand(_ context.Context, request *ExecuteCod

cmd.Dir = request.Cwd
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.Stdout = pipe
cmd.Stderr = pipe
cmd.Env = mergeEnvs(os.Environ(), loadExtraEnvFromFile())

// use DevNull as stdin so interactive programs exit immediately.
cmd.Stdin = os.NewFile(uintptr(syscall.Stdin), os.DevNull)

safego.Go(func() {
defer pipe.Close()

err := cmd.Start()
kernel := &commandKernel{
pid: -1,
stdoutPath: stdoutPath,
stderrPath: stderrPath,
startedAt: startAt,
running: true,
pid: -1,
stdoutPath: stdoutPath,
stderrPath: stderrPath,
startedAt: startAt,
running: true,
content: request.Code,
isBackground: true,
}

if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions components/execd/pkg/runtime/command_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (c *Controller) stdLogDescriptor(session string) (io.WriteCloser, io.WriteC
return stdout, stderr, nil
}

func (c *Controller) combinedOutputDescriptor(session string) (io.WriteCloser, error) {
return os.OpenFile(c.combinedOutputFileName(session), os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
}

// stdoutFileName constructs the stdout log path.
func (c *Controller) stdoutFileName(session string) string {
return filepath.Join(os.TempDir(), session+".stdout")
Expand All @@ -80,6 +84,10 @@ func (c *Controller) stderrFileName(session string) string {
return filepath.Join(os.TempDir(), session+".stderr")
}

func (c *Controller) combinedOutputFileName(session string) string {
return filepath.Join(os.TempDir(), session+".output")
}

// readFromPos streams new content from a file starting at startPos.
func (c *Controller) readFromPos(filepath string, startPos int64, onExecute func(string)) int64 {
file, err := os.Open(filepath)
Expand Down
45 changes: 29 additions & 16 deletions components/execd/pkg/runtime/command_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package runtime

import (
"fmt"
"io"
"os"
"time"
)
Expand All @@ -28,6 +29,7 @@ type CommandStatus struct {
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
Content string `json:"content,omitempty"`
}

// CommandOutput contains non-streamed stdout/stderr plus status.
Expand Down Expand Up @@ -64,36 +66,47 @@ func (c *Controller) GetCommandStatus(session string) (*CommandStatus, error) {
Error: kernel.errMsg,
StartedAt: kernel.startedAt,
FinishedAt: kernel.finishedAt,
Content: kernel.content,
}
return status, nil
}

// GetCommandOutput returns accumulated stdout/stderr and status for a session.
func (c *Controller) GetCommandOutput(session string) (*CommandOutput, error) {
// SeekBackgroundCommandOutput returns accumulated stdout/stderr and status for a session.
func (c *Controller) SeekBackgroundCommandOutput(session string, cursor int64) ([]byte, int64, error) {
kernel := c.commandSnapshot(session)
if kernel == nil {
return nil, fmt.Errorf("command not found: %s", session)
return nil, -1, fmt.Errorf("command not found: %s", session)
}

if !kernel.isBackground {
return nil, -1, fmt.Errorf("command %s is not running in background", session)
}

status, err := c.GetCommandStatus(session)
file, err := os.Open(kernel.stdoutPath)
if err != nil {
return nil, err
return nil, -1, fmt.Errorf("error open combined output file for command %s: %w", session, err)
}
defer file.Close()

stdout, err := os.ReadFile(kernel.stdoutPath)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("read stdout: %w", err)
// Seek to the cursor position
_, err = file.Seek(cursor, 0)
if err != nil {
return nil, -1, fmt.Errorf("error seek file: %w", err)
}
stderr, err := os.ReadFile(kernel.stderrPath)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("read stderr: %w", err)

// Read all content from cursor to end
data, err := io.ReadAll(file)
if err != nil {
return nil, -1, fmt.Errorf("error read file: %w", err)
}

// Get current file position (end of file)
currentPos, err := file.Seek(0, 1)
if err != nil {
return nil, -1, fmt.Errorf("error get current position: %w", err)
}

return &CommandOutput{
CommandStatus: *status,
Stdout: string(stdout),
Stderr: string(stderr),
}, nil
return data, currentPos, nil
}

// markCommandFinished updates bookkeeping when a command exits.
Expand Down
Loading
Loading