Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 0 additions & 142 deletions packages/cloud/src/bridge/BaseChannel.ts

This file was deleted.

91 changes: 0 additions & 91 deletions packages/cloud/src/bridge/BridgeOrchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@ import os from "os"

import {
type TaskProviderLike,
type TaskLike,
type CloudUserInfo,
type ExtensionBridgeCommand,
type TaskBridgeCommand,
type StaticAppProperties,
type GitProperties,
ConnectionState,
ExtensionSocketEvents,
TaskSocketEvents,
} from "@roo-code/types"

import { SocketTransport } from "./SocketTransport.js"
import { ExtensionChannel } from "./ExtensionChannel.js"
import { TaskChannel } from "./TaskChannel.js"

export interface BridgeOrchestratorOptions {
userId: string
Expand All @@ -35,8 +31,6 @@ export interface BridgeOrchestratorOptions {
export class BridgeOrchestrator {
private static instance: BridgeOrchestrator | null = null

private static pendingTask: TaskLike | null = null

// Core
private readonly userId: string
private readonly socketBridgeUrl: string
Expand All @@ -50,7 +44,6 @@ export class BridgeOrchestrator {
// Components
private socketTransport: SocketTransport
private extensionChannel: ExtensionChannel
private taskChannel: TaskChannel

// Reconnection
private readonly MAX_RECONNECT_ATTEMPTS = Infinity
Expand Down Expand Up @@ -153,22 +146,6 @@ export class BridgeOrchestrator {
}
}

/**
* @TODO: What if subtasks also get spawned? We'd probably want deferred
* subscriptions for those too.
*/
public static async subscribeToTask(task: TaskLike): Promise<void> {
const instance = BridgeOrchestrator.instance

if (instance && instance.socketTransport.isConnected()) {
console.log(`[BridgeOrchestrator#subscribeToTask] Subscribing to task ${task.taskId}`)
await instance.subscribeToTask(task)
} else {
console.log(`[BridgeOrchestrator#subscribeToTask] Deferring subscription for task ${task.taskId}`)
BridgeOrchestrator.pendingTask = task
}
}

private constructor(options: BridgeOrchestratorOptions) {
this.userId = options.userId
this.socketBridgeUrl = options.socketBridgeUrl
Expand Down Expand Up @@ -206,13 +183,6 @@ export class BridgeOrchestrator {
provider: this.provider,
isCloudAgent: this.isCloudAgent,
})

this.taskChannel = new TaskChannel({
instanceId: this.instanceId,
appProperties: this.appProperties,
gitProperties: this.gitProperties,
isCloudAgent: this.isCloudAgent,
})
}

private setupSocketListeners() {
Expand All @@ -225,7 +195,6 @@ export class BridgeOrchestrator {

// Remove any existing listeners first to prevent duplicates.
socket.off(ExtensionSocketEvents.RELAYED_COMMAND)
socket.off(TaskSocketEvents.RELAYED_COMMAND)
socket.off("connected")

socket.on(ExtensionSocketEvents.RELAYED_COMMAND, (message: ExtensionBridgeCommand) => {
Expand All @@ -235,14 +204,6 @@ export class BridgeOrchestrator {

this.extensionChannel?.handleCommand(message)
})

socket.on(TaskSocketEvents.RELAYED_COMMAND, (message: TaskBridgeCommand) => {
console.log(
`[BridgeOrchestrator] on(${TaskSocketEvents.RELAYED_COMMAND}) -> ${message.type} for ${message.taskId}`,
)

this.taskChannel.handleCommand(message)
})
}

private async handleConnect() {
Expand All @@ -254,27 +215,10 @@ export class BridgeOrchestrator {
}

await this.extensionChannel.onConnect(socket)
await this.taskChannel.onConnect(socket)

if (BridgeOrchestrator.pendingTask) {
console.log(
`[BridgeOrchestrator#handleConnect] Subscribing to task ${BridgeOrchestrator.pendingTask.taskId}`,
)

try {
await this.subscribeToTask(BridgeOrchestrator.pendingTask)
BridgeOrchestrator.pendingTask = null
} catch (error) {
console.error(
`[BridgeOrchestrator#handleConnect] subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
)
}
}
}

private handleDisconnect() {
this.extensionChannel.onDisconnect()
this.taskChannel.onDisconnect()
}

private async handleReconnect() {
Expand All @@ -291,39 +235,6 @@ export class BridgeOrchestrator {
this.setupSocketListeners()

await this.extensionChannel.onReconnect(socket)
await this.taskChannel.onReconnect(socket)
}

// Task API

public async subscribeToTask(task: TaskLike): Promise<void> {
const socket = this.socketTransport.getSocket()

if (!socket || !this.socketTransport.isConnected()) {
console.warn("[BridgeOrchestrator] Cannot subscribe to task: not connected. Will retry when connected.")
this.taskChannel.addPendingTask(task)

if (
this.connectionState === ConnectionState.DISCONNECTED ||
this.connectionState === ConnectionState.FAILED
) {
await this.connect()
}

return
}

await this.taskChannel.subscribeToTask(task, socket)
}

public async unsubscribeFromTask(taskId: string): Promise<void> {
const socket = this.socketTransport.getSocket()

if (!socket) {
return
}

await this.taskChannel.unsubscribeFromTask(taskId, socket)
}

// Shared API
Expand All @@ -339,10 +250,8 @@ export class BridgeOrchestrator {

public async disconnect(): Promise<void> {
await this.extensionChannel.cleanup(this.socketTransport.getSocket())
await this.taskChannel.cleanup(this.socketTransport.getSocket())
await this.socketTransport.disconnect()
BridgeOrchestrator.instance = null
BridgeOrchestrator.pendingTask = null
}

public async reconnect(): Promise<void> {
Expand Down
Loading
Loading