|
1 | 1 | package exec |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "bufio" |
4 | 5 | "bytes" |
5 | 6 | "context" |
| 7 | + "io" |
6 | 8 | "os/exec" |
7 | 9 |
|
8 | 10 | "github.com/valyentdev/ravel/api" |
@@ -49,3 +51,92 @@ func Exec(ctx context.Context, opts api.ExecOptions) (*api.ExecResult, error) { |
49 | 51 | ExitCode: exitCode, |
50 | 52 | }, nil |
51 | 53 | } |
| 54 | + |
| 55 | +// ExecOutputLine represents a single line of output from a streaming exec. |
| 56 | +type ExecOutputLine struct { |
| 57 | + Stream string `json:"stream"` // "stdout" or "stderr" |
| 58 | + Data string `json:"data"` |
| 59 | +} |
| 60 | + |
| 61 | +// ExecStreamResult is sent when the command completes. |
| 62 | +type ExecStreamResult struct { |
| 63 | + ExitCode int `json:"exit_code"` |
| 64 | +} |
| 65 | + |
| 66 | +// ExecStream executes a command and streams output line by line. |
| 67 | +// This is useful for long-running commands in AI sandboxes. |
| 68 | +func ExecStream(ctx context.Context, opts api.ExecOptions, outputCh chan<- ExecOutputLine) (*ExecStreamResult, error) { |
| 69 | + defer close(outputCh) |
| 70 | + |
| 71 | + if len(opts.Cmd) == 0 { |
| 72 | + return nil, errdefs.NewInvalidArgument("cmd cannot be empty") |
| 73 | + } |
| 74 | + |
| 75 | + name := opts.Cmd[0] |
| 76 | + args := opts.Cmd[1:] |
| 77 | + |
| 78 | + timeoutCtx, cancel := context.WithTimeout(ctx, opts.GetTimeout()) |
| 79 | + defer cancel() |
| 80 | + |
| 81 | + cmd := exec.CommandContext(timeoutCtx, name, args...) |
| 82 | + if cmd.Err != nil { |
| 83 | + return nil, errdefs.NewInvalidArgument(cmd.Err.Error()) |
| 84 | + } |
| 85 | + |
| 86 | + // Get stdout and stderr pipes |
| 87 | + stdoutPipe, err := cmd.StdoutPipe() |
| 88 | + if err != nil { |
| 89 | + return nil, errdefs.NewUnknown("failed to create stdout pipe: " + err.Error()) |
| 90 | + } |
| 91 | + |
| 92 | + stderrPipe, err := cmd.StderrPipe() |
| 93 | + if err != nil { |
| 94 | + return nil, errdefs.NewUnknown("failed to create stderr pipe: " + err.Error()) |
| 95 | + } |
| 96 | + |
| 97 | + cmd.Stdin = nil |
| 98 | + |
| 99 | + // Start the command |
| 100 | + if err := cmd.Start(); err != nil { |
| 101 | + return nil, errdefs.NewUnknown("failed to start command: " + err.Error()) |
| 102 | + } |
| 103 | + |
| 104 | + // Stream stdout and stderr concurrently |
| 105 | + done := make(chan struct{}, 2) |
| 106 | + |
| 107 | + go streamPipe(stdoutPipe, "stdout", outputCh, done) |
| 108 | + go streamPipe(stderrPipe, "stderr", outputCh, done) |
| 109 | + |
| 110 | + // Wait for both pipes to finish |
| 111 | + <-done |
| 112 | + <-done |
| 113 | + |
| 114 | + // Wait for command to complete |
| 115 | + err = cmd.Wait() |
| 116 | + exitCode := -1 |
| 117 | + if cmd.ProcessState != nil { |
| 118 | + exitCode = cmd.ProcessState.ExitCode() |
| 119 | + } |
| 120 | + |
| 121 | + // Ignore exit errors - we just care about the exit code |
| 122 | + if _, ok := err.(*exec.ExitError); ok { |
| 123 | + err = nil |
| 124 | + } |
| 125 | + |
| 126 | + return &ExecStreamResult{ExitCode: exitCode}, err |
| 127 | +} |
| 128 | + |
| 129 | +func streamPipe(pipe io.ReadCloser, stream string, outputCh chan<- ExecOutputLine, done chan<- struct{}) { |
| 130 | + defer func() { done <- struct{}{} }() |
| 131 | + |
| 132 | + scanner := bufio.NewScanner(pipe) |
| 133 | + // Increase buffer size for lines up to 1MB |
| 134 | + scanner.Buffer(make([]byte, 64*1024), 1024*1024) |
| 135 | + |
| 136 | + for scanner.Scan() { |
| 137 | + outputCh <- ExecOutputLine{ |
| 138 | + Stream: stream, |
| 139 | + Data: scanner.Text(), |
| 140 | + } |
| 141 | + } |
| 142 | +} |
0 commit comments