Skip to content

Commit eda9b47

Browse files
consolidate
1 parent 253d995 commit eda9b47

File tree

16 files changed

+1350
-1371
lines changed

16 files changed

+1350
-1371
lines changed

libs/sdk-angular/src/index.ts

Lines changed: 25 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import {
1414
getMessagesMetadataMap,
1515
StreamError,
1616
extractInterrupts,
17+
userFacingInterruptsFromThreadTasks,
18+
userFacingInterruptsFromValuesArray,
1719
toMessageClass,
1820
ensureMessageInstances,
1921
ensureHistoryMessageInstances,
@@ -49,10 +51,7 @@ import {
4951
type DefaultToolCall,
5052
} from "@langchain/langgraph-sdk";
5153
import { getToolCallsWithResults } from "@langchain/langgraph-sdk/utils";
52-
import {
53-
isHeadlessToolInterrupt,
54-
handleHeadlessToolInterrupt,
55-
} from "@langchain/langgraph-sdk";
54+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
5655
import { useStreamCustom } from "./stream.custom.js";
5756

5857
export { FetchStreamTransport } from "@langchain/langgraph-sdk/ui";
@@ -788,18 +787,21 @@ export function useStreamLGP<
788787
"__interrupt__" in vals &&
789788
Array.isArray(vals.__interrupt__)
790789
) {
791-
const valueInterrupts = vals.__interrupt__;
792-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
793-
return valueInterrupts;
790+
return userFacingInterruptsFromValuesArray<InterruptType>(
791+
vals.__interrupt__ as Interrupt<InterruptType>[],
792+
);
794793
}
795794

796795
if (isLoading()) return [];
797796

798797
const allTasks = branchContext().threadHead?.tasks ?? [];
799798
const allInterrupts = allTasks.flatMap((t) => t.interrupts ?? []);
800799

801-
if (allInterrupts.length > 0) {
802-
return allInterrupts as Interrupt<InterruptType>[];
800+
const fromTasks = userFacingInterruptsFromThreadTasks<InterruptType>(
801+
allInterrupts as Interrupt<InterruptType>[],
802+
);
803+
if (fromTasks !== null) {
804+
return fromTasks;
803805
}
804806

805807
const next = branchContext().threadHead?.next ?? [];
@@ -864,37 +866,20 @@ export function useStreamLGP<
864866
handledToolsLGP.clear();
865867
}
866868

867-
const { tools, onTool } = options;
868-
if (!tools?.length) return;
869-
870-
const interrupts = vals?.__interrupt__;
871-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
872-
873-
for (const interrupt of interrupts) {
874-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
875-
876-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
877-
if (handledToolsLGP.has(interruptId)) continue;
878-
handledToolsLGP.add(interruptId);
879-
880-
void Promise.resolve().then(() =>
881-
handleHeadlessToolInterrupt(interrupt.value, tools, onTool).then(
882-
(result) => {
883-
void submit(null as unknown as StateType, {
884-
// interrupt ensures the resume bypasses the LGP queue and calls
885-
// submitDirect directly, even if isLoading is still true when
886-
// the browser tool interrupt fires.
887-
multitaskStrategy: "interrupt",
888-
command: {
889-
resume: result.toolCallId
890-
? { [result.toolCallId]: result.value }
891-
: result.value,
892-
},
893-
});
894-
},
895-
),
896-
);
897-
}
869+
flushPendingHeadlessToolInterrupts(
870+
vals,
871+
options.tools,
872+
handledToolsLGP,
873+
{
874+
onTool: options.onTool,
875+
defer: (run) => void Promise.resolve().then(run),
876+
resumeSubmit: (command) =>
877+
void submit(null as unknown as StateType, {
878+
multitaskStrategy: "interrupt",
879+
command,
880+
}),
881+
},
882+
);
898883
});
899884

