Skip to content

Commit 8b63efc

Browse files
committed
refactor: Message guard
1 parent e5e4b13 commit 8b63efc

File tree

3 files changed

+402
-16
lines changed

3 files changed

+402
-16
lines changed

src/core/kilocode/task/message-utils.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,62 @@
11
/**
22
* Utility functions for message handling in Task
3-
* @kilocode_change - Created to fix orphaned partial ask messages bug
3+
* kilocode_change - Created to fix orphaned partial ask messages bug
44
*/
55

6+
import pWaitFor from "p-wait-for"
67
import type { ClineMessage, ClineAsk, ClineSay } from "@roo-code/types"
78

9+
const GUARD_TIMEOUT = 30 * 1000 // 30 seconds
10+
const GUARD_INTERVAL = 50 // 50 milliseconds
11+
12+
/**
13+
* Message insertion guard to prevent concurrent message insertions.
14+
* This prevents race conditions where checkpoint_saved messages can arrive
15+
* during partial message updates, breaking message history order.
16+
*/
17+
export class MessageInsertionGuard {
18+
private isInserting = false
19+
private readonly timeout: number
20+
private readonly interval: number
21+
22+
constructor(timeout = GUARD_TIMEOUT, interval = GUARD_INTERVAL) {
23+
this.timeout = timeout
24+
this.interval = interval
25+
}
26+
27+
/**
28+
* Wait for any in-flight message insertions to complete before proceeding.
29+
* This ensures messages are inserted sequentially and prevents race conditions.
30+
*/
31+
async waitForClearance(): Promise<void> {
32+
await pWaitFor(() => !this.isInserting, {
33+
interval: this.interval,
34+
timeout: this.timeout,
35+
})
36+
}
37+
38+
/**
39+
* Acquire the insertion lock. Must be released with release() after insertion completes.
40+
*/
41+
acquire(): void {
42+
this.isInserting = true
43+
}
44+
45+
/**
46+
* Release the insertion lock, allowing other insertions to proceed.
47+
*/
48+
release(): void {
49+
this.isInserting = false
50+
}
51+
52+
/**
53+
* Check if a message insertion is currently in progress.
54+
*/
55+
isLocked(): boolean {
56+
return this.isInserting
57+
}
58+
}
59+
860
/**
961
* Search backwards through messages to find the most recent partial ask message
1062
* of the specified type. This handles cases where non-interactive messages

src/core/task/Task.ts

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ import { ensureLocalKilorulesDirExists } from "../context/instructions/kilo-rule
119119
import { getMessagesSinceLastSummary, summarizeConversation } from "../condense"
120120
import { Gpt5Metadata, ClineMessageWithMetadata } from "./types"
121121
import { MessageQueueService } from "../message-queue/MessageQueueService"
122-
import { findPartialAskMessage, findPartialSayMessage } from "../kilocode/task/message-utils" // kilocode_change
122+
import { findPartialAskMessage, findPartialSayMessage, MessageInsertionGuard } from "../kilocode/task/message-utils" // kilocode_change
123123

124124
import { AutoApprovalHandler } from "./AutoApprovalHandler"
125125
import { isAnyRecognizedKiloCodeError, isPaymentRequiredError } from "../../shared/kilocode/errorUtils"
@@ -221,6 +221,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
221221
private readonly globalStoragePath: string
222222
abort: boolean = false
223223

224+
// kilocode_change start: Message insertion guard to prevent race conditions with checkpoint messages
225+
private readonly messageInsertionGuard = new MessageInsertionGuard()
226+
// kilocode_change end
227+
224228
// TaskStatus
225229
idleAsk?: ClineMessage
226230
resumableAsk?: ClineMessage
@@ -639,24 +643,32 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
639643
return readTaskMessages({ taskId: this.taskId, globalStoragePath: this.globalStoragePath })
640644
}
641645

646+
// kilocode_change start: Guard against concurrent message insertions to prevent
642647
private async addToClineMessages(message: ClineMessage) {
643-
this.clineMessages.push(message)
644-
const provider = this.providerRef.deref()
645-
await provider?.postStateToWebview()
646-
this.emit(RooCodeEventName.Message, { action: "created", message })
647-
await this.saveClineMessages()
648+
await this.messageInsertionGuard.waitForClearance()
649+
this.messageInsertionGuard.acquire()
648650

649-
// kilocode_change start: no cloud service
650-
// const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
651+
try {
652+
this.clineMessages.push(message)
653+
const provider = this.providerRef.deref()
654+
await provider?.postStateToWebview()
655+
this.emit(RooCodeEventName.Message, { action: "created", message })
656+
await this.saveClineMessages()
651657

652-
// if (shouldCaptureMessage) {
653-
// CloudService.instance.captureEvent({
654-
// event: TelemetryEventName.TASK_MESSAGE,
655-
// properties: { taskId: this.taskId, message },
656-
// })
657-
// }
658-
// kilocode_change end
658+
// kilocode_change start: no cloud service
659+
// const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
660+
// if (shouldCaptureMessage) {
661+
// CloudService.instance.captureEvent({
662+
// event: TelemetryEventName.TASK_MESSAGE,
663+
// properties: { taskId: this.taskId, message },
664+
// })
665+
// }
666+
// kilocode_change end
667+
} finally {
668+
this.messageInsertionGuard.release()
669+
}
659670
}
671+
// kilocode_change end
660672

661673
public async overwriteClineMessages(newMessages: ClineMessage[]) {
662674
this.clineMessages = newMessages

0 commit comments

Comments
 (0)