Skip to content

Commit ee831d8

Browse files
committed
handle an AbortSignal
1 parent 4bb2ff0 commit ee831d8

File tree

3 files changed

+51
-17
lines changed

3 files changed

+51
-17
lines changed

packages/mcp-client/cli.ts

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ if (process.env.EXPERIMENTAL_HF_MCP_SERVER) {
3535
});
3636
}
3737

38-
let SAMPLE_INPUT = process.env.USE_SAMPLE_INPUT
39-
? `generate a haiku about Hugging Face and save it to a file name hf.txt on my Desktop`
40-
: undefined;
41-
4238
async function main() {
4339
if (!process.env.HF_TOKEN) {
4440
console.error(`a valid HF_TOKEN must be present in the env`);
@@ -53,14 +49,33 @@ async function main() {
5349
});
5450

5551
const rl = readline.createInterface({ input: stdin, output: stdout });
52+
let abortController = new AbortController();
53+
let waitingForInput = false;
54+
async function waitForInput() {
55+
waitingForInput = true;
56+
const input = await rl.question("> ");
57+
waitingForInput = false;
58+
return input;
59+
}
5660
rl.on("SIGINT", async () => {
57-
await agent.cleanup();
58-
stdout.write("\n");
59-
rl.close();
61+
if (waitingForInput) {
62+
// close the whole process
63+
await agent.cleanup();
64+
stdout.write("\n");
65+
rl.close();
66+
} else {
67+
// otherwise, it means a request is underway
68+
abortController.abort();
69+
abortController = new AbortController();
70+
stdout.write(ANSI.GRAY);
71+
stdout.write("Ctrl+C a second time to exit");
72+
stdout.write(ANSI.RESET);
73+
}
6074
});
61-
process.on("uncaughtException", () => {
75+
process.on("uncaughtException", (err) => {
6276
stdout.write("\n");
6377
rl.close();
78+
throw err;
6479
});
6580

6681
await agent.loadTools();
@@ -72,8 +87,8 @@ async function main() {
7287
stdout.write("\n");
7388

7489
while (true) {
75-
const input = await rl.question("> ");
76-
for await (const chunk of agent.run(input)) {
90+
const input = await waitForInput();
91+
for await (const chunk of agent.run(input, { abortSignal: abortController.signal })) {
7792
if ("choices" in chunk) {
7893
const delta = chunk.choices[0]?.delta;
7994
if (delta.content) {

packages/mcp-client/src/Agent.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ export class Agent extends McpClient {
7272
return this.addMcpServers(this.servers);
7373
}
7474

75-
async *run(input: string): AsyncGenerator<ChatCompletionStreamOutput | ChatCompletionInputMessageTool> {
75+
async *run(
76+
input: string,
77+
opts: { abortSignal?: AbortSignal } = {}
78+
): AsyncGenerator<ChatCompletionStreamOutput | ChatCompletionInputMessageTool> {
7679
this.messages.push({
7780
role: "user",
7881
content: input,
@@ -81,10 +84,18 @@ export class Agent extends McpClient {
8184
let numOfTurns = 0;
8285
let nextTurnShouldCallTools = true;
8386
while (true) {
84-
yield* this.processSingleTurnWithTools(this.messages, {
85-
exitLoopTools,
86-
exitIfFirstChunkNoTool: numOfTurns > 0 && nextTurnShouldCallTools,
87-
});
87+
try {
88+
yield* this.processSingleTurnWithTools(this.messages, {
89+
exitLoopTools,
90+
exitIfFirstChunkNoTool: numOfTurns > 0 && nextTurnShouldCallTools,
91+
abortSignal: opts.abortSignal,
92+
});
93+
} catch (err) {
94+
if (err instanceof Error && err.message === "AbortError") {
95+
return;
96+
}
97+
throw err;
98+
}
8899
numOfTurns++;
89100
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
90101
const currentLast = this.messages.at(-1)!;

packages/mcp-client/src/McpClient.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ export class McpClient {
7272

7373
async *processSingleTurnWithTools(
7474
messages: ChatCompletionInputMessage[],
75-
opts: { exitLoopTools?: ChatCompletionInputTool[]; exitIfFirstChunkNoTool?: boolean } = {}
75+
opts: {
76+
exitLoopTools?: ChatCompletionInputTool[];
77+
exitIfFirstChunkNoTool?: boolean;
78+
abortSignal?: AbortSignal;
79+
} = {}
7680
): AsyncGenerator<ChatCompletionStreamOutput | ChatCompletionInputMessageTool> {
7781
debug("start of single turn");
7882

@@ -82,6 +86,7 @@ export class McpClient {
8286
messages,
8387
tools: opts.exitLoopTools ? [...opts.exitLoopTools, ...this.availableTools] : this.availableTools,
8488
tool_choice: "auto",
89+
signal: opts.abortSignal,
8590
});
8691

8792
const message = {
@@ -92,6 +97,9 @@ export class McpClient {
9297
let numOfChunks = 0;
9398

9499
for await (const chunk of stream) {
100+
if (opts.abortSignal?.aborted) {
101+
throw new Error("AbortError");
102+
}
95103
yield chunk;
96104
debug(chunk.choices[0]);
97105
numOfChunks++;
@@ -143,7 +151,7 @@ export class McpClient {
143151
/// Get the appropriate session for this tool
144152
const client = this.clients.get(toolName);
145153
if (client) {
146-
const result = await client.callTool({ name: toolName, arguments: toolArgs });
154+
const result = await client.callTool({ name: toolName, arguments: toolArgs, signal: opts.abortSignal });
147155
toolMessage.content = (result.content as Array<{ text: string }>)[0].text;
148156
} else {
149157
toolMessage.content = `Error: No session found for tool: ${toolName}`;

0 commit comments

Comments
 (0)