Skip to content

Commit 5b414ef

Browse files
authored
chore: Use tasks, not microtasks, to queue scheduler execution (commontoolsinc#2115)
chore: Use tasks, not microtasks, to queue scheduler execution. Fixes infinite loop when LLM builtins error and scheduler.idle() waits for queued setTimeout callback before resolving.
1 parent c700890 commit 5b414ef

File tree

2 files changed

+41
-8
lines changed

2 files changed

+41
-8
lines changed

packages/runner/src/builtins/llm.ts

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ async function handleLLMError<T, P>(
228228
resultCell: Cell<T>,
229229
errorCell: Cell<unknown>,
230230
partialCell: Cell<P>,
231+
requestHashCell: Cell<string | undefined>,
232+
requestHash: string,
231233
getCurrentRun: () => number,
232234
thisRun: number,
233235
resetPreviousHash: () => void,
@@ -243,6 +245,7 @@ async function handleLLMError<T, P>(
243245
errorCell.withTx(tx).set(error);
244246
resultCell.withTx(tx).set(undefined as T);
245247
partialCell.withTx(tx).set(undefined as P);
248+
requestHashCell.withTx(tx).set(requestHash);
246249
});
247250

248251
resetPreviousHash();
@@ -447,6 +450,8 @@ export function llm(
447450
resultCell.key("result"),
448451
resultCell.key("error"),
449452
resultCell.key("partial"),
453+
resultCell.key("requestHash"),
454+
hash,
450455
() => currentRun,
451456
thisRun,
452457
() => {
@@ -551,10 +556,14 @@ export function generateText(
551556
const hash = refer(llmParams).toString();
552557
const currentRequestHash = requestHashWithLog.get();
553558
const currentResult = resultWithLog.get();
559+
const currentError = errorWithLog.get();
554560

555561
// Return if the same request is being made again
556-
// Only skip if we have a result - otherwise we need to (re)make the request
557-
if (currentResult !== undefined && hash === currentRequestHash) {
562+
// Also return if there's an error for this request (don't retry automatically)
563+
if (
564+
(currentResult !== undefined || currentError !== undefined) &&
565+
hash === currentRequestHash
566+
) {
558567
return;
559568
}
560569

@@ -627,6 +636,8 @@ export function generateText(
627636
resultCell.key("result"),
628637
resultCell.key("error"),
629638
resultCell.key("partial"),
639+
resultCell.key("requestHash"),
640+
hash,
630641
() => currentRun,
631642
thisRun,
632643
() => {
@@ -746,9 +757,14 @@ export function generateObject<T extends Record<string, unknown>>(
746757
const hash = refer({ ...llmParams, schema }).toString();
747758
const currentRequestHash = requestHashWithLog.get();
748759
const currentResult = resultWithLog.get();
760+
const currentError = errorWithLog.get();
749761

750762
// Return if the same request is being made again
751-
if (currentResult !== undefined && hash === currentRequestHash) {
763+
// Also return if there's an error for this request (don't retry automatically)
764+
if (
765+
(currentResult !== undefined || currentError !== undefined) &&
766+
hash === currentRequestHash
767+
) {
752768
return;
753769
}
754770

@@ -900,6 +916,8 @@ export function generateObject<T extends Record<string, unknown>>(
900916
resultCell.key("result"),
901917
resultCell.key("error"),
902918
resultCell.key("partial"),
919+
resultCell.key("requestHash"),
920+
hash,
903921
() => currentRun,
904922
thisRun,
905923
() => {
@@ -927,10 +945,14 @@ export function generateObject<T extends Record<string, unknown>>(
927945
const hash = refer(generateObjectParams).toString();
928946
const currentRequestHash = requestHashWithLog.get();
929947
const currentResult = resultWithLog.get();
948+
const currentError = errorWithLog.get();
930949

931950
// Return if the same request is being made again
932-
// Only skip if we have a result - otherwise we need to (re)make the request
933-
if (currentResult !== undefined && hash === currentRequestHash) {
951+
// Also return if there's an error for this request (don't retry automatically)
952+
if (
953+
(currentResult !== undefined || currentError !== undefined) &&
954+
hash === currentRequestHash
955+
) {
934956
return;
935957
}
936958

@@ -980,6 +1002,8 @@ export function generateObject<T extends Record<string, unknown>>(
9801002
resultCell.key("result"),
9811003
resultCell.key("error"),
9821004
resultCell.key("partial"),
1005+
resultCell.key("requestHash"),
1006+
hash,
9831007
() => currentRun,
9841008
thisRun,
9851009
() => {

packages/runner/src/scheduler.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,11 @@ export class Scheduler implements IScheduler {
306306
} // Once nothing is running, see if more work is queued up. If not, then
307307
// resolve the idle promise, otherwise add it to the idle promises list
308308
// that will be resolved once all the work is done.
309-
else if (this.pending.size === 0 && this.eventQueue.length === 0) {
309+
// IMPORTANT: Also check !this.scheduled to wait for any queued macro task execution
310+
else if (
311+
this.pending.size === 0 && this.eventQueue.length === 0 &&
312+
!this.scheduled
313+
) {
310314
resolve();
311315
} else {
312316
this.idlePromises.push(resolve);
@@ -437,7 +441,7 @@ export class Scheduler implements IScheduler {
437441

438442
private queueExecution(): void {
439443
if (this.scheduled) return;
440-
queueMicrotask(() => this.execute());
444+
queueTask(() => this.execute());
441445
this.scheduled = true;
442446
}
443447

@@ -577,7 +581,8 @@ export class Scheduler implements IScheduler {
577581
this.loopCounter = new WeakMap();
578582
this.scheduled = false;
579583
} else {
580-
queueMicrotask(() => this.execute());
584+
// Keep scheduled = true since we're queuing another execution
585+
queueTask(() => this.execute());
581586
}
582587
}
583588
}
@@ -718,3 +723,7 @@ function getCharmMetadataFromFrame(frame?: Frame): {
718723
)["/"];
719724
return result;
720725
}
726+
727+
function queueTask(fn: () => void): void {
728+
setTimeout(fn, 0);
729+
}

0 commit comments

Comments
 (0)