900885
return {

libs/sdk-angular/src/stream.custom.ts

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
StreamManager,
44
MessageTupleManager,
55
extractInterrupts,
6+
userFacingInterruptsFromValuesArray,
67
toMessageClass,
78
ensureMessageInstances,
89
type EventStreamEvent,
@@ -17,10 +18,7 @@ import {
1718
} from "@langchain/langgraph-sdk/ui";
1819
import { getToolCallsWithResults } from "@langchain/langgraph-sdk/utils";
1920
import type { BagTemplate, Message, Interrupt } from "@langchain/langgraph-sdk";
20-
import {
21-
isHeadlessToolInterrupt,
22-
handleHeadlessToolInterrupt,
23-
} from "@langchain/langgraph-sdk";
21+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
2422

2523
export function useStreamCustom<
2624
StateType extends Record<string, unknown> = Record<string, unknown>,
@@ -189,39 +187,24 @@ export function useStreamCustom<
189187
effect(() => {
190188
const vals = streamValues();
191189

192-
// Reset dedup set when thread changes
193190
if (options.threadId !== lastThreadId) {
194191
lastThreadId = options.threadId;
195192
handledBrowserTools.clear();
196193
}
197194

198-
const { tools, onTool } = options;
199-
if (!tools?.length) return;
200-
201-
const interrupts = vals?.__interrupt__;
202-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
203-
204-
for (const interrupt of interrupts) {
205-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
206-
207-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
208-
if (handledBrowserTools.has(interruptId)) continue;
209-
handledBrowserTools.add(interruptId);
210-
211-
void Promise.resolve().then(() =>
212-
handleHeadlessToolInterrupt(interrupt.value, tools, onTool).then(
213-
(result) => {
214-
void submit(null, {
215-
command: {
216-
resume: result.toolCallId
217-
? { [result.toolCallId]: result.value }
218-
: result.value,
219-
},
220-
});
221-
},
222-
),
223-
);
224-
}
195+
flushPendingHeadlessToolInterrupts(
196+
vals,
197+
options.tools,
198+
handledBrowserTools,
199+
{
200+
onTool: options.onTool,
201+
defer: (run) => void Promise.resolve().then(run),
202+
resumeSubmit: (command) =>
203+
void submit(null, {
204+
command,
205+
}),
206+
},
207+
);
225208
});
226209

227210
const values = computed(() => streamValues() ?? ({} as StateType));
@@ -276,9 +259,9 @@ export function useStreamCustom<
276259
"__interrupt__" in vals &&
277260
Array.isArray(vals.__interrupt__)
278261
) {
279-
const valueInterrupts = vals.__interrupt__;
280-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
281-
return valueInterrupts;
262+
return userFacingInterruptsFromValuesArray<InterruptType>(
263+
vals.__interrupt__ as Interrupt<InterruptType>[],
264+
);
282265
}
283266

284267
return [];

libs/sdk-react/src/stream.custom.tsx

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
StreamManager,
1414
MessageTupleManager,
1515
extractInterrupts,
16+
userFacingInterruptsFromValuesArray,
1617
FetchStreamTransport,
1718
toMessageClass,
1819
ensureMessageInstances,
@@ -28,10 +29,7 @@ import {
2829
} from "@langchain/langgraph-sdk/ui";
2930
import { getToolCallsWithResults } from "@langchain/langgraph-sdk/utils";
3031
import type { BagTemplate, Message, Interrupt } from "@langchain/langgraph-sdk";
31-
import {
32-
isHeadlessToolInterrupt,
33-
handleHeadlessToolInterrupt,
34-
} from "@langchain/langgraph-sdk";
32+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
3533
import type { BaseMessage } from "@langchain/core/messages";
3634
import { useControllableThreadId } from "./thread.js";
3735
import type { UseStreamCustom } from "./types.js";
@@ -216,34 +214,18 @@ export function useStreamCustom<
216214
}, [threadId]);
217215

218216
useEffect(() => {
219-
const tools = toolsRef.current;
220-
if (!tools?.length) return;
221-
if (!stream.values) return;
222-
223-
const interrupts = stream.values.__interrupt__;
224-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
225-
226-
for (const interrupt of interrupts) {
227-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
228-
229-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
230-
if (handledBrowserToolsRef.current.has(interruptId)) continue;
231-
handledBrowserToolsRef.current.add(interruptId);
232-
233-
void handleHeadlessToolInterrupt(
234-
interrupt.value,
235-
tools,
236-
onToolRef.current,
237-
).then((result) => {
238-
void submit(null, {
239-
command: {
240-
resume: result.toolCallId
241-
? { [result.toolCallId]: result.value }
242-
: result.value,
243-
},
244-
});
245-
});
246-
}
217+
flushPendingHeadlessToolInterrupts(
218+
stream.values,
219+
toolsRef.current,
220+
handledBrowserToolsRef.current,
221+
{
222+
onTool: onToolRef.current,
223+
resumeSubmit: (command) =>
224+
void submit(null, {
225+
command,
226+
}),
227+
},
228+
);
247229
}, [stream.values, submit]);
248230

