Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/core/kilocode/task/message-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Utility functions for message handling in Task
* kilocode_change - Created to fix orphaned partial ask messages bug
*/

import pWaitFor from "p-wait-for"
import type { ClineMessage, ClineAsk, ClineSay } from "@roo-code/types"

const GUARD_TIMEOUT = 30 * 1000 // 30 seconds
const GUARD_INTERVAL = 50 // 50 milliseconds

/**
* Message insertion guard to prevent concurrent message insertions.
* This prevents race conditions where checkpoint_saved messages can arrive
* during partial message updates, breaking message history order.
*/
export class MessageInsertionGuard {
private isInserting = false
private readonly timeout: number
private readonly interval: number

constructor(timeout = GUARD_TIMEOUT, interval = GUARD_INTERVAL) {
this.timeout = timeout
this.interval = interval
}

/**
* Wait for any in-flight message insertions to complete before proceeding.
* This ensures messages are inserted sequentially and prevents race conditions.
*/
async waitForClearance(): Promise<void> {
await pWaitFor(() => !this.isInserting, {
interval: this.interval,
timeout: this.timeout,
})
}

/**
* Acquire the insertion lock. Must be released with release() after insertion completes.
*/
acquire(): void {
this.isInserting = true
}

/**
* Release the insertion lock, allowing other insertions to proceed.
*/
release(): void {
this.isInserting = false
}

/**
* Check if a message insertion is currently in progress.
*/
isLocked(): boolean {
return this.isInserting
}
}

/**
* Search backwards through messages to find the most recent partial ask message
* of the specified type. This handles cases where non-interactive messages
* (like checkpoint_saved) are inserted between partial start and completion.
*
* @param messages - Array of Cline messages to search
* @param type - The ask type to search for
* @returns The partial message and its index, or undefined if not found
*/
export function findPartialAskMessage(
messages: ClineMessage[],
type: ClineAsk,
): { message: ClineMessage; index: number } | undefined {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i]
if (msg.type === "ask" && msg.ask === type && msg.partial === true) {
return { message: msg, index: i }
}
}
return undefined
}

/**
* Search backwards through messages to find the most recent partial say message
* of the specified type. Similar to findPartialAskMessage but for say messages.
*
* @param messages - Array of Cline messages to search
* @param type - The say type to search for
* @returns The partial message and its index, or undefined if not found
*/
export function findPartialSayMessage(
messages: ClineMessage[],
type: ClineSay,
): { message: ClineMessage; index: number } | undefined {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i]
if (msg.type === "say" && msg.say === type && msg.partial === true) {
return { message: msg, index: i }
}
}
return undefined
}
62 changes: 40 additions & 22 deletions src/core/task/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ import { ensureLocalKilorulesDirExists } from "../context/instructions/kilo-rule
import { getMessagesSinceLastSummary, summarizeConversation } from "../condense"
import { Gpt5Metadata, ClineMessageWithMetadata } from "./types"
import { MessageQueueService } from "../message-queue/MessageQueueService"
import { findPartialAskMessage, findPartialSayMessage, MessageInsertionGuard } from "../kilocode/task/message-utils" // kilocode_change

import { AutoApprovalHandler } from "./AutoApprovalHandler"
import { isAnyRecognizedKiloCodeError, isPaymentRequiredError } from "../../shared/kilocode/errorUtils"
Expand Down Expand Up @@ -221,6 +222,10 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
private readonly globalStoragePath: string
abort: boolean = false

// kilocode_change start: Message insertion guard to prevent race conditions with checkpoint messages
private readonly messageInsertionGuard = new MessageInsertionGuard()
// kilocode_change end

// TaskStatus
idleAsk?: ClineMessage
resumableAsk?: ClineMessage
Expand Down Expand Up @@ -647,24 +652,32 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
return readTaskMessages({ taskId: this.taskId, globalStoragePath: this.globalStoragePath })
}

// kilocode_change start: Guard against concurrent message insertions to prevent
private async addToClineMessages(message: ClineMessage) {
this.clineMessages.push(message)
const provider = this.providerRef.deref()
await provider?.postStateToWebview()
this.emit(RooCodeEventName.Message, { action: "created", message })
await this.saveClineMessages()
await this.messageInsertionGuard.waitForClearance()
this.messageInsertionGuard.acquire()

// kilocode_change start: no cloud service
// const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
try {
this.clineMessages.push(message)
const provider = this.providerRef.deref()
await provider?.postStateToWebview()
this.emit(RooCodeEventName.Message, { action: "created", message })
await this.saveClineMessages()

// if (shouldCaptureMessage) {
// CloudService.instance.captureEvent({
// event: TelemetryEventName.TASK_MESSAGE,
// properties: { taskId: this.taskId, message },
// })
// }
// kilocode_change end
// kilocode_change start: no cloud service
// const shouldCaptureMessage = message.partial !== true && CloudService.isEnabled()
// if (shouldCaptureMessage) {
// CloudService.instance.captureEvent({
// event: TelemetryEventName.TASK_MESSAGE,
// properties: { taskId: this.taskId, message },
// })
// }
// kilocode_change end
} finally {
this.messageInsertionGuard.release()
}
}
// kilocode_change end

public async overwriteClineMessages(newMessages: ClineMessage[]) {
this.clineMessages = newMessages
Expand Down Expand Up @@ -777,10 +790,13 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
let askTs: number

if (partial !== undefined) {
const lastMessage = this.clineMessages.at(-1)

const isUpdatingPreviousPartial =
lastMessage && lastMessage.partial && lastMessage.type === "ask" && lastMessage.ask === type
// kilocode_change start: Fix orphaned partial asks by searching backwards
// Search for the most recent partial ask of this type, handling cases where
// non-interactive messages (like checkpoint_saved) are inserted during streaming
const partialResult = findPartialAskMessage(this.clineMessages, type)
const lastMessage = partialResult?.message
const isUpdatingPreviousPartial = lastMessage !== undefined
// kilocode_change end

if (partial) {
if (isUpdatingPreviousPartial) {
Expand Down Expand Up @@ -1166,10 +1182,12 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
}

if (partial !== undefined) {
const lastMessage = this.clineMessages.at(-1)

const isUpdatingPreviousPartial =
lastMessage && lastMessage.partial && lastMessage.type === "say" && lastMessage.say === type
// kilocode_change start: Fix orphaned partial says by searching backwards
// Search for the most recent partial say of this type
const partialResult = findPartialSayMessage(this.clineMessages, type)
const lastMessage = partialResult?.message
const isUpdatingPreviousPartial = lastMessage !== undefined
// kilocode_change end

if (partial) {
if (isUpdatingPreviousPartial) {
Expand Down
Loading