Skip to content

Commit 7b4ba33

Browse files
fix(core): clean up event listener properly (#9375)
1 parent db476e9 commit 7b4ba33

File tree

3 files changed

+46
-16
lines changed

3 files changed

+46
-16
lines changed

libs/langchain-classic/src/chains/base.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,28 @@ export abstract class BaseChain<
106106
);
107107
let outputValues: RunOutput;
108108
try {
109-
outputValues = await (fullValues.signal
110-
? (Promise.race([
111-
this._call(fullValues as RunInput, runManager, config),
112-
new Promise((_, reject) => {
113-
fullValues.signal?.addEventListener("abort", () => {
114-
reject(new Error("AbortError"));
115-
});
116-
}),
117-
]) as Promise<RunOutput>)
118-
: this._call(fullValues as RunInput, runManager, config));
109+
if (fullValues.signal) {
110+
let listener: (() => void) | undefined;
111+
outputValues = (await Promise.race([
112+
this._call(fullValues as RunInput, runManager, config),
113+
new Promise<never>((_, reject) => {
114+
listener = () => {
115+
reject(new Error("AbortError"));
116+
};
117+
fullValues.signal?.addEventListener("abort", listener);
118+
}),
119+
]).finally(() => {
120+
if (fullValues.signal && listener) {
121+
fullValues.signal.removeEventListener("abort", listener);
122+
}
123+
})) as RunOutput;
124+
} else {
125+
outputValues = await this._call(
126+
fullValues as RunInput,
127+
runManager,
128+
config
129+
);
130+
}
119131
} catch (e) {
120132
await runManager?.handleChainError(e);
121133
throw e;

libs/langchain-core/src/tools/index.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -783,10 +783,19 @@ export function tool<
783783
schema,
784784
func: async (input, runManager, config) => {
785785
return new Promise<ToolOutputT>((resolve, reject) => {
786+
let listener: (() => void) | undefined;
787+
const cleanup = () => {
788+
if (config?.signal && listener) {
789+
config.signal.removeEventListener("abort", listener);
790+
}
791+
};
792+
786793
if (config?.signal) {
787-
config.signal.addEventListener("abort", () => {
788-
return reject(getAbortSignalError(config.signal));
789-
});
794+
listener = () => {
795+
cleanup();
796+
reject(getAbortSignalError(config.signal));
797+
};
798+
config.signal.addEventListener("abort", listener);
790799
}
791800

792801
const childConfig = patchConfig(config, {
@@ -805,11 +814,14 @@ export function tool<
805814
* as the promise is already rejected.
806815
*/
807816
if (config?.signal?.aborted) {
817+
cleanup();
808818
return;
809819
}
810820

821+
cleanup();
811822
resolve(result);
812823
} catch (e) {
824+
cleanup();
813825
reject(e);
814826
}
815827
}

libs/langchain-core/src/utils/async_caller.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,20 @@ export class AsyncCaller {
140140
// Note this doesn't cancel the underlying request,
141141
// when available prefer to use the signal option of the underlying call
142142
if (options.signal) {
143+
let listener: (() => void) | undefined;
143144
return Promise.race([
144145
this.call<A, T>(callable, ...args),
145146
new Promise<never>((_, reject) => {
146-
options.signal?.addEventListener("abort", () => {
147+
listener = () => {
147148
reject(getAbortSignalError(options.signal));
148-
});
149+
};
150+
options.signal?.addEventListener("abort", listener);
149151
}),
150-
]);
152+
]).finally(() => {
153+
if (options.signal && listener) {
154+
options.signal.removeEventListener("abort", listener);
155+
}
156+
});
151157
}
152158
return this.call<A, T>(callable, ...args);
153159
}

0 commit comments

Comments
 (0)