249231
return {
@@ -284,9 +266,9 @@ export function useStreamCustom<
284266
"__interrupt__" in stream.values &&
285267
Array.isArray(stream.values.__interrupt__)
286268
) {
287-
const valueInterrupts = stream.values.__interrupt__;
288-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
289-
return valueInterrupts;
269+
return userFacingInterruptsFromValuesArray<InterruptType>(
270+
stream.values.__interrupt__ as Interrupt<InterruptType>[],
271+
);
290272
}
291273

292274
return [];

libs/sdk-react/src/stream.lgp.tsx

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import {
3232
StreamManager,
3333
MessageTupleManager,
3434
extractInterrupts,
35+
userFacingInterruptsFromThreadTasks,
36+
userFacingInterruptsFromValuesArray,
3537
toMessageClass,
3638
ensureMessageInstances,
3739
ensureHistoryMessageInstances,
@@ -49,10 +51,7 @@ import {
4951
type UseStreamThread,
5052
} from "@langchain/langgraph-sdk/ui";
5153
import { getToolCallsWithResults } from "@langchain/langgraph-sdk/utils";
52-
import {
53-
isHeadlessToolInterrupt,
54-
handleHeadlessToolInterrupt,
55-
} from "@langchain/langgraph-sdk";
54+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
5655
import { useControllableThreadId } from "./thread.js";
5756
import type { UseStream, SubmitOptions } from "./types.js";
5857

@@ -857,33 +856,18 @@ export function useStreamLGP<
857856
}, [threadId]);
858857

859858
useEffect(() => {
860-
const tools = toolsRef.current;
861-
if (!tools?.length) return;
862-
863-
const interrupts = values?.__interrupt__;
864-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
865-
866-
for (const interrupt of interrupts) {
867-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
868-
869-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
870-
if (handledBrowserToolsRef.current.has(interruptId)) continue;
871-
handledBrowserToolsRef.current.add(interruptId);
872-
873-
void handleHeadlessToolInterrupt(
874-
interrupt.value,
875-
tools,
876-
onToolRef.current,
877-
).then((result) => {
878-
void submit(null, {
879-
command: {
880-
resume: result.toolCallId
881-
? { [result.toolCallId]: result.value }
882-
: result.value,
883-
},
884-
});
885-
});
886-
}
859+
flushPendingHeadlessToolInterrupts(
860+
values,
861+
toolsRef.current,
862+
handledBrowserToolsRef.current,
863+
{
864+
onTool: onToolRef.current,
865+
resumeSubmit: (command) =>
866+
void submit(null, {
867+
command,
868+
}),
869+
},
870+
);
887871
}, [values, submit]);
888872

889873
return {
@@ -938,9 +922,9 @@ export function useStreamLGP<
938922
"__interrupt__" in values &&
939923
Array.isArray(values.__interrupt__)
940924
) {
941-
const valueInterrupts = values.__interrupt__;
942-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
943-
return valueInterrupts;
925+
return userFacingInterruptsFromValuesArray<InterruptType>(
926+
values.__interrupt__ as Interrupt<InterruptType>[],
927+
);
944928
}
945929

946930
// If we're deferring to old interrupt detection logic, don't show the interrupt if the stream is loading
@@ -950,8 +934,11 @@ export function useStreamLGP<
950934
const allTasks = branchContext.threadHead?.tasks ?? [];
951935
const allInterrupts = allTasks.flatMap((t) => t.interrupts ?? []);
952936

953-
if (allInterrupts.length > 0) {
954-
return allInterrupts as Interrupt<InterruptType>[];
937+
const fromTasks = userFacingInterruptsFromThreadTasks<InterruptType>(
938+
allInterrupts as Interrupt<InterruptType>[],
939+
);
940+
if (fromTasks !== null) {
941+
return fromTasks;
955942
}
956943

957944
// check if there's a next task present (breakpoint-style interrupt)

0 commit comments

Comments
 (0)