Skip to content

Commit 69f02bc

Browse files
authored
Merge pull request #11 from damassi/refactor/agent-loop-abort
refactor: Simplify agent loop / fix abort controller
2 parents 5815716 + e84e00d commit 69f02bc

File tree

10 files changed

+169
-159
lines changed

10 files changed

+169
-159
lines changed

src/__tests__/store.test.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ describe("Store", () => {
4848
expect(getState().sessionId).toBeUndefined()
4949
expect(getState().stats).toBeUndefined()
5050
expect(getState().pendingToolPermission).toBeUndefined()
51-
expect(getState().abortController).toBeInstanceOf(AbortController)
51+
expect(getState().abortController).toBeUndefined()
5252
})
5353

5454
test("should have MessageQueue instance", () => {

src/components/AgentChat.tsx

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import { Box, Text, useInput } from "ink"
2-
import Spinner from "ink-spinner"
31
import { ChatHeader } from "components/ChatHeader"
42
import { Markdown } from "components/Markdown"
53
import { Stats } from "components/Stats"
@@ -8,6 +6,8 @@ import { ToolUses } from "components/ToolUses"
86
import { UserInput } from "components/UserInput"
97
import { useAgent } from "hooks/useAgent"
108
import { useMcpClient } from "hooks/useMcpClient"
9+
import { Box, Text, useInput } from "ink"
10+
import Spinner from "ink-spinner"
1111
import { AgentStore } from "store"
1212

1313
export const AgentChat: React.FC = () => {
@@ -92,21 +92,23 @@ export const AgentChat: React.FC = () => {
9292

9393
case state.isProcessing: {
9494
return (
95-
<Text dimColor>
96-
<Text color="cyan">
97-
<Spinner type="balloon" />
95+
<>
96+
<Text dimColor>
97+
<Text color="cyan">
98+
<Spinner type="balloon" />
99+
</Text>
100+
{" Agent is thinking..."}
98101
</Text>
99-
{" Agent is thinking..."}
100-
</Text>
101-
)
102-
}
103102

104-
default: {
105-
return <UserInput />
103+
<Box marginBottom={1} />
104+
</>
105+
)
106106
}
107107
}
108108
})()}
109109

110+
<UserInput />
111+
110112
<Box marginTop={1} />
111113
</Box>
112114
)

src/components/UserInput.tsx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ export const UserInput: React.FC = () => {
3535
})
3636

3737
actions.sendMessage(value)
38+
39+
// Slight delay just in case user has aborted request via second message
40+
setTimeout(() => {
41+
actions.setIsProcessing(true)
42+
}, 100)
43+
3844
reset()
3945
}
4046

src/hooks/useAgent.ts

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,64 @@
1-
import { useEffect, useRef } from "react"
1+
import { useCallback, useEffect, useRef } from "react"
22
import { AgentStore } from "store"
3+
import { log } from "utils/logger"
34
import { messageTypes, runAgentLoop } from "utils/runAgentLoop"
45

