Skip to content

Commit 87af3e9

Browse files
consolidate
1 parent 53d42ac commit 87af3e9

File tree

15 files changed

+347
-383
lines changed

15 files changed

+347
-383
lines changed

libs/sdk-angular/src/index.ts

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import {
1414
getMessagesMetadataMap,
1515
StreamError,
1616
extractInterrupts,
17+
userFacingInterruptsFromThreadTasks,
18+
userFacingInterruptsFromValuesArray,
1719
toMessageClass,
1820
ensureMessageInstances,
1921
ensureHistoryMessageInstances,
@@ -49,10 +51,7 @@ import {
4951
type DefaultToolCall,
5052
} from "@langchain/langgraph-sdk";
5153
import { getToolCallsWithResults } from "@langchain/langgraph-sdk/utils";
52-
import {
53-
isHeadlessToolInterrupt,
54-
handleHeadlessToolInterrupt,
55-
} from "@langchain/langgraph-sdk";
54+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
5655
import { useStreamCustom } from "./stream.custom.js";
5756

5857
export { FetchStreamTransport } from "@langchain/langgraph-sdk/ui";
@@ -788,18 +787,21 @@ export function useStreamLGP<
788787
"__interrupt__" in vals &&
789788
Array.isArray(vals.__interrupt__)
790789
) {
791-
const valueInterrupts = vals.__interrupt__;
792-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
793-
return valueInterrupts;
790+
return userFacingInterruptsFromValuesArray<InterruptType>(
791+
vals.__interrupt__ as Interrupt<InterruptType>[],
792+
);
794793
}
795794

796795
if (isLoading()) return [];
797796

798797
const allTasks = branchContext().threadHead?.tasks ?? [];
799798
const allInterrupts = allTasks.flatMap((t) => t.interrupts ?? []);
800799

801-
if (allInterrupts.length > 0) {
802-
return allInterrupts as Interrupt<InterruptType>[];
800+
const fromTasks = userFacingInterruptsFromThreadTasks<InterruptType>(
801+
allInterrupts as Interrupt<InterruptType>[],
802+
);
803+
if (fromTasks !== null) {
804+
return fromTasks;
803805
}
804806

805807
const next = branchContext().threadHead?.next ?? [];
@@ -864,37 +866,15 @@ export function useStreamLGP<
864866
handledToolsLGP.clear();
865867
}
866868

867-
const { tools, onTool } = options;
868-
if (!tools?.length) return;
869-
870-
const interrupts = vals?.__interrupt__;
871-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
872-
873-
for (const interrupt of interrupts) {
874-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
875-
876-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
877-
if (handledToolsLGP.has(interruptId)) continue;
878-
handledToolsLGP.add(interruptId);
879-
880-
void Promise.resolve().then(() =>
881-
handleHeadlessToolInterrupt(interrupt.value, tools, onTool).then(
882-
(result) => {
883-
void submit(null as unknown as StateType, {
884-
// interrupt ensures the resume bypasses the LGP queue and calls
885-
// submitDirect directly, even if isLoading is still true when
886-
// the browser tool interrupt fires.
887-
multitaskStrategy: "interrupt",
888-
command: {
889-
resume: result.toolCallId
890-
? { [result.toolCallId]: result.value }
891-
: result.value,
892-
},
893-
});
894-
},
895-
),
896-
);
897-
}
869+
flushPendingHeadlessToolInterrupts(vals, options.tools, handledToolsLGP, {
870+
onTool: options.onTool,
871+
defer: (run) => void Promise.resolve().then(run),
872+
resumeSubmit: (command) =>
873+
void submit(null as unknown as StateType, {
874+
multitaskStrategy: "interrupt",
875+
command,
876+
}),
877+
});
898878
});
899879

