Skip to content

Commit 26518cc

Browse files
committed
fix: reduce task file size by optimizing message saves during streaming
- Skip saving partial messages during streaming to avoid excessive disk writes
- Add debounced save mechanism with configurable delay and maxWait
- Flush pending saves when streaming ends to ensure data persistence
- Add comprehensive tests for DebouncedSave utility

This fix addresses issue #7851, where task files were growing to absurd sizes (95GB) because the entire message array was saved on every partial message update during streaming operations.
1 parent 7cd6520 commit 26518cc

File tree

3 files changed

+456
-28
lines changed

3 files changed

+456
-28
lines changed

src/core/task/Task.ts

Lines changed: 65 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -112,6 +112,7 @@ import { processUserContentMentions } from "../mentions/processUserContentMentio
112112
import { getMessagesSinceLastSummary, summarizeConversation } from "../condense"
113113
import { Gpt5Metadata, ClineMessageWithMetadata } from "./types"
114114
import { MessageQueueService } from "../message-queue/MessageQueueService"
115+
import { DebouncedSave } from "../../utils/debouncedSave"
115116

116117
import { AutoApprovalHandler } from "./AutoApprovalHandler"
117118

@@ -297,6 +298,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
297298
private tokenUsageSnapshot?: TokenUsage
298299
private tokenUsageSnapshotAt?: number
299300

301+
// Debounced save for streaming operations
302+
private debouncedSave: DebouncedSave
303+
300304
constructor({
301305
provider,
302306
apiConfiguration,
@@ -412,6 +416,12 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
412416

413417
this.toolRepetitionDetector = new ToolRepetitionDetector(this.consecutiveMistakeLimit)
414418

419+
// Initialize debounced save for streaming operations
420+
this.debouncedSave = new DebouncedSave({
421+
delay: 500, // 500ms debounce for streaming updates
422+
maxWait: 2000, // Force save after 2 seconds max
423+
})
424+
415425
// Initialize todo list if provided
416426
if (initialTodos && initialTodos.length > 0) {
417427
this.todoList = initialTodos
@@ -611,7 +621,11 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
611621
const provider = this.providerRef.deref()
612622
await provider?.postStateToWebview()
613623
this.emit(RooCodeEventName.Message, { action: "created", message })
614-
await this.saveClineMessages()
624+
625+
// Skip saving partial messages to avoid excessive disk writes during streaming
626+
if (!message.partial) {
627+
await this.saveClineMessages()
628+
}
615629

616630
const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
617631

@@ -646,6 +660,8 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
646660
await provider?.postMessageToWebview({ type: "messageUpdated", clineMessage: message })
647661
this.emit(RooCodeEventName.Message, { action: "updated", message })
648662

663+
// Skip saving partial messages to avoid excessive disk writes during streaming
664+
// The message will be saved when it's complete
649665
const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
650666

651667
if (shouldCaptureMessage) {
@@ -656,34 +672,44 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
656672
}
657673
}
658674

659-
private async saveClineMessages() {
660-
try {
661-
await saveTaskMessages({
662-
messages: this.clineMessages,
663-
taskId: this.taskId,
664-
globalStoragePath: this.globalStoragePath,
665-
})
675+
private async saveClineMessages(immediate: boolean = false) {
676+
const saveFunction = async () => {
677+
try {
678+
await saveTaskMessages({
679+
messages: this.clineMessages,
680+
taskId: this.taskId,
681+
globalStoragePath: this.globalStoragePath,
682+
})
666683

667-
const { historyItem, tokenUsage } = await taskMetadata({
668-
taskId: this.taskId,
669-
rootTaskId: this.rootTaskId,
670-
parentTaskId: this.parentTaskId,
671-
taskNumber: this.taskNumber,
672-
messages: this.clineMessages,
673-
globalStoragePath: this.globalStoragePath,
674-
workspace: this.cwd,
675-
mode: this._taskMode || defaultModeSlug, // Use the task's own mode, not the current provider mode.
676-
})
684+
const { historyItem, tokenUsage } = await taskMetadata({
685+
taskId: this.taskId,
686+
rootTaskId: this.rootTaskId,
687+
parentTaskId: this.parentTaskId,
688+
taskNumber: this.taskNumber,
689+
messages: this.clineMessages,
690+
globalStoragePath: this.globalStoragePath,
691+
workspace: this.cwd,
692+
mode: this._taskMode || defaultModeSlug, // Use the task's own mode, not the current provider mode.
693+
})
677694

678-
if (hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot)) {
679-
this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage)
680-
this.tokenUsageSnapshot = undefined
681-
this.tokenUsageSnapshotAt = undefined
695+
if (hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot)) {
696+
this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage)
697+
this.tokenUsageSnapshot = undefined
698+
this.tokenUsageSnapshotAt = undefined
699+
}
700+
701+
await this.providerRef.deref()?.updateTaskHistory(historyItem)
702+
} catch (error) {
703+
console.error("Failed to save Roo messages:", error)
682704
}
705+
}
683706

684-
await this.providerRef.deref()?.updateTaskHistory(historyItem)
685-
} catch (error) {
686-
console.error("Failed to save Roo messages:", error)
707+
// If immediate save is requested or we're not streaming, save immediately
708+
if (immediate || !this.isStreaming) {
709+
await saveFunction()
710+
} else {
711+
// During streaming, use debounced save to reduce disk writes
712+
this.debouncedSave.schedule(saveFunction)
687713
}
688714
}
689715

@@ -1072,6 +1098,7 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
10721098
lastMessage.images = images
10731099
lastMessage.partial = partial
10741100
lastMessage.progressStatus = progressStatus
1101+
// Don't save partial messages - just update the UI
10751102
this.updateClineMessage(lastMessage)
10761103
} else {
10771104
// This is a new partial message, so add it with partial state.
@@ -1506,6 +1533,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
15061533
public dispose(): void {
15071534
console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`)
15081535

1536+
// Flush any pending saves before disposing
1537+
try {
1538+
this.debouncedSave.flush().catch(console.error)
1539+
this.debouncedSave.dispose()
1540+
} catch (error) {
1541+
console.error("Error disposing debounced save:", error)
1542+
}
1543+
15091544
// Dispose message queue and remove event listeners.
15101545
try {
15111546
if (this.messageQueueStateChangedHandler) {
@@ -1872,9 +1907,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
18721907
if (lastMessage && lastMessage.partial) {
18731908
// lastMessage.ts = Date.now() DO NOT update ts since it is used as a key for virtuoso list
18741909
lastMessage.partial = false
1875-
// instead of streaming partialMessage events, we do a save and post like normal to persist to disk
1876-
console.log("updating partial message", lastMessage)
1877-
// await this.saveClineMessages()
1910+
// Save the now-complete message to disk
1911+
console.log("saving completed message", lastMessage)
1912+
await this.saveClineMessages()
18781913
}
18791914

18801915
// Let assistant know their response was interrupted for when task is resumed
@@ -2198,6 +2233,8 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
21982233
}
21992234
} finally {
22002235
this.isStreaming = false
2236+
// Ensure any pending saves are flushed when streaming ends
2237+
await this.debouncedSave.flush()
22012238
}
22022239

22032240
// Need to call here in case the stream was aborted.

0 commit comments

Comments
 (0)