Skip to content

Commit f8d5cc4

Browse files
author
Eric Wheeler
committed
refactor: make resumeTaskFromHistory message updates atomic
The resumeTaskFromHistory method was refactored to ensure that updates to both the cline message history and the API conversation history are fully atomic. Previously, the method would read the histories, modify them in-memory, and then call the respective modify functions with the already-modified data. This approach did not guarantee atomicity. The new implementation moves the modification logic directly inside the callbacks for `modifyClineMessages` and `modifyApiConversationHistory`. This ensures that the entire read-modify-write cycle for each history is performed as a single, uninterruptible transaction, preventing potential race conditions or partial state saves. This change also involved: - Adjusting variable scopes to support the new callback structure. - Removing the now-unused `getSavedClineMessages` helper method as part of the refactor. Signed-off-by: Eric Wheeler <[email protected]>
1 parent ef499ea commit f8d5cc4

File tree

1 file changed

+135
-140
lines changed

1 file changed

+135
-140
lines changed

src/core/task/Task.ts

Lines changed: 135 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,6 @@ export class Task extends EventEmitter<ClineEvents> {
362362
}
363363

364364
// Cline Messages
365-
366-
private async getSavedClineMessages(): Promise<ClineMessage[]> {
367-
return readTaskMessages({ taskId: this.taskId, globalStoragePath: this.globalStoragePath })
368-
}
369-
370365
private async addToClineMessages(message: ClineMessage) {
371366
await this.modifyClineMessages(async (messages) => {
372367
messages.push(message)
@@ -902,33 +897,31 @@ export class Task extends EventEmitter<ClineEvents> {
902897
}
903898

904899
private async resumeTaskFromHistory() {
905-
const modifiedClineMessages = await this.getSavedClineMessages()
906-
907-
// Remove any resume messages that may have been added before
908-
const lastRelevantMessageIndex = findLastIndex(
909-
modifiedClineMessages,
910-
(m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task"),
911-
)
900+
await this.modifyClineMessages(async (modifiedClineMessages) => {
901+
// Remove any resume messages that may have been added before
902+
const lastRelevantMessageIndex = findLastIndex(
903+
modifiedClineMessages,
904+
(m) => !(m.ask === "resume_task" || m.ask === "resume_completed_task"),
905+
)
912906

913-
if (lastRelevantMessageIndex !== -1) {
914-
modifiedClineMessages.splice(lastRelevantMessageIndex + 1)
915-
}
907+
if (lastRelevantMessageIndex !== -1) {
908+
modifiedClineMessages.splice(lastRelevantMessageIndex + 1)
909+
}
916910

917-
// since we don't use api_req_finished anymore, we need to check if the last api_req_started has a cost value, if it doesn't and no cancellation reason to present, then we remove it since it indicates an api request without any partial content streamed
918-
const lastApiReqStartedIndex = findLastIndex(
919-
modifiedClineMessages,
920-
(m) => m.type === "say" && m.say === "api_req_started",
921-
)
911+
// since we don't use api_req_finished anymore, we need to check if the last api_req_started has a cost value, if it doesn't and no cancellation reason to present, then we remove it since it indicates an api request without any partial content streamed
912+
const lastApiReqStartedIndex = findLastIndex(
913+
modifiedClineMessages,
914+
(m) => m.type === "say" && m.say === "api_req_started",
915+
)
922916

923-
if (lastApiReqStartedIndex !== -1) {
924-
const lastApiReqStarted = modifiedClineMessages[lastApiReqStartedIndex]
925-
const { cost, cancelReason }: ClineApiReqInfo = JSON.parse(lastApiReqStarted.text || "{}")
926-
if (cost === undefined && cancelReason === undefined) {
927-
modifiedClineMessages.splice(lastApiReqStartedIndex, 1)
917+
if (lastApiReqStartedIndex !== -1) {
918+
const lastApiReqStarted = modifiedClineMessages[lastApiReqStartedIndex]
919+
const { cost, cancelReason }: ClineApiReqInfo = JSON.parse(lastApiReqStarted.text || "{}")
920+
if (cost === undefined && cancelReason === undefined) {
921+
modifiedClineMessages.splice(lastApiReqStartedIndex, 1)
922+
}
928923
}
929-
}
930924

931-
await this.modifyClineMessages(async () => {
932925
return modifiedClineMessages
933926
})
934927

@@ -965,125 +958,131 @@ export class Task extends EventEmitter<ClineEvents> {
965958

966959
// Make sure that the api conversation history can be resumed by the API,
967960
// even if it goes out of sync with cline messages.
968-
let existingApiConversationHistory: ApiMessage[] = await this.getSavedApiConversationHistory()
969-
970-
// v2.0 xml tags refactor caveat: since we don't use tools anymore, we need to replace all tool use blocks with a text block since the API disallows conversations with tool uses and no tool schema
971-
const conversationWithoutToolBlocks = existingApiConversationHistory.map((message) => {
972-
if (Array.isArray(message.content)) {
973-
const newContent = message.content.map((block) => {
974-
if (block.type === "tool_use") {
975-
// It's important we convert to the new tool schema
976-
// format so the model doesn't get confused about how to
977-
// invoke tools.
978-
const inputAsXml = Object.entries(block.input as Record<string, string>)
979-
.map(([key, value]) => `<${key}>\n${value}\n</${key}>`)
980-
.join("\n")
981-
return {
982-
type: "text",
983-
text: `<${block.name}>\n${inputAsXml}\n</${block.name}>`,
984-
} as Anthropic.Messages.TextBlockParam
985-
} else if (block.type === "tool_result") {
986-
// Convert block.content to text block array, removing images
987-
const contentAsTextBlocks = Array.isArray(block.content)
988-
? block.content.filter((item) => item.type === "text")
989-
: [{ type: "text", text: block.content }]
990-
const textContent = contentAsTextBlocks.map((item) => item.text).join("\n\n")
991-
const toolName = findToolName(block.tool_use_id, existingApiConversationHistory)
992-
return {
993-
type: "text",
994-
text: `[${toolName} Result]\n\n${textContent}`,
995-
} as Anthropic.Messages.TextBlockParam
996-
}
997-
return block
998-
})
999-
return { ...message, content: newContent }
1000-
}
1001-
return message
1002-
})
1003-
existingApiConversationHistory = conversationWithoutToolBlocks
1004-
1005-
// FIXME: remove tool use blocks altogether
1006-
1007-
// if the last message is an assistant message, we need to check if there's tool use since every tool use has to have a tool response
1008-
// if there's no tool use and only a text block, then we can just add a user message
1009-
// (note this isn't relevant anymore since we use custom tool prompts instead of tool use blocks, but this is here for legacy purposes in case users resume old tasks)
1010-
1011-
// if the last message is a user message, we can need to get the assistant message before it to see if it made tool calls, and if so, fill in the remaining tool responses with 'interrupted'
1012-
1013-
let modifiedOldUserContent: Anthropic.Messages.ContentBlockParam[] // either the last message if its user message, or the user message before the last (assistant) message
1014-
let modifiedApiConversationHistory: ApiMessage[] // need to remove the last user message to replace with new modified user message
1015-
if (existingApiConversationHistory.length > 0) {
1016-
const lastMessage = existingApiConversationHistory[existingApiConversationHistory.length - 1]
1017-
1018-
if (lastMessage.role === "assistant") {
1019-
const content = Array.isArray(lastMessage.content)
1020-
? lastMessage.content
1021-
: [{ type: "text", text: lastMessage.content }]
1022-
const hasToolUse = content.some((block) => block.type === "tool_use")
1023-
1024-
if (hasToolUse) {
1025-
const toolUseBlocks = content.filter(
1026-
(block) => block.type === "tool_use",
1027-
) as Anthropic.Messages.ToolUseBlock[]
1028-
const toolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks.map((block) => ({
1029-
type: "tool_result",
1030-
tool_use_id: block.id,
1031-
content: "Task was interrupted before this tool call could be completed.",
1032-
}))
1033-
modifiedApiConversationHistory = [...existingApiConversationHistory] // no changes
1034-
modifiedOldUserContent = [...toolResponses]
1035-
} else {
1036-
modifiedApiConversationHistory = [...existingApiConversationHistory]
1037-
modifiedOldUserContent = []
961+
let modifiedOldUserContent: Anthropic.Messages.ContentBlockParam[] | undefined
962+
await this.modifyApiConversationHistory(async (existingApiConversationHistory) => {
963+
const conversationWithoutToolBlocks = existingApiConversationHistory.map((message) => {
964+
if (Array.isArray(message.content)) {
965+
const newContent = message.content.map((block) => {
966+
if (block.type === "tool_use") {
967+
// It's important we convert to the new tool schema
968+
// format so the model doesn't get confused about how to
969+
// invoke tools.
970+
const inputAsXml = Object.entries(block.input as Record<string, string>)
971+
.map(([key, value]) => `<${key}>\n${value}\n</${key}>`)
972+
.join("\n")
973+
return {
974+
type: "text",
975+
text: `<${block.name}>\n${inputAsXml}\n</${block.name}>`,
976+
} as Anthropic.Messages.TextBlockParam
977+
} else if (block.type === "tool_result") {
978+
// Convert block.content to text block array, removing images
979+
const contentAsTextBlocks = Array.isArray(block.content)
980+
? block.content.filter((item) => item.type === "text")
981+
: [{ type: "text", text: block.content }]
982+
const textContent = contentAsTextBlocks.map((item) => item.text).join("\n\n")
983+
const toolName = findToolName(block.tool_use_id, existingApiConversationHistory)
984+
return {
985+
type: "text",
986+
text: `[${toolName} Result]\n\n${textContent}`,
987+
} as Anthropic.Messages.TextBlockParam
988+
}
989+
return block
990+
})
991+
return { ...message, content: newContent }
1038992
}
1039-
} else if (lastMessage.role === "user") {
1040-
const previousAssistantMessage: ApiMessage | undefined =
1041-
existingApiConversationHistory[existingApiConversationHistory.length - 2]
1042-
1043-
const existingUserContent: Anthropic.Messages.ContentBlockParam[] = Array.isArray(lastMessage.content)
1044-
? lastMessage.content
1045-
: [{ type: "text", text: lastMessage.content }]
1046-
if (previousAssistantMessage && previousAssistantMessage.role === "assistant") {
1047-
const assistantContent = Array.isArray(previousAssistantMessage.content)
1048-
? previousAssistantMessage.content
1049-
: [{ type: "text", text: previousAssistantMessage.content }]
1050-
1051-
const toolUseBlocks = assistantContent.filter(
1052-
(block) => block.type === "tool_use",
1053-
) as Anthropic.Messages.ToolUseBlock[]
1054-
1055-
if (toolUseBlocks.length > 0) {
1056-
const existingToolResults = existingUserContent.filter(
1057-
(block) => block.type === "tool_result",
1058-
) as Anthropic.ToolResultBlockParam[]
1059-
1060-
const missingToolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks
1061-
.filter(
1062-
(toolUse) => !existingToolResults.some((result) => result.tool_use_id === toolUse.id),
1063-
)
1064-
.map((toolUse) => ({
1065-
type: "tool_result",
1066-
tool_use_id: toolUse.id,
1067-
content: "Task was interrupted before this tool call could be completed.",
1068-
}))
1069-
1070-
modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1) // removes the last user message
1071-
modifiedOldUserContent = [...existingUserContent, ...missingToolResponses]
993+
return message
994+
})
995+
existingApiConversationHistory = conversationWithoutToolBlocks
996+
997+
// FIXME: remove tool use blocks altogether
998+
999+
// if the last message is an assistant message, we need to check if there's tool use since every tool use has to have a tool response
1000+
// if there's no tool use and only a text block, then we can just add a user message
1001+
// (note this isn't relevant anymore since we use custom tool prompts instead of tool use blocks, but this is here for legacy purposes in case users resume old tasks)
1002+
1003+
// if the last message is a user message, we can need to get the assistant message before it to see if it made tool calls, and if so, fill in the remaining tool responses with 'interrupted'
1004+
1005+
let modifiedApiConversationHistory: ApiMessage[] // need to remove the last user message to replace with new modified user message
1006+
if (existingApiConversationHistory.length > 0) {
1007+
const lastMessage = existingApiConversationHistory[existingApiConversationHistory.length - 1]
1008+
1009+
if (lastMessage.role === "assistant") {
1010+
const content = Array.isArray(lastMessage.content)
1011+
? lastMessage.content
1012+
: [{ type: "text", text: lastMessage.content }]
1013+
const hasToolUse = content.some((block) => block.type === "tool_use")
1014+
1015+
if (hasToolUse) {
1016+
const toolUseBlocks = content.filter(
1017+
(block) => block.type === "tool_use",
1018+
) as Anthropic.Messages.ToolUseBlock[]
1019+
const toolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks.map((block) => ({
1020+
type: "tool_result",
1021+
tool_use_id: block.id,
1022+
content: "Task was interrupted before this tool call could be completed.",
1023+
}))
1024+
modifiedApiConversationHistory = [...existingApiConversationHistory] // no changes
1025+
modifiedOldUserContent = [...toolResponses]
1026+
} else {
1027+
modifiedApiConversationHistory = [...existingApiConversationHistory]
1028+
modifiedOldUserContent = []
1029+
}
1030+
} else if (lastMessage.role === "user") {
1031+
const previousAssistantMessage: ApiMessage | undefined =
1032+
existingApiConversationHistory[existingApiConversationHistory.length - 2]
1033+
1034+
const existingUserContent: Anthropic.Messages.ContentBlockParam[] = Array.isArray(
1035+
lastMessage.content,
1036+
)
1037+
? lastMessage.content
1038+
: [{ type: "text", text: lastMessage.content }]
1039+
if (previousAssistantMessage && previousAssistantMessage.role === "assistant") {
1040+
const assistantContent = Array.isArray(previousAssistantMessage.content)
1041+
? previousAssistantMessage.content
1042+
: [{ type: "text", text: previousAssistantMessage.content }]
1043+
1044+
const toolUseBlocks = assistantContent.filter(
1045+
(block) => block.type === "tool_use",
1046+
) as Anthropic.Messages.ToolUseBlock[]
1047+
1048+
if (toolUseBlocks.length > 0) {
1049+
const existingToolResults = existingUserContent.filter(
1050+
(block) => block.type === "tool_result",
1051+
) as Anthropic.ToolResultBlockParam[]
1052+
1053+
const missingToolResponses: Anthropic.ToolResultBlockParam[] = toolUseBlocks
1054+
.filter(
1055+
(toolUse) =>
1056+
!existingToolResults.some((result) => result.tool_use_id === toolUse.id),
1057+
)
1058+
.map((toolUse) => ({
1059+
type: "tool_result",
1060+
tool_use_id: toolUse.id,
1061+
content: "Task was interrupted before this tool call could be completed.",
1062+
}))
1063+
1064+
modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1) // removes the last user message
1065+
modifiedOldUserContent = [...existingUserContent, ...missingToolResponses]
1066+
} else {
1067+
modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1)
1068+
modifiedOldUserContent = [...existingUserContent]
1069+
}
10721070
} else {
10731071
modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1)
10741072
modifiedOldUserContent = [...existingUserContent]
10751073
}
10761074
} else {
1077-
modifiedApiConversationHistory = existingApiConversationHistory.slice(0, -1)
1078-
modifiedOldUserContent = [...existingUserContent]
1075+
throw new Error("Unexpected: Last message is not a user or assistant message")
10791076
}
10801077
} else {
1081-
throw new Error("Unexpected: Last message is not a user or assistant message")
1078+
throw new Error("Unexpected: No existing API conversation history")
10821079
}
1083-
} else {
1084-
throw new Error("Unexpected: No existing API conversation history")
1085-
}
1080+
return modifiedApiConversationHistory
1081+
})
10861082

1083+
if (!modifiedOldUserContent) {
1084+
throw new Error("modifiedOldUserContent was not set")
1085+
}
10871086
let newUserContent: Anthropic.Messages.ContentBlockParam[] = [...modifiedOldUserContent]
10881087

10891088
const agoText = ((): string => {
@@ -1132,10 +1131,6 @@ export class Task extends EventEmitter<ClineEvents> {
11321131
newUserContent.push(...formatResponse.imageBlocks(responseImages))
11331132
}
11341133

1135-
await this.modifyApiConversationHistory(async () => {
1136-
return modifiedApiConversationHistory
1137-
})
1138-
11391134
console.log(`[subtasks] task ${this.taskId}.${this.instanceId} resuming from history item`)
11401135

11411136
await this.initiateTaskLoop(newUserContent)

0 commit comments

Comments
 (0)