900880
return {

libs/sdk-angular/src/stream.custom.ts

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
StreamManager,
44
MessageTupleManager,
55
extractInterrupts,
6+
userFacingInterruptsFromValuesArray,
67
toMessageClass,
78
ensureMessageInstances,
89
type EventStreamEvent,
@@ -21,9 +22,8 @@ import type {
2122
Message,
2223
Interrupt,
2324
ThreadState,
24-
isHeadlessToolInterrupt,
25-
handleHeadlessToolInterrupt,
2625
} from "@langchain/langgraph-sdk";
26+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
2727

2828
function createCustomTransportThreadState<
2929
StateType extends Record<string, unknown>,
@@ -221,39 +221,24 @@ export function useStreamCustom<
221221
effect(() => {
222222
const vals = streamValues();
223223

224-
// Reset dedup set when thread changes
225224
if (options.threadId !== lastThreadId) {
226225
lastThreadId = options.threadId;
227226
handledBrowserTools.clear();
228227
}
229228

230-
const { tools, onTool } = options;
231-
if (!tools?.length) return;
232-
233-
const interrupts = vals?.__interrupt__;
234-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
235-
236-
for (const interrupt of interrupts) {
237-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
238-
239-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
240-
if (handledBrowserTools.has(interruptId)) continue;
241-
handledBrowserTools.add(interruptId);
242-
243-
void Promise.resolve().then(() =>
244-
handleHeadlessToolInterrupt(interrupt.value, tools, onTool).then(
245-
(result) => {
246-
void submit(null, {
247-
command: {
248-
resume: result.toolCallId
249-
? { [result.toolCallId]: result.value }
250-
: result.value,
251-
},
252-
});
253-
},
254-
),
255-
);
256-
}
229+
flushPendingHeadlessToolInterrupts(
230+
vals,
231+
options.tools,
232+
handledBrowserTools,
233+
{
234+
onTool: options.onTool,
235+
defer: (run) => void Promise.resolve().then(run),
236+
resumeSubmit: (command) =>
237+
void submit(null, {
238+
command,
239+
}),
240+
},
241+
);
257242
});
258243

259244
const values = computed(() => streamValues() ?? ({} as StateType));
@@ -308,9 +293,9 @@ export function useStreamCustom<
308293
"__interrupt__" in vals &&
309294
Array.isArray(vals.__interrupt__)
310295
) {
311-
const valueInterrupts = vals.__interrupt__;
312-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
313-
return valueInterrupts;
296+
return userFacingInterruptsFromValuesArray<InterruptType>(
297+
vals.__interrupt__ as Interrupt<InterruptType>[],
298+
);
314299
}
315300

316301
return [];

libs/sdk-react/src/stream.custom.tsx

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
StreamManager,
1414
MessageTupleManager,
1515
extractInterrupts,
16+
userFacingInterruptsFromValuesArray,
1617
FetchStreamTransport,
1718
toMessageClass,
1819
ensureMessageInstances,
@@ -27,15 +28,14 @@ import {
2728
type MessageMetadata,
2829
} from "@langchain/langgraph-sdk/ui";
2930
import { getToolCallsWithResults } from "@langchain/langgraph-sdk/utils";
30-
import type { BaseMessage } from "@langchain/core/messages";
3131
import type {
3232
BagTemplate,
3333
Message,
3434
Interrupt,
3535
ThreadState,
36-
isHeadlessToolInterrupt,
37-
handleHeadlessToolInterrupt,
3836
} from "@langchain/langgraph-sdk";
37+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
38+
import type { BaseMessage } from "@langchain/core/messages";
3939
import { useControllableThreadId } from "./thread.js";
4040
import type { UseStreamCustom } from "./types.js";
4141

@@ -248,34 +248,18 @@ export function useStreamCustom<
248248
}, [threadId]);
249249

250250
useEffect(() => {
251-
const tools = toolsRef.current;
252-
if (!tools?.length) return;
253-
if (!stream.values) return;
254-
255-
const interrupts = stream.values.__interrupt__;
256-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
257-
258-
for (const interrupt of interrupts) {
259-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
260-
261-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
262-
if (handledBrowserToolsRef.current.has(interruptId)) continue;
263-
handledBrowserToolsRef.current.add(interruptId);
264-
265-
void handleHeadlessToolInterrupt(
266-
interrupt.value,
267-
tools,
268-
onToolRef.current,
269-
).then((result) => {
270-
void submit(null, {
271-
command: {
272-
resume: result.toolCallId
273-
? { [result.toolCallId]: result.value }
274-
: result.value,
275-
},
276-
});
277-
});
278-
}
251+
flushPendingHeadlessToolInterrupts(
252+
stream.values,
253+
toolsRef.current,
254+
handledBrowserToolsRef.current,
255+
{
256+
onTool: onToolRef.current,
257+
resumeSubmit: (command) =>
258+
void submit(null, {
259+
command,
260+
}),
261+
},
262+
);
279263
}, [stream.values, submit]);
280264

