Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ async def _handle_stream_events(self, input: RunAgentInput) -> AsyncGenerator[st
"thread_id": thread_id,
"thinking_process": None,
"node_name": None,
"has_function_streaming": False,
}
self.active_run = INITIAL_ACTIVE_RUN

Expand Down Expand Up @@ -485,6 +486,9 @@ async def _handle_single_event(self, event: Any, state: State) -> AsyncGenerator
is_tool_call_args_event = has_current_stream and current_stream.get("tool_call_id") and tool_call_data and tool_call_data.get("args")
is_tool_call_end_event = has_current_stream and current_stream.get("tool_call_id") and not tool_call_data

if is_tool_call_start_event or is_tool_call_end_event or is_tool_call_args_event:
self.active_run["has_function_streaming"] = True

reasoning_data = resolve_reasoning_content(event["data"]["chunk"]) if event["data"]["chunk"] else None
message_content = resolve_message_content(event["data"]["chunk"].content) if event["data"]["chunk"] and event["data"]["chunk"].content else None
is_message_content_event = tool_call_data is None and message_content
Expand Down Expand Up @@ -649,6 +653,35 @@ async def _handle_single_event(self, event: Any, state: State) -> AsyncGenerator
CustomEvent(type=EventType.CUSTOM, name=event["name"], value=event["data"], raw_event=event)
)

elif event_type == LangGraphEventTypes.OnToolEnd:
if self.active_run["has_function_streaming"]:
return
tool_call_output = event["data"]["output"]
yield self._dispatch_event(
ToolCallStartEvent(
type=EventType.TOOL_CALL_START,
tool_call_id=tool_call_output.tool_call_id,
tool_call_name=tool_call_output.name,
parent_message_id=tool_call_output.id,
raw_event=event,
)
)
yield self._dispatch_event(
ToolCallArgsEvent(
type=EventType.TOOL_CALL_ARGS,
tool_call_id=tool_call_output.tool_call_id,
delta=json.dumps(event["data"]["input"]),
raw_event=event
)
)
yield self._dispatch_event(
ToolCallEndEvent(
type=EventType.TOOL_CALL_END,
tool_call_id=tool_call_output.tool_call_id,
raw_event=event
)
)

def handle_thinking_event(self, reasoning_data: LangGraphReasoning) -> Generator[str, Any, str | None]:
if not reasoning_data or "type" not in reasoning_data or "text" not in reasoning_data:
return ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class CustomEventNames(str, Enum):
"exiting_node": NotRequired[bool],
"manually_emitted_state": NotRequired[Optional[State]],
"thread_id": NotRequired[Optional[ThinkingProcess]],
"thinking_process": NotRequired[Optional[str]]
"thinking_process": NotRequired[Optional[str]],
"has_function_streaming": NotRequired[bool],
})

MessagesInProgressRecord = Dict[str, Optional[MessageInProgress]]
Expand Down
26 changes: 26 additions & 0 deletions typescript-sdk/integrations/langgraph/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class LangGraphAgent extends AbstractAgent {
this.activeRun = {
id: input.runId,
threadId: input.threadId,
hasFunctionStreaming: false,
};
this.subscriber = subscriber;
if (!this.assistant) {
Expand Down Expand Up @@ -571,6 +572,10 @@ export class LangGraphAgent extends AbstractAgent {
hasCurrentStream && currentStream?.toolCallId && toolCallData?.args;
const isToolCallEndEvent = hasCurrentStream && currentStream?.toolCallId && !toolCallData;

if (isToolCallEndEvent || isToolCallArgsEvent || isToolCallStartEvent) {
this.activeRun!.hasFunctionStreaming = true;
}

const reasoningData = resolveReasoningContent(event.data);
const messageContent = resolveMessageContent(event.data.chunk.content);
const isMessageContentEvent = Boolean(!toolCallData && messageContent);
Expand Down Expand Up @@ -768,6 +773,27 @@ export class LangGraphAgent extends AbstractAgent {
rawEvent: event,
});
break;
case LangGraphEventTypes.OnToolEnd:
if (this.activeRun!.hasFunctionStreaming) break;
const toolCallOutput = event.data.output
this.dispatchEvent({
type: EventType.TOOL_CALL_START,
toolCallId: toolCallOutput.tool_call_id,
toolCallName: toolCallOutput.name,
parentMessageId: toolCallOutput.id,
rawEvent: event,
})
this.dispatchEvent({
type: EventType.TOOL_CALL_ARGS,
toolCallId: toolCallOutput.tool_call_id,
delta: JSON.stringify(event.data.input),
rawEvent: event,
});
this.dispatchEvent({
type: EventType.TOOL_CALL_END,
toolCallId: toolCallOutput.tool_call_id,
rawEvent: event,
});
}
}

Expand Down
1 change: 1 addition & 0 deletions typescript-sdk/integrations/langgraph/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface RunMetadata {
manuallyEmittedState?: State | null;
threadId?: string;
graphInfo?: AssistantGraph
hasFunctionStreaming?: boolean;
}

export type MessagesInProgressRecord = Record<string, MessageInProgress | null>;
Expand Down
Loading