Skip to content

Commit 8de9374

Browse files
committed
fix: too many abort signal listeners
1 parent 24bc67a commit 8de9374

File tree

2 files changed

+177
-163
lines changed

2 files changed

+177
-163
lines changed

src/evaluator/LlamaChatSession/LlamaChatSession.ts

Lines changed: 165 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ export class LlamaChatSession {
470470
throw new DisposedError();
471471

472472
const supportsParallelFunctionCalling = this._chat.chatWrapper.settings.functions.parallelism != null;
473-
const abortController = wrapAbortSignal(signal);
473+
const [abortController, disposeAbortController] = wrapAbortSignal(signal);
474474
let lastEvaluation = this._lastEvaluation;
475475
let newChatHistory = appendUserMessageToChatHistory(this._chatHistory, prompt);
476476
let newContextWindowChatHistory = lastEvaluation?.contextWindow == null
@@ -501,179 +501,185 @@ export class LlamaChatSession {
501501
safeEventCallback(onTextChunk)?.(resolvedResponsePrefix);
502502
}
503503

504-
while (true) {
505-
const functionCallsAndResults: Array<Promise<null | {
506-
functionCall: LlamaChatResponseFunctionCall<Functions extends ChatModelFunctions ? Functions : ChatModelFunctions>,
507-
functionDefinition: ChatSessionModelFunction<any>,
508-
functionCallResult: any
509-
}>> = [];
510-
let canThrowFunctionCallingErrors = false;
511-
let abortedOnFunctionCallError = false;
512-
513-
const initialOutputTokens = this._chat.sequence.tokenMeter.usedOutputTokens;
514-
const {
515-
lastEvaluation: currentLastEvaluation,
516-
metadata
517-
} = await this._chat.generateResponse<Functions>(newChatHistory, {
518-
functions,
519-
documentFunctionParams,
520-
maxParallelFunctionCalls,
521-
grammar: grammar as undefined, // this is a workaround to allow passing both `functions` and `grammar`
522-
onTextChunk: safeEventCallback(onTextChunk),
523-
onToken: safeEventCallback(onToken),
524-
signal: abortController.signal,
525-
stopOnAbortSignal,
526-
repeatPenalty,
527-
minP,
528-
topK,
529-
topP,
530-
seed,
531-
tokenBias,
532-
customStopTriggers,
533-
maxTokens,
534-
temperature,
535-
trimWhitespaceSuffix,
536-
contextShift: {
537-
...this._contextShift,
538-
lastEvaluationMetadata: lastEvaluation?.contextShiftMetadata
539-
},
540-
evaluationPriority,
541-
lastEvaluationContextWindow: {
542-
history: newContextWindowChatHistory,
543-
minimumOverlapPercentageToPreventContextShift: 0.5
544-
},
545-
onFunctionCall: async (functionCall) => {
546-
functionCallsAndResults.push(
547-
(async () => {
548-
try {
549-
const functionDefinition = functions?.[functionCall.functionName];
550-
551-
if (functionDefinition == null)
552-
throw new Error(
553-
`The model tried to call function "${functionCall.functionName}" which is not defined`
554-
);
555-
556-
const functionCallResult = await functionDefinition.handler(functionCall.params);
557-
558-
return {
559-
functionCall,
560-
functionDefinition,
561-
functionCallResult
562-
};
563-
} catch (err) {
564-
if (!abortController.signal.aborted) {
565-
abortedOnFunctionCallError = true;
566-
abortController.abort(err);
504+
try {
505+
while (true) {
506+
const functionCallsAndResults: Array<Promise<null | {
507+
functionCall: LlamaChatResponseFunctionCall<Functions extends ChatModelFunctions ? Functions : ChatModelFunctions>,
508+
functionDefinition: ChatSessionModelFunction<any>,
509+
functionCallResult: any
510+
}>> = [];
511+
let canThrowFunctionCallingErrors = false;
512+
let abortedOnFunctionCallError = false;
513+
514+
const initialOutputTokens = this._chat.sequence.tokenMeter.usedOutputTokens;
515+
const {
516+
lastEvaluation: currentLastEvaluation,
517+
metadata
518+
} = await this._chat.generateResponse<Functions>(newChatHistory, {
519+
functions,
520+
documentFunctionParams,
521+
maxParallelFunctionCalls,
522+
grammar: grammar as undefined, // this is a workaround to allow passing both `functions` and `grammar`
523+
onTextChunk: safeEventCallback(onTextChunk),
524+
onToken: safeEventCallback(onToken),
525+
signal: abortController.signal,
526+
stopOnAbortSignal,
527+
repeatPenalty,
528+
minP,
529+
topK,
530+
topP,
531+
seed,
532+
tokenBias,
533+
customStopTriggers,
534+
maxTokens,
535+
temperature,
536+
trimWhitespaceSuffix,
537+
contextShift: {
538+
...this._contextShift,
539+
lastEvaluationMetadata: lastEvaluation?.contextShiftMetadata
540+
},
541+
evaluationPriority,
542+
lastEvaluationContextWindow: {
543+
history: newContextWindowChatHistory,
544+
minimumOverlapPercentageToPreventContextShift: 0.5
545+
},
546+
onFunctionCall: async (functionCall) => {
547+
functionCallsAndResults.push(
548+
(async () => {
549+
try {
550+
const functionDefinition = functions?.[functionCall.functionName];
551+
552+
if (functionDefinition == null)
553+
throw new Error(
554+
`The model tried to call function "${functionCall.functionName}" which is not defined`
555+
);
556+
557+
const functionCallResult = await functionDefinition.handler(functionCall.params);
558+
559+
return {
560+
functionCall,
561+
functionDefinition,
562+
functionCallResult
563+
};
564+
} catch (err) {
565+
if (!abortController.signal.aborted) {
566+
abortedOnFunctionCallError = true;
567+
abortController.abort(err);
568+
}
569+
570+
if (canThrowFunctionCallingErrors)
571+
throw err;
572+
573+
return null;
567574
}
568-
569-
if (canThrowFunctionCallingErrors)
570-
throw err;
571-
572-
return null;
575+
})()
576+
);
577+
}
578+
});
579+
this._ensureNotDisposed();
580+
if (abortController.signal.aborted && (abortedOnFunctionCallError || !stopOnAbortSignal))
581+
throw abortController.signal.reason;
582+
583+
if (maxTokens != null)
584+
maxTokens = Math.max(0, maxTokens - (this._chat.sequence.tokenMeter.usedOutputTokens - initialOutputTokens));
585+
586+
lastEvaluation = currentLastEvaluation;
587+
newChatHistory = lastEvaluation.cleanHistory;
588+
589+
if (functionCallsAndResults.length > 0) {
590+
canThrowFunctionCallingErrors = true;
591+
const functionCallResultsPromise = Promise.all(functionCallsAndResults);
592+
const raceEventAbortController = new AbortController();
593+
await Promise.race([
594+
functionCallResultsPromise,
595+
new Promise<void>((accept, reject) => {
596+
abortController.signal.addEventListener("abort", () => {
597+
if (abortedOnFunctionCallError || !stopOnAbortSignal)
598+
reject(abortController.signal.reason);
599+
else
600+
accept();
601+
}, {signal: raceEventAbortController.signal});
602+
603+
if (abortController.signal.aborted) {
604+
if (abortedOnFunctionCallError || !stopOnAbortSignal)
605+
reject(abortController.signal.reason);
606+
else
607+
accept();
573608
}
574-
})()
575-
);
576-
}
577-
});
578-
this._ensureNotDisposed();
579-
if (abortController.signal.aborted && (abortedOnFunctionCallError || !stopOnAbortSignal))
580-
throw abortController.signal.reason;
609+
})
610+
]);
611+
raceEventAbortController.abort();
612+
this._ensureNotDisposed();
581613

582-
if (maxTokens != null)
583-
maxTokens = Math.max(0, maxTokens - (this._chat.sequence.tokenMeter.usedOutputTokens - initialOutputTokens));
584-
585-
lastEvaluation = currentLastEvaluation;
586-
newChatHistory = lastEvaluation.cleanHistory;
587-
588-
if (functionCallsAndResults.length > 0) {
589-
canThrowFunctionCallingErrors = true;
590-
const functionCallResultsPromise = Promise.all(functionCallsAndResults);
591-
await Promise.race([
592-
functionCallResultsPromise,
593-
new Promise<void>((accept, reject) => {
594-
abortController.signal.addEventListener("abort", () => {
595-
if (abortedOnFunctionCallError || !stopOnAbortSignal)
596-
reject(abortController.signal.reason);
597-
else
598-
accept();
599-
});
600-
601-
if (abortController.signal.aborted) {
602-
if (abortedOnFunctionCallError || !stopOnAbortSignal)
603-
reject(abortController.signal.reason);
604-
else
605-
accept();
614+
if (!abortController.signal.aborted) {
615+
const functionCallResults = (await functionCallResultsPromise)
616+
.filter((result): result is Exclude<typeof result, null> => result != null);
617+
this._ensureNotDisposed();
618+
619+
if (abortController.signal.aborted)
620+
throw abortController.signal.reason;
621+
622+
newContextWindowChatHistory = lastEvaluation.contextWindow;
623+
624+
let startNewChunk = supportsParallelFunctionCalling;
625+
for (const {functionCall, functionDefinition, functionCallResult} of functionCallResults) {
626+
newChatHistory = addFunctionCallToChatHistory({
627+
chatHistory: newChatHistory,
628+
functionName: functionCall.functionName,
629+
functionDescription: functionDefinition.description,
630+
callParams: functionCall.params,
631+
callResult: functionCallResult,
632+
rawCall: functionCall.raw,
633+
startsNewChunk: startNewChunk
634+
});
635+
636+
newContextWindowChatHistory = addFunctionCallToChatHistory({
637+
chatHistory: newContextWindowChatHistory,
638+
functionName: functionCall.functionName,
639+
functionDescription: functionDefinition.description,
640+
callParams: functionCall.params,
641+
callResult: functionCallResult,
642+
rawCall: functionCall.raw,
643+
startsNewChunk: startNewChunk
644+
});
645+
646+
startNewChunk = false;
606647
}
607-
})
608-
]);
609-
this._ensureNotDisposed();
610648

611-
if (!abortController.signal.aborted) {
612-
const functionCallResults = (await functionCallResultsPromise)
613-
.filter((result): result is Exclude<typeof result, null> => result != null);
614-
this._ensureNotDisposed();
649+
lastEvaluation.cleanHistory = newChatHistory;
650+
lastEvaluation.contextWindow = newContextWindowChatHistory;
615651

616-
if (abortController.signal.aborted)
617-
throw abortController.signal.reason;
618-
619-
newContextWindowChatHistory = lastEvaluation.contextWindow;
620-
621-
let startNewChunk = supportsParallelFunctionCalling;
622-
for (const {functionCall, functionDefinition, functionCallResult} of functionCallResults) {
623-
newChatHistory = addFunctionCallToChatHistory({
624-
chatHistory: newChatHistory,
625-
functionName: functionCall.functionName,
626-
functionDescription: functionDefinition.description,
627-
callParams: functionCall.params,
628-
callResult: functionCallResult,
629-
rawCall: functionCall.raw,
630-
startsNewChunk: startNewChunk
631-
});
632-
633-
newContextWindowChatHistory = addFunctionCallToChatHistory({
634-
chatHistory: newContextWindowChatHistory,
635-
functionName: functionCall.functionName,
636-
functionDescription: functionDefinition.description,
637-
callParams: functionCall.params,
638-
callResult: functionCallResult,
639-
rawCall: functionCall.raw,
640-
startsNewChunk: startNewChunk
641-
});
642-
643-
startNewChunk = false;
652+
continue;
644653
}
645-
646-
lastEvaluation.cleanHistory = newChatHistory;
647-
lastEvaluation.contextWindow = newContextWindowChatHistory;
648-
649-
continue;
650654
}
651-
}
652655

653-
this._lastEvaluation = lastEvaluation;
654-
this._chatHistory = newChatHistory;
655-
this._chatHistoryStateRef = {};
656+
this._lastEvaluation = lastEvaluation;
657+
this._chatHistory = newChatHistory;
658+
this._chatHistoryStateRef = {};
656659

657-
const lastModelResponseItem = getLastModelResponseItem(newChatHistory);
658-
const responseText = lastModelResponseItem.response
659-
.filter((item): item is string => typeof item === "string")
660-
.join("");
660+
const lastModelResponseItem = getLastModelResponseItem(newChatHistory);
661+
const responseText = lastModelResponseItem.response
662+
.filter((item): item is string => typeof item === "string")
663+
.join("");
664+
665+
if (metadata.stopReason === "customStopTrigger")
666+
return {
667+
response: lastModelResponseItem.response,
668+
responseText,
669+
stopReason: metadata.stopReason,
670+
customStopTrigger: metadata.customStopTrigger,
671+
remainingGenerationAfterStop: metadata.remainingGenerationAfterStop
672+
};
661673

662-
if (metadata.stopReason === "customStopTrigger")
663674
return {
664675
response: lastModelResponseItem.response,
665676
responseText,
666677
stopReason: metadata.stopReason,
667-
customStopTrigger: metadata.customStopTrigger,
668678
remainingGenerationAfterStop: metadata.remainingGenerationAfterStop
669679
};
670-
671-
return {
672-
response: lastModelResponseItem.response,
673-
responseText,
674-
stopReason: metadata.stopReason,
675-
remainingGenerationAfterStop: metadata.remainingGenerationAfterStop
676-
};
680+
}
681+
} finally {
682+
disposeAbortController();
677683
}
678684
});
679685
}
@@ -755,7 +761,7 @@ export class LlamaChatSession {
755761
throw new Error("The LlamaGrammar used by passed to this function was created with a different Llama instance than the one used by this sequence's model. Make sure you use the same Llama instance for both the model and the grammar.");
756762
}
757763

758-
const abortController = wrapAbortSignal(signal);
764+
const [abortController, disposeAbortController] = wrapAbortSignal(signal);
759765
this._preloadAndCompleteAbortControllers.add(abortController);
760766

761767
try {
@@ -821,6 +827,7 @@ export class LlamaChatSession {
821827
});
822828
} finally {
823829
this._preloadAndCompleteAbortControllers.delete(abortController);
830+
disposeAbortController();
824831
}
825832
}
826833

src/utils/wrapAbortSignal.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1-
export function wrapAbortSignal(abortSignal?: AbortSignal) {
1+
export function wrapAbortSignal(abortSignal?: AbortSignal): [controller: AbortController, dispose: (() => void)] {
22
const controller = new AbortController();
33

4+
function onAbort() {
5+
controller.abort(abortSignal!.reason);
6+
}
7+
8+
function dispose() {
9+
if (abortSignal != null)
10+
abortSignal.removeEventListener("abort", onAbort);
11+
}
12+
413
if (abortSignal != null)
5-
abortSignal.addEventListener("abort", () => {
6-
controller.abort(abortSignal.reason);
7-
});
14+
abortSignal.addEventListener("abort", onAbort);
815

9-
return controller;
16+
return [controller, dispose];
1017
}

0 commit comments

Comments
 (0)