Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/integrations/terminal/BaseTerminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ export abstract class BaseTerminal implements RooTerminal {
return output
}

public sendInput(_input: string): boolean {
return false
}

public isWaitingForInput(): boolean {
return false
}

public static defaultShellIntegrationTimeout = 5_000
private static shellIntegrationTimeout: number = BaseTerminal.defaultShellIntegrationTimeout
private static shellIntegrationDisabled: boolean = false
Expand Down
14 changes: 14 additions & 0 deletions src/integrations/terminal/BaseTerminalProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ export abstract class BaseTerminalProcess extends EventEmitter<RooTerminalProces
*/
abstract hasUnretrievedOutput(): boolean

/**
* Writes `input` to the subprocess stdin.
* @param input The input string to write
*/
public sendInput(_input: string): void {}

/**
* Checks if the process is waiting for input.
* @returns true if the process is waiting for input
*/
public isWaitingForInput(): boolean {
return false
}

/**
* Returns complete lines with their carriage returns.
* The final line may lack a carriage return if the program didn't send one.
Expand Down
9 changes: 9 additions & 0 deletions src/integrations/terminal/ExecaTerminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class ExecaTerminal extends BaseTerminal {
this.process = process

process.on("line", (line) => callbacks.onLine(line, process))
process.on("input_required", () => callbacks.onInputRequired?.(process))
process.once("completed", (output) => callbacks.onCompleted(output, process))
process.once("shell_execution_started", (pid) => callbacks.onShellExecutionStarted(pid, process))
process.once("shell_execution_complete", (details) => callbacks.onShellExecutionComplete(details, process))
Expand All @@ -35,4 +36,12 @@ export class ExecaTerminal extends BaseTerminal {

return mergePromise(process, promise)
}

public override sendInput(input: string): boolean {
return this.process?.sendInput(input) ?? false
}

public override isWaitingForInput(): boolean {
return this.process?.isWaitingForInput() ?? false
}
}
83 changes: 66 additions & 17 deletions src/integrations/terminal/ExecaTerminalProcess.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { execa, ExecaError } from "execa"
import { execa, ExecaError, ResultPromise } from "execa"
import psTree from "ps-tree"
import process from "process"

Expand All @@ -10,6 +10,17 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
private aborted = false
private pid?: number

private subprocess?: ResultPromise<{
shell: true
cwd: string
all: true
stdin: "pipe"
}>

private waitingForInput = false
private lastOutputAt = 0
private inputDetectionTimer?: NodeJS.Timeout

constructor(terminal: RooTerminal) {
super()

Expand All @@ -35,22 +46,22 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {

try {
this.isHot = true
this.lastOutputAt = Date.now()

const subprocess = execa({
shell: true,
cwd: this.terminal.getCurrentWorkingDirectory(),
all: true,
})`${command}`

this.pid = subprocess.pid
const stream = subprocess.iterable({ from: "all", preserveNewlines: true })
this.terminal.setActiveStream(stream, subprocess.pid)
const cwd = this.terminal.getCurrentWorkingDirectory()
this.subprocess = execa({ shell: true, cwd, all: true, stdin: "pipe" })`${command}`
this.pid = this.subprocess.pid
const stream = this.subprocess.iterable({ from: "all", preserveNewlines: true })
this.terminal.setActiveStream(stream, this.subprocess.pid)

for await (const line of stream) {
if (this.aborted) {
break
}

this.lastOutputAt = Date.now()
this.waitingForInput = false

this.fullOutput += line

const now = Date.now()
Expand All @@ -61,6 +72,7 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
}

this.startHotTimer(line)
this.startInputDetectionTimer()
}

if (this.aborted) {
Expand All @@ -69,15 +81,17 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
const kill = new Promise<void>((resolve) => {
timeoutId = setTimeout(() => {
try {
subprocess.kill("SIGKILL")
if (this.subprocess) {
this.subprocess.kill("SIGKILL")
}
} catch (e) {}

resolve()
}, 5_000)
})

try {
await Promise.race([subprocess, kill])
await Promise.race([this.subprocess, kill])
} catch (error) {
console.log(
`[ExecaTerminalProcess] subprocess termination error: ${error instanceof Error ? error.message : String(error)}`,
Expand Down Expand Up @@ -105,6 +119,7 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
this.terminal.setActiveStream(undefined)
this.emitRemainingBufferIfListening()
this.stopHotTimer()
this.stopInputDetectionTimer()
this.emit("completed", this.fullOutput)
this.emit("continue")
}
Expand All @@ -113,6 +128,7 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
this.isListening = false
this.removeAllListeners("line")
this.emit("continue")
this.stopInputDetectionTimer()
}

public override abort() {
Expand Down Expand Up @@ -148,6 +164,8 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
)
}
}

this.stopInputDetectionTimer()
}

public override hasUnretrievedOutput() {
Expand All @@ -165,11 +183,6 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
index++
this.lastRetrievedIndex += index

// console.log(
// `[ExecaTerminalProcess#getUnretrievedOutput] fullOutput.length=${this.fullOutput.length} lastRetrievedIndex=${this.lastRetrievedIndex}`,
// output.slice(0, index),
// )

return output.slice(0, index)
}

Expand All @@ -184,4 +197,40 @@ export class ExecaTerminalProcess extends BaseTerminalProcess {
this.emit("line", output)
}
}

private startInputDetectionTimer() {
this.stopInputDetectionTimer()

this.inputDetectionTimer = setInterval(() => {
const now = Date.now()

if (now - this.lastOutputAt > 500 && this.subprocess && !this.subprocess.killed) {
if (!this.waitingForInput) {
this.waitingForInput = true
this.emit("input_required")
}
}
}, 100)
}

private stopInputDetectionTimer() {
if (this.inputDetectionTimer) {
clearInterval(this.inputDetectionTimer)
this.inputDetectionTimer = undefined
}
}

public override sendInput(input: string): void {
if (this.subprocess && this.subprocess.stdin) {
this.subprocess.stdin.write(input + "\n")
this.waitingForInput = false
this.lastOutputAt = Date.now()
} else {
console.error("[ExecaTerminalProcess] Cannot write to stdin: subprocess or stdin not available")
}
}

public override isWaitingForInput(): boolean {
return this.waitingForInput
}
}
114 changes: 114 additions & 0 deletions src/integrations/terminal/__tests__/ExecaTerminalStdin.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// npx vitest run src/integrations/terminal/__tests__/ExecaTerminalStdin.spec.ts

import { vi, describe, beforeEach, afterEach, it, expect } from "vitest"

import { ExecaTerminal } from "../ExecaTerminal"
import { ExecaTerminalProcess } from "../ExecaTerminalProcess"
import { RooTerminalCallbacks } from "../types"

describe("ExecaTerminal stdin handling", () => {
let terminal: ExecaTerminal
let callbacks: RooTerminalCallbacks
let outputLines: string[] = []
let _completedOutput: string | undefined
let _shellExecutionStartedPid: number | undefined
let _shellExecutionCompleteDetails: any

beforeEach(() => {
terminal = new ExecaTerminal(1, process.cwd())
outputLines = []
_completedOutput = undefined
_shellExecutionStartedPid = undefined
_shellExecutionCompleteDetails = undefined

callbacks = {
onLine: (line, _process) => {
outputLines.push(line)
},
onCompleted: (output, _process) => {
_completedOutput = output
},
onShellExecutionStarted: (pid, _process) => {
_shellExecutionStartedPid = pid
},
onShellExecutionComplete: (details, _process) => {
_shellExecutionCompleteDetails = details
},
}
})

afterEach(() => {
// Clean up any running processes
if (terminal.process) {
terminal.process.abort()
}
})

it("should detect when input is required", async () => {
// This is a mock test since we can't reliably test with real password prompts
// in an automated test environment

// Create a spy on the input_required event
const inputRequiredSpy = vi.fn()

// Run a command and get the process
const processPromise = terminal.runCommand("echo 'Testing stdin'", callbacks)

// We know this is an ExecaTerminalProcess because we're using ExecaTerminal
const process = terminal.process as ExecaTerminalProcess

// Add a listener for the input_required event
process.on("input_required", inputRequiredSpy)

// Manually trigger the input detection
// @ts-ignore - Accessing private property for testing
process.waitingForInput = true
process.emit("input_required")

// Wait for the process to complete
await processPromise

// Verify the input_required event was emitted
expect(inputRequiredSpy).toHaveBeenCalled()
})

it("should be able to send input to the process", async () => {
// This is a mock test since we can't reliably test with real password prompts

// Run a command and get the process
const processPromise = terminal.runCommand("echo 'Testing stdin'", callbacks)

// We know this is an ExecaTerminalProcess because we're using ExecaTerminal
const process = terminal.process as ExecaTerminalProcess

// Create a spy on the sendInput method
const sendInputSpy = vi.spyOn(process, "sendInput")

// Send input to the process
terminal.sendInput("test input")

// Wait for the process to complete
await processPromise

// Verify sendInput was called with the correct input
expect(sendInputSpy).toHaveBeenCalledWith("test input")
})

it("isWaitingForInput should return the correct state", async () => {
// Run a command and get the process
const processPromise = terminal.runCommand("echo 'Testing stdin'", callbacks)

// Initially, the process should not be waiting for input
expect(terminal.isWaitingForInput()).toBe(false)

// Manually set the process to be waiting for input
// @ts-ignore - Accessing private property for testing
terminal.process!.waitingForInput = true

// Now it should report that it's waiting for input
expect(terminal.isWaitingForInput()).toBe(true)

// Wait for the process to complete
await processPromise
})
})
21 changes: 17 additions & 4 deletions src/integrations/terminal/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,27 @@ export interface RooTerminal {
running: boolean
taskId?: string
process?: RooTerminalProcess
getCurrentWorkingDirectory(): string
isClosed: () => boolean

runCommand: (command: string, callbacks: RooTerminalCallbacks) => RooTerminalProcessResultPromise
sendInput: (input: string) => boolean

setActiveStream(stream: AsyncIterable<string> | undefined, pid?: number): void
shellExecutionComplete(exitDetails: ExitCodeDetails): void

getCurrentWorkingDirectory(): string
getLastCommand(): string
getProcessesWithOutput(): RooTerminalProcess[]
getUnretrievedOutput(): string
getLastCommand(): string

isWaitingForInput: () => boolean
isClosed: () => boolean

shellExecutionComplete(exitDetails: ExitCodeDetails): void
cleanCompletedProcessQueue(): void
}

export interface RooTerminalCallbacks {
onLine: (line: string, process: RooTerminalProcess) => void
onInputRequired?: (process: RooTerminalProcess) => void
onCompleted: (output: string | undefined, process: RooTerminalProcess) => void
onShellExecutionStarted: (pid: number | undefined, process: RooTerminalProcess) => void
onShellExecutionComplete: (details: ExitCodeDetails, process: RooTerminalProcess) => void
Expand All @@ -31,17 +39,22 @@ export interface RooTerminalCallbacks {
export interface RooTerminalProcess extends EventEmitter<RooTerminalProcessEvents> {
command: string
isHot: boolean

run: (command: string) => Promise<void>
sendInput: (input: string) => void
continue: () => void
abort: () => void

hasUnretrievedOutput: () => boolean
getUnretrievedOutput: () => string
isWaitingForInput: () => boolean
}

export type RooTerminalProcessResultPromise = RooTerminalProcess & Promise<void>

export interface RooTerminalProcessEvents {
line: [line: string]
input_required: []
continue: []
completed: [output?: string]
stream_available: [stream: AsyncIterable<string>]
Expand Down