281265
return {
@@ -316,9 +300,9 @@ export function useStreamCustom<
316300
"__interrupt__" in stream.values &&
317301
Array.isArray(stream.values.__interrupt__)
318302
) {
319-
const valueInterrupts = stream.values.__interrupt__;
320-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
321-
return valueInterrupts;
303+
return userFacingInterruptsFromValuesArray<InterruptType>(
304+
stream.values.__interrupt__ as Interrupt<InterruptType>[],
305+
);
322306
}
323307

324308
return [];

libs/sdk-react/src/stream.lgp.tsx

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import {
3232
StreamManager,
3333
MessageTupleManager,
3434
extractInterrupts,
35+
userFacingInterruptsFromThreadTasks,
36+
userFacingInterruptsFromValuesArray,
3537
toMessageClass,
3638
ensureMessageInstances,
3739
ensureHistoryMessageInstances,
@@ -49,10 +51,7 @@ import {
4951
type UseStreamThread,
5052
} from "@langchain/langgraph-sdk/ui";
5153
import { getToolCallsWithResults } from "@langchain/langgraph-sdk/utils";
52-
import {
53-
isHeadlessToolInterrupt,
54-
handleHeadlessToolInterrupt,
55-
} from "@langchain/langgraph-sdk";
54+
import { flushPendingHeadlessToolInterrupts } from "@langchain/langgraph-sdk";
5655
import { useControllableThreadId } from "./thread.js";
5756
import type { UseStream, SubmitOptions } from "./types.js";
5857

@@ -857,33 +856,18 @@ export function useStreamLGP<
857856
}, [threadId]);
858857

859858
useEffect(() => {
860-
const tools = toolsRef.current;
861-
if (!tools?.length) return;
862-
863-
const interrupts = values?.__interrupt__;
864-
if (!Array.isArray(interrupts) || interrupts.length === 0) return;
865-
866-
for (const interrupt of interrupts) {
867-
if (!isHeadlessToolInterrupt(interrupt.value)) continue;
868-
869-
const interruptId = interrupt.id ?? interrupt.value.toolCall.id ?? "";
870-
if (handledBrowserToolsRef.current.has(interruptId)) continue;
871-
handledBrowserToolsRef.current.add(interruptId);
872-
873-
void handleHeadlessToolInterrupt(
874-
interrupt.value,
875-
tools,
876-
onToolRef.current,
877-
).then((result) => {
878-
void submit(null, {
879-
command: {
880-
resume: result.toolCallId
881-
? { [result.toolCallId]: result.value }
882-
: result.value,
883-
},
884-
});
885-
});
886-
}
859+
flushPendingHeadlessToolInterrupts(
860+
values,
861+
toolsRef.current,
862+
handledBrowserToolsRef.current,
863+
{
864+
onTool: onToolRef.current,
865+
resumeSubmit: (command) =>
866+
void submit(null, {
867+
command,
868+
}),
869+
},
870+
);
887871
}, [values, submit]);
888872

889873
return {
@@ -938,9 +922,9 @@ export function useStreamLGP<
938922
"__interrupt__" in values &&
939923
Array.isArray(values.__interrupt__)
940924
) {
941-
const valueInterrupts = values.__interrupt__;
942-
if (valueInterrupts.length === 0) return [{ when: "breakpoint" }];
943-
return valueInterrupts;
925+
return userFacingInterruptsFromValuesArray<InterruptType>(
926+
values.__interrupt__ as Interrupt<InterruptType>[],
927+
);
944928
}
945929

946930
// If we're deferring to old interrupt detection logic, don't show the interrupt if the stream is loading
@@ -950,8 +934,11 @@ export function useStreamLGP<
950934
const allTasks = branchContext.threadHead?.tasks ?? [];
951935
const allInterrupts = allTasks.flatMap((t) => t.interrupts ?? []);
952936

953-
if (allInterrupts.length > 0) {
954-
return allInterrupts as Interrupt<InterruptType>[];
937+
const fromTasks = userFacingInterruptsFromThreadTasks<InterruptType>(
938+
allInterrupts as Interrupt<InterruptType>[],
939+
);
940+
if (fromTasks !== null) {
941+
return fromTasks;
955942
}
956943

957944
// check if there's a next task present (breakpoint-style interrupt)

0 commit comments

Comments
 (0)