@@ -155,6 +155,11 @@ async def _handle_stream_events(self, input: RunAgentInput) -> AsyncGenerator[st
155155 current_graph_state = state
156156
157157 async for event in stream :
158+ subgraphs_stream_enabled = input .forwarded_props .get ('stream_subgraphs' ) if input .forwarded_props else False
159+ is_subgraph_stream = (subgraphs_stream_enabled and (
160+ event .get ("event" , "" ).startswith ("events" ) or
161+ event .get ("event" , "" ).startswith ("values" )
162+ ))
158163 if event ["event" ] == "error" :
159164 yield self ._dispatch_event (
160165 RunErrorEvent (type = EventType .RUN_ERROR , message = event ["data" ]["message" ], raw_event = event )
@@ -323,8 +328,18 @@ async def prepare_stream(self, input: RunAgentInput, agent_state: State, config:
323328 )
324329 stream_input = {** forwarded_props , ** payload_input } if payload_input else None
325330
331+
332+ subgraphs_stream_enabled = input .forwarded_props .get ('stream_subgraphs' ) if input .forwarded_props else False
333+
334+ stream = self .graph .astream_events (
335+ stream_input ,
336+ config = config ,
337+ subgraps = bool (subgraphs_stream_enabled ),
338+ version = "v2"
339+ )
340+
326341 return {
327- "stream" : self . graph . astream_events ( stream_input , config , version = "v2" ) ,
342+ "stream" : stream ,
328343 "state" : state ,
329344 "config" : config
330345 }
@@ -349,7 +364,13 @@ async def prepare_regenerate_stream( # pylint: disable=too-many-arguments
349364 )
350365
351366 stream_input = self .langgraph_default_merge_state (time_travel_checkpoint .values , [message_checkpoint ], tools )
352- stream = self .graph .astream_events (stream_input , fork , version = "v2" )
367+ subgraphs_stream_enabled = input .forwarded_props .get ('stream_subgraphs' ) if input .forwarded_props else False
368+ stream = self .graph .astream_events (
369+ stream_input ,
370+ fork ,
371+ subgraps = bool (subgraphs_stream_enabled ),
372+ version = "v2"
373+ )
353374
354375 return {
355376 "stream" : stream ,
0 commit comments