Skip to content

Commit 7d05faa

Browse files
committed
feat: add subgraphs support in langgraph integrations
1 parent 2c07161 commit 7d05faa

File tree

2 files changed

+44
-9
lines changed

2 files changed

+44
-9
lines changed

typescript-sdk/integrations/langgraph/python/ag_ui_langgraph/agent.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ async def _handle_stream_events(self, input: RunAgentInput) -> AsyncGenerator[st
155155
current_graph_state = state
156156

157157
async for event in stream:
158+
subgraphs_stream_enabled = input.forwarded_props.get('stream_subgraphs') if input.forwarded_props else False
159+
is_subgraph_stream = (subgraphs_stream_enabled and (
160+
event.get("event", "").startswith("events") or
161+
event.get("event", "").startswith("values")
162+
))
158163
if event["event"] == "error":
159164
yield self._dispatch_event(
160165
RunErrorEvent(type=EventType.RUN_ERROR, message=event["data"]["message"], raw_event=event)
@@ -323,8 +328,18 @@ async def prepare_stream(self, input: RunAgentInput, agent_state: State, config:
323328
)
324329
stream_input = {**forwarded_props, **payload_input} if payload_input else None
325330

331+
332+
subgraphs_stream_enabled = input.forwarded_props.get('stream_subgraphs') if input.forwarded_props else False
333+
334+
stream = self.graph.astream_events(
335+
stream_input,
336+
config=config,
337+
subgraps=bool(subgraphs_stream_enabled),
338+
version="v2"
339+
)
340+
326341
return {
327-
"stream": self.graph.astream_events(stream_input, config, version="v2"),
342+
"stream": stream,
328343
"state": state,
329344
"config": config
330345
}
@@ -349,7 +364,13 @@ async def prepare_regenerate_stream( # pylint: disable=too-many-arguments
349364
)
350365

351366
stream_input = self.langgraph_default_merge_state(time_travel_checkpoint.values, [message_checkpoint], tools)
352-
stream = self.graph.astream_events(stream_input, fork, version="v2")
367+
subgraphs_stream_enabled = input.forwarded_props.get('stream_subgraphs') if input.forwarded_props else False
368+
stream = self.graph.astream_events(
369+
stream_input,
370+
fork,
371+
subgraps=bool(subgraphs_stream_enabled),
372+
version="v2"
373+
)
353374

354375
return {
355376
"stream": stream,

typescript-sdk/integrations/langgraph/src/agent.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,15 @@ export class LangGraphAgent extends AbstractAgent {
194194
}
195195
);
196196

197+
const payload = {
198+
...(input.forwardedProps ?? {}),
199+
input: this.langGraphDefaultMergeState(timeTravelCheckpoint.values, [messageCheckpoint], tools),
200+
// @ts-ignore
201+
checkpointId: fork.checkpoint.checkpoint_id!,
202+
streamMode,
203+
};
197204
return {
198-
streamResponse: this.client.runs.stream(threadId, this.assistant.assistant_id, {
199-
input: this.langGraphDefaultMergeState(timeTravelCheckpoint.values, [messageCheckpoint], tools),
200-
// @ts-ignore
201-
checkpointId: fork.checkpoint.checkpoint_id!,
202-
streamMode,
203-
}),
205+
streamResponse: this.client.runs.stream(threadId, this.assistant.assistant_id, payload),
204206
state: timeTravelCheckpoint as ThreadState<State>,
205207
streamMode,
206208
};
@@ -360,8 +362,14 @@ export class LangGraphAgent extends AbstractAgent {
360362
}
361363

362364
for await (let streamResponseChunk of streamResponse) {
365+
const subgraphsStreamEnabled = input.forwardedProps?.streamSubgraphs
366+
const isSubgraphStream = (subgraphsStreamEnabled && (
367+
streamResponseChunk.event.startsWith("events") ||
368+
streamResponseChunk.event.startsWith("values")
369+
))
370+
363371
// @ts-ignore
364-
if (!streamMode.includes(streamResponseChunk.event as StreamMode)) {
372+
if (!streamMode.includes(streamResponseChunk.event as StreamMode) && !isSubgraphStream) {
365373
continue;
366374
}
367375

@@ -391,6 +399,12 @@ export class LangGraphAgent extends AbstractAgent {
391399
if (streamResponseChunk.event === "values") {
392400
latestStateValues = chunk.data;
393401
continue;
402+
} else if (subgraphsStreamEnabled && chunk.event.startsWith("values|")) {
403+
latestStateValues = {
404+
...latestStateValues,
405+
...chunk.data,
406+
};
407+
continue;
394408
}
395409

396410
const chunkData = chunk.data;

0 commit comments

Comments
 (0)