Skip to content

Commit 22e9448

Browse files
authored
refactor: improve stream cancellation and remove cloud service integration (#502)
1 parent 79ee241 commit 22e9448

File tree

3 files changed

+79
-66
lines changed

3 files changed

+79
-66
lines changed

src/api/providers/zgsm.ts

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
4747
private modelInfo = {} as ModelInfo
4848
private apiResponseRenderModeInfo = renderModes.fast
4949
private logger: ILogger
50-
private curStream: any = null
50+
private abortController?: AbortController
5151

5252
constructor(options: ApiHandlerOptions) {
5353
super()
@@ -101,6 +101,7 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
101101
metadata?: ApiHandlerCreateMessageMetadata,
102102
): ApiStream {
103103
// Performance monitoring log
104+
this.abortController = new AbortController()
104105
const requestId = uuidv7()
105106
await this.fetchModel()
106107
this.apiResponseRenderModeInfo = getApiResponseRenderMode()
@@ -137,7 +138,7 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
137138
const tokens = await ZgsmAuthService.getInstance().getTokens()
138139
this.client.apiKey = tokens?.access_token || "not-provided"
139140
} catch (error) {
140-
console.warn(
141+
this.logger.info(
141142
`[createMessage] getting new tokens failed \n\nuse old tokens: ${this.client.apiKey} \n\n${error.message}`,
142143
)
143144
}
@@ -165,18 +166,18 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
165166
let stream
166167
try {
167168
this.logger.info(`[RequestID]:`, requestId)
168-
const { data: _stream, response } = await this.client.chat.completions
169+
const { data, response } = await this.client.chat.completions
169170
.create(
170171
requestOptions,
171172
Object.assign(isAzureAiInference ? { path: OPENAI_AZURE_AI_INFERENCE_PATH } : {}, {
172173
headers: _headers,
174+
signal: this.abortController.signal,
173175
}),
174176
)
175177
.withResponse()
176178
this.logger.info(`[ResponseID]:`, response.headers.get("x-request-id"))
177179

178-
stream = _stream
179-
this.curStream = _stream
180+
stream = data
180181
if (this.options.zgsmModelId === autoModeModelId) {
181182
const userInputHeader = response.headers.get("x-user-input")
182183
if (userInputHeader) {
@@ -208,6 +209,7 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
208209
requestOptions,
209210
Object.assign(isAzureAiInference ? { path: OPENAI_AZURE_AI_INFERENCE_PATH } : {}, {
210211
headers: _headers,
212+
signal: this.abortController.signal,
211213
}),
212214
)
213215
this.logger.info(`[ResponseId]:`, response._request_id)
@@ -571,7 +573,9 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
571573
try {
572574
stream = await this.client.chat.completions.create(
573575
requestOptions,
574-
methodIsAzureAiInference ? { path: OPENAI_AZURE_AI_INFERENCE_PATH } : {},
576+
Object.assign(methodIsAzureAiInference ? { path: OPENAI_AZURE_AI_INFERENCE_PATH } : {}, {
577+
signal: this.abortController?.signal,
578+
}),
575579
)
576580
} catch (error) {
577581
throw handleOpenAIError(error, this.providerName)
@@ -601,7 +605,9 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
601605
try {
602606
response = await this.client.chat.completions.create(
603607
requestOptions,
604-
methodIsAzureAiInference ? { path: OPENAI_AZURE_AI_INFERENCE_PATH } : {},
608+
Object.assign(methodIsAzureAiInference ? { path: OPENAI_AZURE_AI_INFERENCE_PATH } : {}, {
609+
signal: this.abortController?.signal,
610+
}),
605611
)
606612
} catch (error) {
607613
throw handleOpenAIError(error, this.providerName)
@@ -680,12 +686,16 @@ export class ZgsmAiHandler extends BaseProvider implements SingleCompletionHandl
680686
return this.chatType
681687
}
682688

683-
cancelChat(type?: ClineApiReqCancelReason): void {
689+
cancelChat(reason?: ClineApiReqCancelReason): void {
684690
try {
685-
this.curStream?.controller?.abort?.()
686-
this.logger.info(`[cancelChat] Cancelled chat request ${type}`)
691+
if (reason === "user_cancelled") {
692+
this.logger.info(`[cancelChat] User Cancelled chat request: ${reason}`)
693+
} else {
694+
this.logger.info(`[cancelChat] AI Cancelled chat request: ${reason}`)
695+
}
696+
this.abortController?.abort(reason)
687697
} catch (error) {
688-
console.log(`Error while cancelling message ${error}`)
698+
this.logger.info(`Error while cancelling message ${error}`)
689699
}
690700
}
691701
}

src/core/task/Task.ts

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import {
3737
QueuedMessage,
3838
} from "@roo-code/types"
3939
import { TelemetryService } from "@roo-code/telemetry"
40-
import { CloudService, BridgeOrchestrator } from "@roo-code/cloud"
40+
// import { CloudService, BridgeOrchestrator } from "@roo-code/cloud"
4141

4242
// api
4343
import { ApiHandler, ApiHandlerCreateMessageMetadata, buildApiHandler } from "../../api"
@@ -658,14 +658,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
658658
await provider?.postMessageToWebview({ type: "messageUpdated", clineMessage: message })
659659
this.emit(RooCodeEventName.Message, { action: "updated", message })
660660

661-
const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
661+
// const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
662662

663-
if (shouldCaptureMessage) {
664-
CloudService.instance.captureEvent({
665-
event: TelemetryEventName.TASK_MESSAGE,
666-
properties: { taskId: this.taskId, message },
667-
})
668-
}
663+
// if (shouldCaptureMessage) {
664+
// CloudService.instance.captureEvent({
665+
// event: TelemetryEventName.TASK_MESSAGE,
666+
// properties: { taskId: this.taskId, message },
667+
// })
668+
// }
669669
}
670670

671671
private async saveClineMessages() {
@@ -1206,15 +1206,15 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
12061206
// Start / Resume / Abort / Dispose
12071207

12081208
private async startTask(task?: string, images?: string[]): Promise<void> {
1209-
if (this.enableBridge) {
1210-
try {
1211-
await BridgeOrchestrator.subscribeToTask(this)
1212-
} catch (error) {
1213-
console.error(
1214-
`[Task#startTask] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1215-
)
1216-
}
1217-
}
1209+
// if (this.enableBridge) {
1210+
// try {
1211+
// await BridgeOrchestrator.subscribeToTask(this)
1212+
// } catch (error) {
1213+
// console.error(
1214+
// `[Task#startTask] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1215+
// )
1216+
// }
1217+
// }
12181218

12191219
// `conversationHistory` (for API) and `clineMessages` (for webview)
12201220
// need to be in sync.
@@ -1247,15 +1247,15 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
12471247
}
12481248

12491249
private async resumeTaskFromHistory() {
1250-
if (this.enableBridge) {
1251-
try {
1252-
await BridgeOrchestrator.subscribeToTask(this)
1253-
} catch (error) {
1254-
console.error(
1255-
`[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1256-
)
1257-
}
1258-
}
1250+
// if (this.enableBridge) {
1251+
// try {
1252+
// await BridgeOrchestrator.subscribeToTask(this)
1253+
// } catch (error) {
1254+
// console.error(
1255+
// `[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1256+
// )
1257+
// }
1258+
// }
12591259

12601260
const modifiedClineMessages = await this.getSavedClineMessages()
12611261

@@ -1470,25 +1470,25 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
14701470

14711471
let newUserContent: Anthropic.Messages.ContentBlockParam[] = [...modifiedOldUserContent]
14721472

1473-
const agoText = ((): string => {
1474-
const timestamp = lastClineMessage?.ts ?? Date.now()
1475-
const now = Date.now()
1476-
const diff = now - timestamp
1477-
const minutes = Math.floor(diff / 60000)
1478-
const hours = Math.floor(minutes / 60)
1479-
const days = Math.floor(hours / 24)
1480-
1481-
if (days > 0) {
1482-
return `${days} day${days > 1 ? "s" : ""} ago`
1483-
}
1484-
if (hours > 0) {
1485-
return `${hours} hour${hours > 1 ? "s" : ""} ago`
1486-
}
1487-
if (minutes > 0) {
1488-
return `${minutes} minute${minutes > 1 ? "s" : ""} ago`
1489-
}
1490-
return "just now"
1491-
})()
1473+
// const agoText = ((): string => {
1474+
// const timestamp = lastClineMessage?.ts ?? Date.now()
1475+
// const now = Date.now()
1476+
// const diff = now - timestamp
1477+
// const minutes = Math.floor(diff / 60000)
1478+
// const hours = Math.floor(minutes / 60)
1479+
// const days = Math.floor(hours / 24)
1480+
1481+
// if (days > 0) {
1482+
// return `${days} day${days > 1 ? "s" : ""} ago`
1483+
// }
1484+
// if (hours > 0) {
1485+
// return `${hours} hour${hours > 1 ? "s" : ""} ago`
1486+
// }
1487+
// if (minutes > 0) {
1488+
// return `${minutes} minute${minutes > 1 ? "s" : ""} ago`
1489+
// }
1490+
// return "just now"
1491+
// })()
14921492

14931493
if (responseText) {
14941494
newUserContent.push({
@@ -1570,15 +1570,15 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
15701570
this.pauseInterval = undefined
15711571
}
15721572

1573-
if (this.enableBridge) {
1574-
BridgeOrchestrator.getInstance()
1575-
?.unsubscribeFromTask(this.taskId)
1576-
.catch((error) =>
1577-
console.error(
1578-
`[Task#dispose] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1579-
),
1580-
)
1581-
}
1573+
// if (this.enableBridge) {
1574+
// BridgeOrchestrator.getInstance()
1575+
// ?.unsubscribeFromTask(this.taskId)
1576+
// .catch((error) =>
1577+
// console.error(
1578+
// `[Task#dispose] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`,
1579+
// ),
1580+
// )
1581+
// }
15821582

15831583
// Release any terminals associated with this task.
15841584
try {
@@ -1925,6 +1925,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
19251925
// Signals to provider that it can retrieve the saved messages
19261926
// from disk, as abortTask can not be awaited on in nature.
19271927
this.didFinishAbortingStream = true
1928+
19281929
this?.api?.cancelChat?.(cancelReason)
19291930
}
19301931

@@ -2230,6 +2231,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
22302231

22312232
// Now abort (emits TaskAborted which provider listens to)
22322233
await this.abortTask()
2234+
22332235
this?.api?.cancelChat?.(cancelReason)
22342236
// Do not rehydrate here; provider owns rehydration to avoid duplication races
22352237
}

src/core/webview/ClineProvider.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2680,9 +2680,10 @@ export class ClineProvider
26802680
},
26812681
).catch(() => {
26822682
console.error("Failed to abort task")
2683-
task?.api?.cancelChat?.(task.abortReason)
26842683
})
26852684

2685+
task?.api?.cancelChat?.(task.abortReason)
2686+
26862687
// Defensive safeguard: if current instance already changed, skip rehydrate
26872688
const current = this.getCurrentTask()
26882689
if (current && current.instanceId !== originalInstanceId) {

0 commit comments

Comments
 (0)