56
export function useAgent() {
67
const messageQueue = AgentStore.useStoreState((state) => state.messageQueue)
7-
const sessionId = AgentStore.useStoreState((state) => state.sessionId)
88
const config = AgentStore.useStoreState((state) => state.config)
9-
const abortController = AgentStore.useStoreState(
10-
(state) => state.abortController
11-
)
129
const actions = AgentStore.useStoreActions((actions) => actions)
1310
const currentAssistantMessageRef = useRef("")
14-
const abortControllerRef = useRef(abortController)
11+
const sessionIdRef = useRef<string | undefined>(undefined)
12+
const abortControllerRef = useRef<AbortController | undefined>(undefined)
13+
const connectedServersRef = useRef<Set<string>>(new Set())
1514

16-
// Update ref when abort controller changes
17-
abortControllerRef.current = abortController
15+
const runQuery = useCallback(
16+
async (userMessage: string) => {
17+
if (abortControllerRef.current) {
18+
log("[useAgent] Aborting existing query for new message:", userMessage)
1819

19-
useEffect(() => {
20-
const streamEnabled = config.stream ?? false
21-
22-
const runAgent = async () => {
23-
const { agentLoop } = await runAgentLoop({
24-
messageQueue,
25-
sessionId,
26-
config,
27-
abortControllerRef,
28-
onToolPermissionRequest: (toolName, input) => {
29-
actions.setPendingToolPermission({ toolName, input })
30-
},
31-
onServerConnection: (status) => {
32-
actions.addChatHistoryEntry({
33-
type: "message",
34-
role: "system",
35-
content: status,
36-
})
37-
},
38-
setIsProcessing: actions.setIsProcessing,
39-
})
20+
// When a new message comes in, always abort the old one and start fresh
21+
abortControllerRef.current.abort()
22+
}
23+
24+
// Create fresh abort controller for this query
25+
const abortController = new AbortController()
26+
abortControllerRef.current = abortController
27+
actions.setAbortController(abortController)
28+
29+
const streamEnabled = config.stream ?? false
4030

4131
try {
32+
const agentLoop = runAgentLoop({
33+
abortController,
34+
config,
35+
connectedServers: connectedServersRef.current,
36+
messageQueue,
37+
onToolPermissionRequest: (toolName, input) => {
38+
actions.setPendingToolPermission({ toolName, input })
39+
},
40+
onServerConnection: (status) => {
41+
actions.addChatHistoryEntry({
42+
type: "message",
43+
role: "system",
44+
content: status,
45+
})
46+
},
47+
sessionId: sessionIdRef.current,
48+
setIsProcessing: actions.setIsProcessing,
49+
userMessage,
50+
})
51+
4252
for await (const message of agentLoop) {
53+
if (abortController.signal.aborted) {
54+
log("[useAgent] Query was aborted, stopping message processing")
55+
return
56+
}
57+
4358
switch (true) {
4459
case message.type === messageTypes.SYSTEM &&
4560
message.subtype === messageTypes.INIT: {
61+
sessionIdRef.current = message.session_id
4662
actions.setSessionId(message.session_id)
4763
actions.handleMcpServerStatus(message.mcp_servers)
4864

@@ -127,6 +143,12 @@ export function useAgent() {
127143
}
128144
}
129145
} catch (error) {
146+
if (error instanceof Error && error.name === "AbortError") {
147+
actions.setIsProcessing(false)
148+
return
149+
}
150+
151+
// Handle other errors
130152
if (
131153
error instanceof Error &&
132154
!error.message.includes("process aborted by user")
@@ -136,8 +158,18 @@ export function useAgent() {
136158

137159
actions.setIsProcessing(false)
138160
}
139-
}
161+
},
162+
[config, messageQueue, actions]
163+
)
140164

141-
runAgent()
142-
}, [])
165+
// Start listening for new messages from input
166+
useEffect(() => {
167+
const unsubscribe = messageQueue.subscribe((userMessage) => {
168+
setTimeout(() => {
169+
runQuery(userMessage)
170+
}, 0)
171+
})
172+
173+
return unsubscribe
174+
}, [messageQueue, runQuery])
143175
}

src/mcp/getAgentStatus.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ import { messageTypes, runAgentLoop } from "utils/runAgentLoop"
66
export const getAgentStatus = async (mcpServer?: McpServer) => {
77
const config = await loadConfig()
88
const messageQueue = new MessageQueue()
9+
const abortController = new AbortController()
10+
const connectedServers = new Set<string>()
911

10-
const { agentLoop } = await runAgentLoop({
11-
messageQueue,
12+
const agentLoop = runAgentLoop({
13+
abortController,
1214
config,
15+
connectedServers,
16+
messageQueue,
17+
userMessage: "status",
1318
})
1419

15-
await new Promise((resolve) => setTimeout(resolve, 0))
16-
17-
messageQueue.sendMessage("status")
18-
1920
for await (const message of agentLoop) {
2021
if (
2122
message.type === messageTypes.SYSTEM &&

src/mcp/runStandaloneAgentLoop.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@ export const runStandaloneAgentLoop = async ({
2525
const messageQueue = new MessageQueue()
2626
const streamEnabled = config.stream ?? false
2727

28-
const { agentLoop, connectedServers } = await runAgentLoop({
28+
const connectedServers = existingConnectedServers ?? new Set<string>()
29+
const abortController = new AbortController()
30+
31+
const agentLoop = runAgentLoop({
32+
abortController,
2933
additionalSystemPrompt,
3034
config,
31-
existingConnectedServers,
35+
connectedServers,
3236
messageQueue,
3337
sessionId,
38+
userMessage: prompt,
3439
onServerConnection: async (status) => {
3540
await mcpServer.sendLoggingMessage({
3641
level: "info",
@@ -42,10 +47,6 @@ export const runStandaloneAgentLoop = async ({
4247
},
4348
})
4449

45-
await new Promise((resolve) => setTimeout(resolve, 0))
46-
47-
messageQueue.sendMessage(prompt)
48-
4950
let finalResponse = ""
5051
let assistantMessage = ""
5152

src/store.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export interface StoreModel {
112112
}
113113

114114
export const AgentStore = createContextStore<StoreModel>({
115-
abortController: new AbortController(),
115+
abortController: undefined,
116116
chatHistory: [],
117117
config: null as unknown as AgentChatConfig,
118118
currentAssistantMessage: "",
@@ -142,7 +142,9 @@ export const AgentStore = createContextStore<StoreModel>({
142142
// Actions
143143
abortRequest: action((state) => {
144144
state.abortController?.abort()
145-
state.abortController = new AbortController()
145+
state.abortController = undefined
146+
state.currentAssistantMessage = ""
147+
state.stats = "User aborted the request."
146148
state.isProcessing = false
147149
}),
148150

src/utils/MessageQueue.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,11 @@ export class MessageQueue extends EventEmitter {
3333
hasPendingRequests(): boolean {
3434
return this.listenerCount("message") > 0
3535
}
36+
37+
subscribe(callback: (message: string) => void): () => void {
38+
this.on("message", callback)
39+
return () => {
40+
this.off("message", callback)
41+
}
42+
}
3643
}

src/utils/mcpServerSelectionAgent.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { messageTypes } from "./runAgentLoop"
1212
interface SelectMcpServersOptions {
1313
abortController?: AbortController
1414
agents?: Record<string, AgentConfig>
15-
alreadyConnectedServers?: Set<string>
15+
connectedServers?: Set<string>
1616
enabledMcpServers: Record<string, any> | undefined
1717
onServerConnection?: (status: string) => void
1818
sessionId?: string
@@ -22,7 +22,7 @@ interface SelectMcpServersOptions {
2222
export const selectMcpServers = async ({
2323
abortController,
2424
agents,
25-
alreadyConnectedServers = new Set(),
25+
connectedServers = new Set(),
2626
enabledMcpServers,
2727
onServerConnection,
2828
sessionId,
@@ -38,7 +38,7 @@ export const selectMcpServers = async ({
3838

3939
log(
4040
"[mcpServerSelectionAgent] Already connected:",
41-
Array.from(alreadyConnectedServers).join(", ") || "none"
41+
Array.from(connectedServers).join(", ") || "none"
4242
)
4343

4444
const serverCapabilities = Object.entries(enabledMcpServers)
@@ -159,7 +159,7 @@ Examples:
159159
log("[mcpServerSelectionAgent] Selected MCP servers:", selectedServers)
160160

161161
const newServers = selectedServers.filter(
162-
(server) => !alreadyConnectedServers.has(server.toLowerCase())
162+
(server) => !connectedServers.has(server.toLowerCase())
163163
)
164164

165165
if (newServers.length > 0) {
@@ -172,7 +172,7 @@ Examples:
172172
}
173173

174174
const allServers = new Set([
175-
...Array.from(alreadyConnectedServers),
175+
...Array.from(connectedServers),
176176
...selectedServers,
177177
])
178178

@@ -212,7 +212,7 @@ Examples:
212212

213213
// Update the connected servers set with new servers
214214
newServers.forEach((server) => {
215-
alreadyConnectedServers.add(server.toLowerCase())
215+
connectedServers.add(server.toLowerCase())
216216
})
217217

218218
return {

0 commit comments

Comments
 (0)