Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 65 additions & 28 deletions src/core/task/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ import { processUserContentMentions } from "../mentions/processUserContentMentio
import { getMessagesSinceLastSummary, summarizeConversation } from "../condense"
import { Gpt5Metadata, ClineMessageWithMetadata } from "./types"
import { MessageQueueService } from "../message-queue/MessageQueueService"
import { DebouncedSave } from "../../utils/debouncedSave"

import { AutoApprovalHandler } from "./AutoApprovalHandler"

Expand Down Expand Up @@ -297,6 +298,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
private tokenUsageSnapshot?: TokenUsage
private tokenUsageSnapshotAt?: number

// Debounced save for streaming operations
private debouncedSave: DebouncedSave

constructor({
provider,
apiConfiguration,
Expand Down Expand Up @@ -412,6 +416,12 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {

this.toolRepetitionDetector = new ToolRepetitionDetector(this.consecutiveMistakeLimit)

// Initialize debounced save for streaming operations
this.debouncedSave = new DebouncedSave({
delay: 500, // 500ms debounce for streaming updates
maxWait: 2000, // Force save after 2 seconds max
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be beneficial to make these delays configurable for different use cases or testing scenarios? The hardcoded values might not be optimal for all situations.

})

// Initialize todo list if provided
if (initialTodos && initialTodos.length > 0) {
this.todoList = initialTodos
Expand Down Expand Up @@ -611,7 +621,11 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
const provider = this.providerRef.deref()
await provider?.postStateToWebview()
this.emit(RooCodeEventName.Message, { action: "created", message })
await this.saveClineMessages()

// Skip saving partial messages to avoid excessive disk writes during streaming
if (!message.partial) {
await this.saveClineMessages()
}

const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()

Expand Down Expand Up @@ -646,6 +660,8 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
await provider?.postMessageToWebview({ type: "messageUpdated", clineMessage: message })
this.emit(RooCodeEventName.Message, { action: "updated", message })

// Skip saving partial messages to avoid excessive disk writes during streaming
// The message will be saved when it's complete
const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()

if (shouldCaptureMessage) {
Expand All @@ -656,34 +672,44 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
}
}

private async saveClineMessages() {
try {
await saveTaskMessages({
messages: this.clineMessages,
taskId: this.taskId,
globalStoragePath: this.globalStoragePath,
})
private async saveClineMessages(immediate: boolean = false) {
const saveFunction = async () => {
try {
await saveTaskMessages({
messages: this.clineMessages,
taskId: this.taskId,
globalStoragePath: this.globalStoragePath,
})

const { historyItem, tokenUsage } = await taskMetadata({
taskId: this.taskId,
rootTaskId: this.rootTaskId,
parentTaskId: this.parentTaskId,
taskNumber: this.taskNumber,
messages: this.clineMessages,
globalStoragePath: this.globalStoragePath,
workspace: this.cwd,
mode: this._taskMode || defaultModeSlug, // Use the task's own mode, not the current provider mode.
})
const { historyItem, tokenUsage } = await taskMetadata({
taskId: this.taskId,
rootTaskId: this.rootTaskId,
parentTaskId: this.parentTaskId,
taskNumber: this.taskNumber,
messages: this.clineMessages,
globalStoragePath: this.globalStoragePath,
workspace: this.cwd,
mode: this._taskMode || defaultModeSlug, // Use the task's own mode, not the current provider mode.
})

if (hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot)) {
this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage)
this.tokenUsageSnapshot = undefined
this.tokenUsageSnapshotAt = undefined
if (hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot)) {
this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage)
this.tokenUsageSnapshot = undefined
this.tokenUsageSnapshotAt = undefined
}

await this.providerRef.deref()?.updateTaskHistory(historyItem)
} catch (error) {
console.error("Failed to save Roo messages:", error)
}
}

await this.providerRef.deref()?.updateTaskHistory(historyItem)
} catch (error) {
console.error("Failed to save Roo messages:", error)
// If immediate save is requested or we're not streaming, save immediately
if (immediate || !this.isStreaming) {
await saveFunction()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here could be clearer. When immediate is false but isStreaming is also false, it saves immediately anyway. Is this intentional? Consider adding a comment to clarify this behavior.

} else {
// During streaming, use debounced save to reduce disk writes
this.debouncedSave.schedule(saveFunction)
}
}

Expand Down Expand Up @@ -1072,6 +1098,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
lastMessage.images = images
lastMessage.partial = partial
lastMessage.progressStatus = progressStatus
// Don't save partial messages - just update the UI
this.updateClineMessage(lastMessage)
} else {
// This is a new partial message, so add it with partial state.
Expand Down Expand Up @@ -1506,6 +1533,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
public dispose(): void {
console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`)

// Flush any pending saves before disposing
try {
this.debouncedSave.flush().catch(console.error)
this.debouncedSave.dispose()
} catch (error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The debounced save should be flushed in abortTask before disposal to ensure any pending saves complete. Currently it's only handled in dispose(). Consider adding a flush before the dispose call to prevent data loss.

console.error("Error disposing debounced save:", error)
}

// Dispose message queue and remove event listeners.
try {
if (this.messageQueueStateChangedHandler) {
Expand Down Expand Up @@ -1872,9 +1907,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
if (lastMessage && lastMessage.partial) {
// lastMessage.ts = Date.now() DO NOT update ts since it is used as a key for virtuoso list
lastMessage.partial = false
// instead of streaming partialMessage events, we do a save and post like normal to persist to disk
console.log("updating partial message", lastMessage)
// await this.saveClineMessages()
// Save the now-complete message to disk
console.log("saving completed message", lastMessage)
await this.saveClineMessages()
}

// Let assistant know their response was interrupted for when task is resumed
Expand Down Expand Up @@ -2198,6 +2233,8 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
}
} finally {
this.isStreaming = false
// Ensure any pending saves are flushed when streaming ends
await this.debouncedSave.flush()
}

// Need to call here in case the stream was aborted.
Expand Down
Loading
Loading