Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 0 additions & 142 deletions packages/cloud/src/bridge/BaseChannel.ts

This file was deleted.

131 changes: 11 additions & 120 deletions packages/cloud/src/bridge/BridgeOrchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@ import os from "os"

import {
type TaskProviderLike,
type TaskLike,
type CloudUserInfo,
type ExtensionBridgeCommand,
type TaskBridgeCommand,
type StaticAppProperties,
type GitProperties,
ConnectionState,
ExtensionSocketEvents,
TaskSocketEvents,
} from "@roo-code/types"

import { SocketTransport } from "./SocketTransport.js"
import { ExtensionChannel } from "./ExtensionChannel.js"
import { TaskChannel } from "./TaskChannel.js"

export interface BridgeOrchestratorOptions {
userId: string
Expand All @@ -27,16 +23,9 @@ export interface BridgeOrchestratorOptions {
isCloudAgent: boolean
}

/**
* Central orchestrator for the extension bridge system.
* Coordinates communication between the VSCode extension and web application
* through WebSocket connections and manages extension/task channels.
*/
export class BridgeOrchestrator {
private static instance: BridgeOrchestrator | null = null

private static pendingTask: TaskLike | null = null

// Core
private readonly userId: string
private readonly socketBridgeUrl: string
Expand All @@ -50,7 +39,6 @@ export class BridgeOrchestrator {
// Components
private socketTransport: SocketTransport
private extensionChannel: ExtensionChannel
private taskChannel: TaskChannel

// Reconnection
private readonly MAX_RECONNECT_ATTEMPTS = Infinity
Expand Down Expand Up @@ -92,7 +80,7 @@ export class BridgeOrchestrator {
}
}

public static async connect(options: BridgeOrchestratorOptions) {
private static async connect(options: BridgeOrchestratorOptions) {
const instance = BridgeOrchestrator.instance

if (!instance) {
Expand All @@ -110,13 +98,10 @@ export class BridgeOrchestrator {
)
}
} else {
if (
instance.connectionState === ConnectionState.FAILED ||
instance.connectionState === ConnectionState.DISCONNECTED
) {
console.log(
`[BridgeOrchestrator#connectOrDisconnect] Re-connecting... (state: ${instance.connectionState})`,
)
const connectionState = instance.socketTransport.getConnectionState()

if (connectionState === ConnectionState.FAILED || connectionState === ConnectionState.DISCONNECTED) {
console.log(`[BridgeOrchestrator#connectOrDisconnect] Re-connecting... (state: ${connectionState})`)

instance.reconnect().catch((error) => {
console.error(
Expand All @@ -125,7 +110,7 @@ export class BridgeOrchestrator {
})
} else {
console.log(
`[BridgeOrchestrator#connectOrDisconnect] Already connected or connecting (state: ${instance.connectionState})`,
`[BridgeOrchestrator#connectOrDisconnect] Already connected or connecting (state: ${connectionState})`,
)
}
}
Expand All @@ -135,11 +120,10 @@ export class BridgeOrchestrator {
const instance = BridgeOrchestrator.instance

if (instance) {
try {
console.log(
`[BridgeOrchestrator#connectOrDisconnect] Disconnecting... (state: ${instance.connectionState})`,
)
const connectionState = instance.socketTransport.getConnectionState()

try {
console.log(`[BridgeOrchestrator#connectOrDisconnect] Disconnecting... (state: ${connectionState})`)
await instance.disconnect()
} catch (error) {
console.error(
Expand All @@ -153,22 +137,6 @@ export class BridgeOrchestrator {
}
}

/**
* @TODO: What if subtasks also get spawned? We'd probably want deferred
* subscriptions for those too.
*/
public static async subscribeToTask(task: TaskLike): Promise<void> {
const instance = BridgeOrchestrator.instance

if (instance && instance.socketTransport.isConnected()) {
console.log(`[BridgeOrchestrator#subscribeToTask] Subscribing to task ${task.taskId}`)
await instance.subscribeToTask(task)
} else {
console.log(`[BridgeOrchestrator#subscribeToTask] Deferring subscription for task ${task.taskId}`)
BridgeOrchestrator.pendingTask = task
}
}

private constructor(options: BridgeOrchestratorOptions) {
this.userId = options.userId
this.socketBridgeUrl = options.socketBridgeUrl
Expand Down Expand Up @@ -206,13 +174,6 @@ export class BridgeOrchestrator {
provider: this.provider,
isCloudAgent: this.isCloudAgent,
})

this.taskChannel = new TaskChannel({
instanceId: this.instanceId,
appProperties: this.appProperties,
gitProperties: this.gitProperties,
isCloudAgent: this.isCloudAgent,
})
}

private setupSocketListeners() {
Expand All @@ -225,7 +186,6 @@ export class BridgeOrchestrator {

// Remove any existing listeners first to prevent duplicates.
socket.off(ExtensionSocketEvents.RELAYED_COMMAND)
socket.off(TaskSocketEvents.RELAYED_COMMAND)
socket.off("connected")

socket.on(ExtensionSocketEvents.RELAYED_COMMAND, (message: ExtensionBridgeCommand) => {
Expand All @@ -235,14 +195,6 @@ export class BridgeOrchestrator {

this.extensionChannel?.handleCommand(message)
})

socket.on(TaskSocketEvents.RELAYED_COMMAND, (message: TaskBridgeCommand) => {
console.log(
`[BridgeOrchestrator] on(${TaskSocketEvents.RELAYED_COMMAND}) -> ${message.type} for ${message.taskId}`,
)

this.taskChannel.handleCommand(message)
})
}

private async handleConnect() {
Expand All @@ -254,27 +206,10 @@ export class BridgeOrchestrator {
}

await this.extensionChannel.onConnect(socket)
await this.taskChannel.onConnect(socket)

if (BridgeOrchestrator.pendingTask) {
console.log(
`[BridgeOrchestrator#handleConnect] Subscribing to task ${BridgeOrchestrator.pendingTask.taskId}`,
)

try {
await this.subscribeToTask(BridgeOrchestrator.pendingTask)
BridgeOrchestrator.pendingTask = null
} catch (error) {
console.error(
`[BridgeOrchestrator#handleConnect] subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
)
}
}
}

private handleDisconnect() {
this.extensionChannel.onDisconnect()
this.taskChannel.onDisconnect()
}

private async handleReconnect() {
Expand All @@ -291,65 +226,21 @@ export class BridgeOrchestrator {
this.setupSocketListeners()

await this.extensionChannel.onReconnect(socket)
await this.taskChannel.onReconnect(socket)
}

// Task API

public async subscribeToTask(task: TaskLike): Promise<void> {
const socket = this.socketTransport.getSocket()

if (!socket || !this.socketTransport.isConnected()) {
console.warn("[BridgeOrchestrator] Cannot subscribe to task: not connected. Will retry when connected.")
this.taskChannel.addPendingTask(task)

if (
this.connectionState === ConnectionState.DISCONNECTED ||
this.connectionState === ConnectionState.FAILED
) {
await this.connect()
}

return
}

await this.taskChannel.subscribeToTask(task, socket)
}

public async unsubscribeFromTask(taskId: string): Promise<void> {
const socket = this.socketTransport.getSocket()

if (!socket) {
return
}

await this.taskChannel.unsubscribeFromTask(taskId, socket)
}

// Shared API

public get connectionState(): ConnectionState {
return this.socketTransport.getConnectionState()
}

private async connect(): Promise<void> {
await this.socketTransport.connect()
this.setupSocketListeners()
}

public async disconnect(): Promise<void> {
private async disconnect(): Promise<void> {
await this.extensionChannel.cleanup(this.socketTransport.getSocket())
await this.taskChannel.cleanup(this.socketTransport.getSocket())
await this.socketTransport.disconnect()
BridgeOrchestrator.instance = null
BridgeOrchestrator.pendingTask = null
}

public async reconnect(): Promise<void> {
private async reconnect(): Promise<void> {
await this.socketTransport.reconnect()

// After a manual reconnect, we have a new socket instance
// so we need to set up listeners again.
this.setupSocketListeners()
}
}
Loading
Loading