diff --git a/apps/macos/Sources/OpenClaw/CockpitData.swift b/apps/macos/Sources/OpenClaw/CockpitData.swift index 50dc049f8..9f2ece9e5 100644 --- a/apps/macos/Sources/OpenClaw/CockpitData.swift +++ b/apps/macos/Sources/OpenClaw/CockpitData.swift @@ -194,6 +194,18 @@ struct CockpitWorkerLogs: Codable, Sendable { let stderrTail: String } +struct CockpitPtySnapshot: Codable, Sendable { + let workerId: String + let running: Bool + let stdoutTail: String + let stderrTail: String +} + +struct CockpitPtySubscription: Codable, Sendable { + let subscriptionId: String + let workerId: String +} + struct CockpitSupervisorTickResult: Codable, Sendable { let action: String let reason: String? diff --git a/apps/macos/Sources/OpenClaw/CockpitStore.swift b/apps/macos/Sources/OpenClaw/CockpitStore.swift index 94df591b0..b857164d2 100644 --- a/apps/macos/Sources/OpenClaw/CockpitStore.swift +++ b/apps/macos/Sources/OpenClaw/CockpitStore.swift @@ -7,6 +7,7 @@ typealias CockpitGatewayStatusLoader = @Sendable () async throws -> CockpitGatew typealias CockpitWorkerLogsLoader = @Sendable (_ workerId: String) async throws -> CockpitWorkerLogs typealias CockpitSupervisorTickPerformer = @Sendable (_ repoRoot: String?) async throws -> CockpitSupervisorTickResult typealias CockpitWorkerActionPerformer = @Sendable (_ action: CockpitWorkerAction, _ workerId: String) async throws -> Void +typealias CockpitPtySnapshotLoader = @Sendable (_ workerId: String) async throws -> CockpitPtySnapshot typealias CockpitRemoteReconnectAction = @Sendable () async throws -> Void enum CockpitLoadError: LocalizedError { @@ -47,12 +48,15 @@ final class CockpitStore { var isPerformingWorkerAction = false var activeWorkerAction: CockpitWorkerAction? var isRepairingRemoteConnection = false + var terminalStores: [String: CockpitTerminalStore] = [:] + var showTerminalLanes = true private let logger = Logger(subsystem: "ai.openclaw", category: "cockpit.ui") private let isPreview: Bool private let loadGatewayStatus: CockpitGatewayStatusLoader private let loadSummary: CockpitSummaryLoader private let loadWorkerLogs: CockpitWorkerLogsLoader + private let loadPtySnapshot: CockpitPtySnapshotLoader private let performSupervisorTickImpl: CockpitSupervisorTickPerformer private let performWorkerActionImpl: CockpitWorkerActionPerformer private let reconnectRemoteGatewayImpl: CockpitRemoteReconnectAction @@ -78,6 +82,7 @@ final class CockpitStore { loadGatewayStatus: CockpitGatewayStatusLoader? = nil, loadSummary: CockpitSummaryLoader? = nil, loadWorkerLogs: CockpitWorkerLogsLoader? = nil, + loadPtySnapshot: CockpitPtySnapshotLoader? = nil, performSupervisorTick: CockpitSupervisorTickPerformer? = nil, performWorkerAction: CockpitWorkerActionPerformer? = nil, reconnectRemoteGateway: CockpitRemoteReconnectAction? = nil) @@ -94,6 +99,9 @@ final class CockpitStore { self.loadWorkerLogs = loadWorkerLogs ?? { workerId in try await GatewayConnection.shared.codeWorkerLogs(workerId: workerId) } + self.loadPtySnapshot = loadPtySnapshot ?? { workerId in + try await GatewayConnection.shared.codeWorkerPtySnapshot(workerId: workerId) + } self.performSupervisorTickImpl = performSupervisorTick ?? { repoRoot in try await GatewayConnection.shared.codeSupervisorTick(repoRoot: repoRoot) } @@ -147,6 +155,7 @@ final class CockpitStore { self.snapshot = self.snapshot ?? .preview self.gatewayStatus = self.gatewayStatus ?? .previewLocal self.reconcileSelection() + self.reconcileTerminalStores() if let workerId = self.selectedWorkerId, self.selectedWorkerLogs == nil { self.selectedWorkerLogs = .preview(workerId: workerId) } @@ -161,6 +170,7 @@ final class CockpitStore { self.gatewayStatus = try await self.loadGatewayStatus() self.snapshot = try await self.loadSummary() self.reconcileSelection() + self.reconcileTerminalStores() await self.refreshSelectedWorkerLogs() } catch { let message = (error as? LocalizedError)?.errorDescription ?? error.localizedDescription @@ -261,6 +271,47 @@ final class CockpitStore { self.selectedWorkerId = snapshot.activeLanes.first?.workerId } + func reconcileTerminalStores() { + guard let snapshot = self.snapshot else { + for store in self.terminalStores.values { store.stopPolling() } + self.terminalStores.removeAll() + return + } + + let activeLaneIds = Set(snapshot.activeLanes.map(\.workerId)) + + // Remove stores for lanes that no longer exist. + for (workerId, store) in self.terminalStores where !activeLaneIds.contains(workerId) { + store.stopPolling() + self.terminalStores.removeValue(forKey: workerId) + } + + // Create stores for new lanes. + for lane in snapshot.activeLanes where self.terminalStores[lane.workerId] == nil { + let store = CockpitTerminalStore( + workerId: lane.workerId, + loadSnapshot: self.loadPtySnapshot) + self.terminalStores[lane.workerId] = store + if lane.status == "running" { + store.startPolling() + } + } + + // Start/stop polling based on worker status. + for lane in snapshot.activeLanes { + guard let store = self.terminalStores[lane.workerId] else { continue } + if lane.status == "running" { + store.startPolling() + } else { + store.stopPolling() + } + } + } + + func terminalStore(for workerId: String) -> CockpitTerminalStore? { + self.terminalStores[workerId] + } + private func refreshSelectedWorkerLogs() async { guard let workerId = self.selectedWorkerId else { self.selectedWorkerLogs = nil diff --git a/apps/macos/Sources/OpenClaw/CockpitTerminalLane.swift b/apps/macos/Sources/OpenClaw/CockpitTerminalLane.swift new file mode 100644 index 000000000..ac1901248 --- /dev/null +++ b/apps/macos/Sources/OpenClaw/CockpitTerminalLane.swift @@ -0,0 +1,137 @@ +import AppKit +import Observation +import SwiftUI + +/// A live terminal output view for a single cockpit worker lane. +/// Polls the gateway for PTY snapshots and auto-scrolls to the bottom. +@MainActor +struct CockpitTerminalLane: View { + let workerId: String + let workerName: String + let status: String + @Bindable var terminalStore: CockpitTerminalStore + + var body: some View { + VStack(alignment: .leading, spacing: 6) { + HStack(spacing: 8) { + Circle() + .fill(self.statusColor) + .frame(width: 8, height: 8) + Text(self.workerName) + .font(.caption.weight(.semibold)) + Spacer() + Text(self.status.replacingOccurrences(of: "_", with: " ")) + .font(.caption2) + .foregroundStyle(.secondary) + if self.terminalStore.isPolling { + ProgressView() + .controlSize(.mini) + } + } + .padding(.horizontal, 10) + .padding(.top, 8) + + ScrollViewReader { proxy in + ScrollView(.vertical) { + Text(self.displayText) + .font(.system(size: 11, design: .monospaced)) + .foregroundStyle(.primary) + .textSelection(.enabled) + .frame(maxWidth: .infinity, alignment: .leading) + .padding(8) + .id("terminal-bottom") + } + .onChange(of: self.terminalStore.stdoutContent) { + if self.terminalStore.autoScroll { + proxy.scrollTo("terminal-bottom", anchor: .bottom) + } + } + } + .background(Color(nsColor: .textBackgroundColor).opacity(0.6)) + .clipShape(RoundedRectangle(cornerRadius: 6, style: .continuous)) + .padding(.horizontal, 6) + .padding(.bottom, 6) + } + .background( + RoundedRectangle(cornerRadius: 10, style: .continuous) + .fill(Color.primary.opacity(0.03)) + .stroke(Color.primary.opacity(0.08), lineWidth: 1)) + } + + private var displayText: String { + let stdout = self.terminalStore.stdoutContent + let stderr = self.terminalStore.stderrContent + if stdout.isEmpty && stderr.isEmpty { + return "Waiting for output…" + } + if stderr.isEmpty { return Self.stripAnsi(stdout) } + if stdout.isEmpty { return Self.stripAnsi(stderr) } + return "\(Self.stripAnsi(stdout))\n\nstderr:\n\(Self.stripAnsi(stderr))" + } + + private var statusColor: Color { + switch self.status { + case "running": .green + case "paused": .orange + case "failed", "cancelled": .red + case "completed", "succeeded": .blue + default: .gray + } + } + + /// Strip ANSI escape sequences for display in a plain text view. + private static func stripAnsi(_ input: String) -> String { + // Matches CSI sequences (ESC [ ... final byte) and OSC sequences (ESC ] ... ST). + let pattern = #"\x1B(?:\[[0-9;]*[A-Za-z]|\][^\x07\x1B]*(?:\x07|\x1B\\))"# + return (try? NSRegularExpression(pattern: pattern)) + .map { $0.stringByReplacingMatches(in: input, range: NSRange(input.startIndex..., in: input), withTemplate: "") } + ?? input + } +} + +/// Manages polling for a single worker's PTY snapshot. +@MainActor +@Observable +final class CockpitTerminalStore { + let workerId: String + var stdoutContent = "" + var stderrContent = "" + var isPolling = false + var autoScroll = true + + private let loadSnapshot: @Sendable (_ workerId: String) async throws -> CockpitPtySnapshot + private var pollTask: Task? + + init( + workerId: String, + loadSnapshot: @escaping @Sendable (_ workerId: String) async throws -> CockpitPtySnapshot) + { + self.workerId = workerId + self.loadSnapshot = loadSnapshot + } + + func startPolling(intervalSeconds: Double = 1.5) { + guard self.pollTask == nil else { return } + self.pollTask = Task { [weak self] in + while !Task.isCancelled { + guard let self else { return } + self.isPolling = true + do { + let snapshot = try await self.loadSnapshot(self.workerId) + self.stdoutContent = snapshot.stdoutTail + self.stderrContent = snapshot.stderrTail + } catch { + // Polling errors are silently tolerated; next poll will retry. + } + self.isPolling = false + try? await Task.sleep(for: .seconds(intervalSeconds)) + } + } + } + + func stopPolling() { + self.pollTask?.cancel() + self.pollTask = nil + self.isPolling = false + } +} diff --git a/apps/macos/Sources/OpenClaw/CockpitWindow.swift b/apps/macos/Sources/OpenClaw/CockpitWindow.swift index 88f7769c0..4ab7057e8 100644 --- a/apps/macos/Sources/OpenClaw/CockpitWindow.swift +++ b/apps/macos/Sources/OpenClaw/CockpitWindow.swift @@ -73,6 +73,11 @@ struct CockpitWindow: View { }) CockpitSelectedWorkerSection(store: self.store) } + if self.store.showTerminalLanes && !snapshot.activeLanes.isEmpty { + CockpitTerminalLanesSection( + lanes: snapshot.activeLanes, + store: self.store) + } HStack(alignment: .top, spacing: 16) { CockpitReviewSection(reviews: snapshot.pendingReviews) CockpitRunsSection(runs: snapshot.recentRuns) @@ -570,6 +575,49 @@ private struct CockpitTasksSection: View { } } +private struct CockpitTerminalLanesSection: View { + let lanes: [CockpitLaneSummary] + @Bindable var store: CockpitStore + + private let columns = [ + GridItem(.flexible(minimum: 360), spacing: 10), + GridItem(.flexible(minimum: 360), spacing: 10), + ] + + var body: some View { + VStack(alignment: .leading, spacing: 10) { + HStack { + Text("Terminal Lanes") + .font(.title3.weight(.semibold)) + Spacer() + Button { + self.store.showTerminalLanes.toggle() + } label: { + Label( + self.store.showTerminalLanes ? "Hide" : "Show", + systemImage: self.store.showTerminalLanes + ? "terminal.fill" : "terminal") + } + .buttonStyle(.bordered) + .controlSize(.small) + } + LazyVGrid(columns: self.columns, alignment: .leading, spacing: 10) { + ForEach(self.lanes) { lane in + if let terminalStore = self.store.terminalStore(for: lane.workerId) { + CockpitTerminalLane( + workerId: lane.workerId, + workerName: lane.workerName, + status: lane.status, + terminalStore: terminalStore) + .frame(minHeight: 200, maxHeight: 320) + } + } + } + } + .frame(maxWidth: .infinity, alignment: .leading) + } +} + private func sectionPlaceholder(_ message: String) -> some View { Text(message) .font(.callout) diff --git a/apps/macos/Sources/OpenClaw/GatewayConnection.swift b/apps/macos/Sources/OpenClaw/GatewayConnection.swift index 454e17dd2..1a1cc5e3a 100644 --- a/apps/macos/Sources/OpenClaw/GatewayConnection.swift +++ b/apps/macos/Sources/OpenClaw/GatewayConnection.swift @@ -104,6 +104,9 @@ actor GatewayConnection { case codeWorkerResume = "code.worker.resume" case codeWorkerCancel = "code.worker.cancel" case codeWorkerLogs = "code.worker.logs" + case codeWorkerPtySnapshot = "code.worker.pty.snapshot" + case codeWorkerPtySubscribe = "code.worker.pty.subscribe" + case codeWorkerPtyUnsubscribe = "code.worker.pty.unsubscribe" } private let configProvider: @Sendable () async throws -> Config @@ -848,6 +851,26 @@ extension GatewayConnection { timeoutMs: 10000) } + func codeWorkerPtySnapshot(workerId: String) async throws -> CockpitPtySnapshot { + try await self.requestDecoded( + method: .codeWorkerPtySnapshot, + params: ["workerId": AnyCodable(workerId)], + timeoutMs: 10000) + } + + func codeWorkerPtySubscribe(workerId: String) async throws -> CockpitPtySubscription { + try await self.requestDecoded( + method: .codeWorkerPtySubscribe, + params: ["workerId": AnyCodable(workerId)], + timeoutMs: 10000) + } + + func codeWorkerPtyUnsubscribe(subscriptionId: String) async throws { + try await self.requestVoid( + method: .codeWorkerPtyUnsubscribe, + params: ["subscriptionId": AnyCodable(subscriptionId)]) + } + nonisolated static func decodeCronListResponse(_ data: Data) throws -> [CronJob] { let decoded = try JSONDecoder().decode(LossyCronListResponse.self, from: data) let jobs = decoded.jobs.compactMap(\.value) diff --git a/src/code-cockpit/runtime.ts b/src/code-cockpit/runtime.ts index d1288aed8..3b6d226de 100644 --- a/src/code-cockpit/runtime.ts +++ b/src/code-cockpit/runtime.ts @@ -114,6 +114,20 @@ type ActiveWorkerRun = { logWrite: Promise; }; +export type PtyLogChunk = { + workerId: string; + stream: "stdout" | "stderr"; + data: string; +}; + +export type PtyLogSubscriber = (chunk: PtyLogChunk) => void; + +type PtySubscription = { + id: string; + workerId: string; + listener: PtyLogSubscriber; +}; + type PreparedBackend = Awaited>; export type CodeCockpitRuntimeDeps = { @@ -685,6 +699,7 @@ class CodeCockpitRuntime { private readonly runCommandWithTimeout; private readonly now; private readonly activeRuns = new Map(); + private readonly ptySubscribers = new Map(); private initPromise: Promise | null = null; constructor(deps: CodeCockpitRuntimeDeps = {}) { @@ -785,6 +800,17 @@ class CodeCockpitRuntime { await fs.appendFile(logPath, chunk, "utf8"); }) .catch(() => {}); + + // Notify PTY log subscribers for this worker. + for (const sub of this.ptySubscribers.values()) { + if (sub.workerId === active.workerId) { + try { + sub.listener({ workerId: active.workerId, stream, data: chunk }); + } catch { + // Subscriber errors must not disrupt the run. + } + } + } } private async bootstrapFastTodoTask(repoRoot: string): Promise { @@ -1561,6 +1587,41 @@ class CodeCockpitRuntime { }; } + subscribePtyLogs(workerId: string, listener: PtyLogSubscriber): string { + const id = `pty_sub_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; + this.ptySubscribers.set(id, { id, workerId, listener }); + return id; + } + + unsubscribePtyLogs(subscriptionId: string): void { + this.ptySubscribers.delete(subscriptionId); + } + + isWorkerRunning(workerId: string): boolean { + return this.activeRuns.has(workerId); + } + + async readWorkerPtySnapshot(params: { + workerId: string; + }): Promise<{ workerId: string; running: boolean; stdoutTail: string; stderrTail: string }> { + const active = this.activeRuns.get(params.workerId); + if (active) { + return { + workerId: params.workerId, + running: true, + stdoutTail: active.stdoutTail, + stderrTail: active.stderrTail, + }; + } + const logs = await this.readWorkerLogs(params); + return { + workerId: params.workerId, + running: false, + stdoutTail: logs.stdoutTail, + stderrTail: logs.stderrTail, + }; + } + async readWorkerLogs(params: { workerId: string }): Promise { const shown = await this.showWorker(params); const latestRun = shown.runs[0] ?? null; diff --git a/src/gateway/server-methods/code-cockpit.ts b/src/gateway/server-methods/code-cockpit.ts index 73f9f1e1f..b12f90ca1 100644 --- a/src/gateway/server-methods/code-cockpit.ts +++ b/src/gateway/server-methods/code-cockpit.ts @@ -70,6 +70,14 @@ async function withRuntimeResult( } } +function requireSubscriptionId(value: unknown): string { + const subId = typeof value === "string" ? value.trim() : ""; + if (!subId) { + throw new Error("subscriptionId is required"); + } + return subId; +} + export const codeCockpitHandlers: GatewayRequestHandlers = { "code.cockpit.summary": async ({ respond }) => { await withRuntimeResult( @@ -253,4 +261,49 @@ export const codeCockpitHandlers: GatewayRequestHandlers = { }), ); }, + "code.worker.pty.snapshot": async ({ params, respond }) => { + await withRuntimeResult( + respond, + async () => + await getCodeCockpitRuntime().readWorkerPtySnapshot({ + workerId: requireWorkerId(params.workerId), + }), + ); + }, + "code.worker.pty.subscribe": async ({ params, respond }) => { + try { + const workerId = requireWorkerId(params.workerId); + const runtime = getCodeCockpitRuntime(); + const subscriptionId = runtime.subscribePtyLogs(workerId, () => { + // The subscriber callback is used internally; the client polls via + // pty.snapshot or receives push events when those are wired up. + }); + respond(true, { subscriptionId, workerId }, undefined); + } catch (error) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + error instanceof Error ? error.message : String(error), + ), + ); + } + }, + "code.worker.pty.unsubscribe": async ({ params, respond }) => { + try { + const subscriptionId = requireSubscriptionId(params.subscriptionId); + getCodeCockpitRuntime().unsubscribePtyLogs(subscriptionId); + respond(true, { ok: true }, undefined); + } catch (error) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + error instanceof Error ? error.message : String(error), + ), + ); + } + }, };