From 26518ccc03e9b89d0e6d75b2d10241ad5365fa6a Mon Sep 17 00:00:00 2001 From: Roo Code Date: Wed, 10 Sep 2025 14:47:19 +0000 Subject: [PATCH] fix: reduce task file size by optimizing message saves during streaming - Skip saving partial messages during streaming to avoid excessive disk writes - Add debounced save mechanism with configurable delay and maxWait - Flush pending saves when streaming ends to ensure data persistence - Add comprehensive tests for DebouncedSave utility This fix addresses issue #7851 where task files were growing to absurd sizes (95GB) due to the entire message array being saved on every partial message update during streaming operations. --- src/core/task/Task.ts | 93 +++++--- src/utils/__tests__/debouncedSave.test.ts | 257 ++++++++++++++++++++++ src/utils/debouncedSave.ts | 134 +++++++++++ 3 files changed, 456 insertions(+), 28 deletions(-) create mode 100644 src/utils/__tests__/debouncedSave.test.ts create mode 100644 src/utils/debouncedSave.ts diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index c5be865731..d2b97a69ee 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -112,6 +112,7 @@ import { processUserContentMentions } from "../mentions/processUserContentMentio import { getMessagesSinceLastSummary, summarizeConversation } from "../condense" import { Gpt5Metadata, ClineMessageWithMetadata } from "./types" import { MessageQueueService } from "../message-queue/MessageQueueService" +import { DebouncedSave } from "../../utils/debouncedSave" import { AutoApprovalHandler } from "./AutoApprovalHandler" @@ -297,6 +298,9 @@ export class Task extends EventEmitter implements TaskLike { private tokenUsageSnapshot?: TokenUsage private tokenUsageSnapshotAt?: number + // Debounced save for streaming operations + private debouncedSave: DebouncedSave + constructor({ provider, apiConfiguration, @@ -412,6 +416,12 @@ export class Task extends EventEmitter implements TaskLike { this.toolRepetitionDetector = new ToolRepetitionDetector(this.consecutiveMistakeLimit) + // Initialize debounced save for streaming operations + this.debouncedSave = new DebouncedSave({ + delay: 500, // 500ms debounce for streaming updates + maxWait: 2000, // Force save after 2 seconds max + }) + // Initialize todo list if provided if (initialTodos && initialTodos.length > 0) { this.todoList = initialTodos @@ -611,7 +621,11 @@ export class Task extends EventEmitter implements TaskLike { const provider = this.providerRef.deref() await provider?.postStateToWebview() this.emit(RooCodeEventName.Message, { action: "created", message }) - await this.saveClineMessages() + + // Skip saving partial messages to avoid excessive disk writes during streaming + if (!message.partial) { + await this.saveClineMessages() + } const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled() @@ -646,6 +660,8 @@ export class Task extends EventEmitter implements TaskLike { await provider?.postMessageToWebview({ type: "messageUpdated", clineMessage: message }) this.emit(RooCodeEventName.Message, { action: "updated", message }) + // Skip saving partial messages to avoid excessive disk writes during streaming + // The message will be saved when it's complete const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled() if (shouldCaptureMessage) { @@ -656,34 +672,44 @@ export class Task extends EventEmitter implements TaskLike { } } - private async saveClineMessages() { - try { - await saveTaskMessages({ - messages: this.clineMessages, - taskId: this.taskId, - globalStoragePath: this.globalStoragePath, - }) + private async saveClineMessages(immediate: boolean = false) { + const saveFunction = async () => { + try { + await saveTaskMessages({ + messages: this.clineMessages, + taskId: this.taskId, + globalStoragePath: this.globalStoragePath, + }) - const { historyItem, tokenUsage } = await taskMetadata({ - taskId: this.taskId, - rootTaskId: this.rootTaskId, - parentTaskId: this.parentTaskId, - taskNumber: this.taskNumber, - messages: this.clineMessages, - globalStoragePath: this.globalStoragePath, - workspace: this.cwd, - mode: this._taskMode || defaultModeSlug, // Use the task's own mode, not the current provider mode. - }) + const { historyItem, tokenUsage } = await taskMetadata({ + taskId: this.taskId, + rootTaskId: this.rootTaskId, + parentTaskId: this.parentTaskId, + taskNumber: this.taskNumber, + messages: this.clineMessages, + globalStoragePath: this.globalStoragePath, + workspace: this.cwd, + mode: this._taskMode || defaultModeSlug, // Use the task's own mode, not the current provider mode. + }) - if (hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot)) { - this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage) - this.tokenUsageSnapshot = undefined - this.tokenUsageSnapshotAt = undefined + if (hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot)) { + this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage) + this.tokenUsageSnapshot = undefined + this.tokenUsageSnapshotAt = undefined + } + + await this.providerRef.deref()?.updateTaskHistory(historyItem) + } catch (error) { + console.error("Failed to save Roo messages:", error) } + } - await this.providerRef.deref()?.updateTaskHistory(historyItem) - } catch (error) { - console.error("Failed to save Roo messages:", error) + // If immediate save is requested or we're not streaming, save immediately + if (immediate || !this.isStreaming) { + await saveFunction() + } else { + // During streaming, use debounced save to reduce disk writes + this.debouncedSave.schedule(saveFunction) } } @@ -1072,6 +1098,7 @@ export class Task extends EventEmitter implements TaskLike { lastMessage.images = images lastMessage.partial = partial lastMessage.progressStatus = progressStatus + // Don't save partial messages - just update the UI this.updateClineMessage(lastMessage) } else { // This is a new partial message, so add it with partial state. @@ -1506,6 +1533,14 @@ export class Task extends EventEmitter implements TaskLike { public dispose(): void { console.log(`[Task#dispose] disposing task ${this.taskId}.${this.instanceId}`) + // Flush any pending saves before disposing + try { + this.debouncedSave.flush().catch(console.error) + this.debouncedSave.dispose() + } catch (error) { + console.error("Error disposing debounced save:", error) + } + // Dispose message queue and remove event listeners. try { if (this.messageQueueStateChangedHandler) { @@ -1872,9 +1907,9 @@ export class Task extends EventEmitter implements TaskLike { if (lastMessage && lastMessage.partial) { // lastMessage.ts = Date.now() DO NOT update ts since it is used as a key for virtuoso list lastMessage.partial = false - // instead of streaming partialMessage events, we do a save and post like normal to persist to disk - console.log("updating partial message", lastMessage) - // await this.saveClineMessages() + // Save the now-complete message to disk + console.log("saving completed message", lastMessage) + await this.saveClineMessages() } // Let assistant know their response was interrupted for when task is resumed @@ -2198,6 +2233,8 @@ export class Task extends EventEmitter implements TaskLike { } } finally { this.isStreaming = false + // Ensure any pending saves are flushed when streaming ends + await this.debouncedSave.flush() } // Need to call here in case the stream was aborted. diff --git a/src/utils/__tests__/debouncedSave.test.ts b/src/utils/__tests__/debouncedSave.test.ts new file mode 100644 index 0000000000..017673d16b --- /dev/null +++ b/src/utils/__tests__/debouncedSave.test.ts @@ -0,0 +1,257 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest" +import { DebouncedSave } from "../debouncedSave" + +describe("DebouncedSave", () => { + let debouncedSave: DebouncedSave + + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + if (debouncedSave) { + debouncedSave.dispose() + } + }) + + describe("basic functionality", () => { + it("should debounce multiple save calls", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000 }) + + // Schedule multiple saves + debouncedSave.schedule(saveFunction) + debouncedSave.schedule(saveFunction) + debouncedSave.schedule(saveFunction) + + // Should not have been called yet + expect(saveFunction).not.toHaveBeenCalled() + + // Advance time to trigger the debounced save + await vi.advanceTimersByTimeAsync(1000) + + // Should have been called only once + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + + it("should reset timer when new save is scheduled", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000, maxWait: 5000 }) + + debouncedSave.schedule(saveFunction) + + // Advance time partially + await vi.advanceTimersByTimeAsync(500) + expect(saveFunction).not.toHaveBeenCalled() + + // Schedule another save - should reset the debounce timer but not maxWait + debouncedSave.schedule(saveFunction) + + // Advance time by 500ms more (total 1000ms from start) + await vi.advanceTimersByTimeAsync(500) + // Should still not be called because debounce timer was reset + expect(saveFunction).not.toHaveBeenCalled() + + // Advance by another 500ms (1000ms from second schedule) + await vi.advanceTimersByTimeAsync(500) + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + + it("should enforce maxWait limit", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000, maxWait: 2000 }) + + // Schedule first save + debouncedSave.schedule(saveFunction) + + // Keep scheduling saves to reset the debounce timer + // but maxWait timer should still fire after 2000ms + await vi.advanceTimersByTimeAsync(500) + debouncedSave.schedule(saveFunction) + + await vi.advanceTimersByTimeAsync(500) + debouncedSave.schedule(saveFunction) + + await vi.advanceTimersByTimeAsync(500) + debouncedSave.schedule(saveFunction) + + // Total time: 1500ms, not yet at maxWait + expect(saveFunction).not.toHaveBeenCalled() + + // Advance to 2000ms total - should trigger maxWait + await vi.advanceTimersByTimeAsync(500) + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + }) + + describe("flush functionality", () => { + it("should execute pending save immediately when flushed", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000 }) + + debouncedSave.schedule(saveFunction) + expect(saveFunction).not.toHaveBeenCalled() + + // Flush immediately + await debouncedSave.flush() + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + + it("should do nothing when flush is called with no pending save", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000 }) + + // Flush without scheduling + await debouncedSave.flush() + expect(saveFunction).not.toHaveBeenCalled() + }) + + it("should clear timers after flush", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000 }) + + debouncedSave.schedule(saveFunction) + await debouncedSave.flush() + + // Advance time - should not trigger another save + await vi.advanceTimersByTimeAsync(1000) + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + }) + + describe("cancel functionality", () => { + it("should cancel pending save operations", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000 }) + + debouncedSave.schedule(saveFunction) + debouncedSave.cancel() + + // Advance time - should not trigger save + await vi.advanceTimersByTimeAsync(1000) + expect(saveFunction).not.toHaveBeenCalled() + }) + + it("should clear all timers when cancelled", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000, maxWait: 2000 }) + + debouncedSave.schedule(saveFunction) + debouncedSave.cancel() + + // Advance time past maxWait - should not trigger save + await vi.advanceTimersByTimeAsync(3000) + expect(saveFunction).not.toHaveBeenCalled() + }) + }) + + describe("error handling", () => { + it("should handle errors in save function without throwing", async () => { + const error = new Error("Save failed") + const saveFunction = vi.fn().mockRejectedValue(error) + const consoleErrorSpy = vi.spyOn(console, "error").mockImplementation(() => {}) + + debouncedSave = new DebouncedSave({ delay: 100 }) + + debouncedSave.schedule(saveFunction) + + // Advance time to trigger save + await vi.advanceTimersByTimeAsync(100) + + expect(saveFunction).toHaveBeenCalledTimes(1) + // The error should be logged but not thrown for scheduled saves + expect(consoleErrorSpy).toHaveBeenCalledWith("Error during debounced save:", error) + + consoleErrorSpy.mockRestore() + }) + + it("should re-throw errors from flush", async () => { + const error = new Error("Save failed") + const saveFunction = vi.fn().mockRejectedValue(error) + const consoleErrorSpy = vi.spyOn(console, "error").mockImplementation(() => {}) + + debouncedSave = new DebouncedSave({ delay: 1000 }) + + debouncedSave.schedule(saveFunction) + + // Flush should re-throw the error + await expect(debouncedSave.flush()).rejects.toThrow("Save failed") + + expect(consoleErrorSpy).toHaveBeenCalledWith("Error during debounced save:", error) + consoleErrorSpy.mockRestore() + }) + }) + + describe("dispose functionality", () => { + it("should cancel pending operations when disposed", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000 }) + + debouncedSave.schedule(saveFunction) + debouncedSave.dispose() + + // Advance time - should not trigger save + await vi.advanceTimersByTimeAsync(1000) + expect(saveFunction).not.toHaveBeenCalled() + }) + }) + + describe("multiple save functions", () => { + it("should use the latest save function", async () => { + const saveFunction1 = vi.fn().mockResolvedValue(undefined) + const saveFunction2 = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 1000 }) + + debouncedSave.schedule(saveFunction1) + debouncedSave.schedule(saveFunction2) + + await vi.advanceTimersByTimeAsync(1000) + + // Only the latest save function should be called + expect(saveFunction1).not.toHaveBeenCalled() + expect(saveFunction2).toHaveBeenCalledTimes(1) + }) + }) + + describe("edge cases", () => { + it("should handle zero delay", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 0 }) + + debouncedSave.schedule(saveFunction) + + // Even with 0 delay, it's still async + expect(saveFunction).not.toHaveBeenCalled() + + await vi.advanceTimersByTimeAsync(0) + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + + it("should handle maxWait smaller than delay", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 2000, maxWait: 1000 }) + + debouncedSave.schedule(saveFunction) + + // Should trigger at maxWait (1000ms) not delay (2000ms) + await vi.advanceTimersByTimeAsync(1000) + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + + it("should handle rapid successive schedules", async () => { + const saveFunction = vi.fn().mockResolvedValue(undefined) + debouncedSave = new DebouncedSave({ delay: 100 }) + + // Schedule many saves rapidly + for (let i = 0; i < 100; i++) { + debouncedSave.schedule(saveFunction) + } + + await vi.advanceTimersByTimeAsync(100) + + // Should still only call once + expect(saveFunction).toHaveBeenCalledTimes(1) + }) + }) +}) diff --git a/src/utils/debouncedSave.ts b/src/utils/debouncedSave.ts new file mode 100644 index 0000000000..e162bb9e6e --- /dev/null +++ b/src/utils/debouncedSave.ts @@ -0,0 +1,134 @@ +/** + * Utility for debounced saving of task messages to prevent excessive disk writes + * during streaming operations. + */ + +interface DebouncedSaveOptions { + /** + * The delay in milliseconds before executing the save + */ + delay?: number + /** + * Maximum time to wait before forcing a save + */ + maxWait?: number +} + +export class DebouncedSave { + private timer: NodeJS.Timeout | null = null + private maxTimer: NodeJS.Timeout | null = null + private pendingSave: (() => Promise) | null = null + private maxWaitStartTime: number = 0 + private readonly delay: number + private readonly maxWait: number + + constructor(options: DebouncedSaveOptions = {}) { + this.delay = options.delay ?? 1000 // Default 1 second debounce + this.maxWait = options.maxWait ?? 5000 // Default 5 seconds max wait + } + + /** + * Schedule a save operation with debouncing + */ + public schedule(saveFunction: () => Promise): void { + this.pendingSave = saveFunction + + // Clear existing timer + if (this.timer) { + clearTimeout(this.timer) + this.timer = null + } + + // Set up the debounced save + this.timer = setTimeout(() => { + this.executeSave() + }, this.delay) + + // Set up max wait timer if not already running + if (!this.maxTimer) { + this.maxWaitStartTime = Date.now() + this.maxTimer = setTimeout(() => { + this.executeSave() + }, this.maxWait) + } + } + + /** + * Execute the pending save immediately + */ + public async flush(): Promise { + // Clear timers + if (this.timer) { + clearTimeout(this.timer) + this.timer = null + } + if (this.maxTimer) { + clearTimeout(this.maxTimer) + this.maxTimer = null + } + + // Execute the save if pending + if (this.pendingSave) { + const saveFunction = this.pendingSave + this.pendingSave = null + this.maxWaitStartTime = 0 + + try { + await saveFunction() + } catch (error) { + console.error("Error during debounced save:", error) + // Re-throw for flush to maintain error handling behavior + throw error + } + } + } + + /** + * Cancel any pending save operations + */ + public cancel(): void { + if (this.timer) { + clearTimeout(this.timer) + this.timer = null + } + if (this.maxTimer) { + clearTimeout(this.maxTimer) + this.maxTimer = null + } + this.pendingSave = null + } + + private async executeSave(): Promise { + // Clear timers + if (this.timer) { + clearTimeout(this.timer) + this.timer = null + } + if (this.maxTimer) { + clearTimeout(this.maxTimer) + this.maxTimer = null + } + + // Execute the save + if (this.pendingSave) { + const saveFunction = this.pendingSave + this.pendingSave = null + this.maxWaitStartTime = 0 + + try { + await saveFunction() + } catch (error) { + console.error("Error during debounced save:", error) + // Don't re-throw for scheduled saves to prevent unhandled rejections + // Re-throwing is only done in flush() for explicit error handling + } + } + } + + /** + * Dispose of the debounced save instance + */ + public dispose(): void { + this.cancel() + } +}