Skip to content

Commit 3fccd97

Browse files
committed
feat: implement agent streaming
1 parent 5655551 commit 3fccd97

File tree

1 file changed

+89
-80
lines changed

1 file changed

+89
-80
lines changed

packages/agent-api/src/functions/chats-post.ts

Lines changed: 89 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@ import { HttpRequest, InvocationContext, HttpResponseInit, app } from '@azure/fu
44
import { AIChatCompletionRequest, AIChatCompletionDelta } from '@microsoft/ai-chat-protocol';
55
import { AzureChatOpenAI } from '@langchain/openai';
66
import { AzureCosmsosDBNoSQLChatMessageHistory } from '@langchain/azure-cosmosdb';
7-
import { BaseChatModel } from '@langchain/core/language_models/chat_models';
8-
import { RunnableWithMessageHistory } from '@langchain/core/runnables';
9-
import { ChatPromptTemplate } from '@langchain/core/prompts';
10-
import { createToolCallingAgent } from 'langchain/agents';
11-
import { AgentExecutor } from 'langchain/agents';
7+
import { createReactAgent } from "@langchain/langgraph/prebuilt";
8+
import { AIMessage, HumanMessage } from '@langchain/core/messages';
129
import { loadMcpTools } from '@langchain/mcp-adapters';
10+
import { StreamEvent } from '@langchain/core/tracers/log_stream';
1311
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
12+
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
1413
import { getAzureOpenAiTokenProvider, getCredentials, getInternalUserId } from '../auth.js';
15-
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
16-
import { ChainValues } from '@langchain/core/utils/types.js';
1714

18-
const agentSystemPrompt = `
19-
## Role
15+
const agentSystemPrompt = `## Role
2016
You are an expert assistant that helps users with managing burger orders. Use the provided tools to get the information you need and perform actions on behalf of the user.
2117
Only answer to requests that are related to burger orders and the menu. If the user asks for something else, politely inform them that you can only assist with burger orders.
2218
Be conversational and friendly, like a real person would be, but keep your answers concise and to the point.
@@ -73,8 +69,6 @@ export async function postChats(request: HttpRequest, context: InvocationContext
7369
};
7470
}
7571

76-
let model: BaseChatModel;
77-
let chatHistory;
7872
const sessionId = ((chatContext as any)?.sessionId as string) || randomUUID();
7973
context.log(`userId: ${userId}, sessionId: ${sessionId}`);
8074

@@ -92,10 +86,8 @@ export async function postChats(request: HttpRequest, context: InvocationContext
9286
const credentials = getCredentials();
9387
const azureADTokenProvider = getAzureOpenAiTokenProvider();
9488

95-
model = new AzureChatOpenAI({ azureADTokenProvider });
96-
97-
// Initialize chat history
98-
chatHistory = new AzureCosmsosDBNoSQLChatMessageHistory({
89+
const model = new AzureChatOpenAI({ azureADTokenProvider, streaming: true });
90+
const chatHistory = new AzureCosmsosDBNoSQLChatMessageHistory({
9991
sessionId,
10092
userId,
10193
credentials,
@@ -111,64 +103,56 @@ export async function postChats(request: HttpRequest, context: InvocationContext
111103
await client.connect(transport);
112104
context.log('Connected to Burger MCP server using Streamable HTTP transport');
113105

114-
const tools = await loadMcpTools('burger', client, {
115-
// Whether to throw errors if a tool fails to load (optional, default: true)
116-
throwOnLoadError: true,
117-
// Whether to prefix tool names with the server name (optional, default: false)
118-
prefixToolNameWithServerName: false,
119-
// Optional additional prefix for tool names (optional, default: "")
120-
additionalToolNamePrefix: '',
121-
});
122-
123-
for (const tool of tools) {
124-
if (!(tool.schema as any).properties) {
125-
(tool as any).schema = undefined;
126-
}
127-
}
128-
106+
const tools = await loadMcpTools('burger', client);
129107
context.log(`Loaded ${tools.length} tools from Burger MCP server`);
130108

131-
const prompt = ChatPromptTemplate.fromMessages([
132-
['system', agentSystemPrompt],
133-
['human', 'userId: {userId}'],
134-
['placeholder', '{chat_history}'],
135-
['human', '{question}'],
136-
['placeholder', '{agent_scratchpad}'],
137-
]);
138-
139-
const agent = createToolCallingAgent({
109+
const agent = createReactAgent({
140110
llm: model,
141111
tools,
142-
prompt,
143-
});
144-
const agentExecutor = new AgentExecutor({
145-
agent,
146-
tools,
147-
returnIntermediateSteps: true,
148-
verbose: true,
112+
prompt: agentSystemPrompt
149113
});
150114

151-
// Handle chat history
152-
const agentChainWithHistory = new RunnableWithMessageHistory({
153-
runnable: agentExecutor,
154-
inputMessagesKey: 'question',
155-
historyMessagesKey: 'chat_history',
156-
getMessageHistory: async () => chatHistory,
157-
});
158-
// Add question and start the agent
159115
const question = messages.at(-1)!.content;
160-
const responseStream = await agentChainWithHistory.stream({ userId, question }, { configurable: { sessionId } });
161-
const jsonStream = Readable.from(createJsonStream(responseStream, sessionId));
116+
const previousMessages = await chatHistory.getMessages();
117+
context.log(`Previous messages in history: ${previousMessages.length}`);
118+
119+
// Start the agent and stream the response events
120+
const responseStream = await agent.streamEvents(
121+
{
122+
messages: [
123+
['human', `userId: ${userId}`],
124+
...previousMessages,
125+
['human', question]
126+
]
127+
},
128+
{
129+
configurable: { sessionId },
130+
version: 'v2'
131+
},
132+
);
133+
134+
// Update chat history when the response is complete
135+
const onResponseComplete = async (content: string) => {
136+
try {
137+
await chatHistory.addMessages([
138+
new HumanMessage(question),
139+
new AIMessage(content),
140+
]);
141+
context.log('Chat history updated successfully');
142+
} catch (error) {
143+
context.error('Error updating chat history:', error);
144+
}
145+
}
146+
147+
const jsonStream = Readable.from(createJsonStream(responseStream, sessionId, onResponseComplete));
162148

163149
// Create a short title for this chat session
164150
const { title } = await chatHistory.getContext();
165151
if (!title) {
166-
const response = await ChatPromptTemplate.fromMessages([
152+
const response = await model.invoke([
167153
['system', titleSystemPrompt],
168-
['human', '{question}'],
169-
])
170-
.pipe(model)
171-
.invoke({ question });
154+
['human', question],
155+
]);
172156
context.log(`Title for session: ${response.content as string}`);
173157
chatHistory.setContext({ title: response.content });
174158
}
@@ -179,7 +163,7 @@ export async function postChats(request: HttpRequest, context: InvocationContext
179163
'Transfer-Encoding': 'chunked',
180164
},
181165
body: jsonStream,
182-
}
166+
};
183167
} catch (_error: unknown) {
184168
const error = _error as Error;
185169
context.error(`Error when processing chat-post request: ${error.message}`);
@@ -189,29 +173,54 @@ export async function postChats(request: HttpRequest, context: InvocationContext
189173
jsonBody: {
190174
error: 'Internal server error while processing the request',
191175
},
192-
}
176+
};
193177
}
194178
}
195179

196180
// Transform the response chunks into a JSON stream
197-
async function* createJsonStream(chunks: AsyncIterable<ChainValues>, sessionId: string) {
181+
async function* createJsonStream(chunks: AsyncIterable<StreamEvent>, sessionId: string, onComplete: (responseContent: string) => Promise<void>) {
198182
for await (const chunk of chunks) {
199-
if (!chunk) continue;
200-
201-
const responseChunk: AIChatCompletionDelta = {
202-
delta: {
203-
content: chunk.output ?? '',
204-
role: 'assistant',
205-
context: chunk.intermediateSteps
206-
? {
207-
intermediateSteps: chunk.intermediateSteps,
208-
}
209-
: undefined,
210-
},
211-
context: {
212-
sessionId,
213-
},
214-
};
183+
const data = chunk.data;
184+
let responseChunk: AIChatCompletionDelta | undefined;
185+
186+
if (chunk.event === 'on_chain_end' && chunk.name === 'RunnableSequence') {
187+
// End of our agentic chain
188+
const content = data?.output?.content ?? '';
189+
await onComplete(content);
190+
191+
} else if (chunk.event === 'on_chat_model_stream' && data.chunk.content.length > 0) {
192+
// Streaming response from the LLM
193+
responseChunk = {
194+
delta: {
195+
content: data.chunk.content,
196+
role: 'assistant',
197+
},
198+
context: {
199+
sessionId,
200+
},
201+
};
202+
} else if (chunk.event === 'on_tool_end') {
203+
// Tool call completed
204+
responseChunk = {
205+
delta: {
206+
context: {
207+
intermediateSteps: [{
208+
type: 'tool',
209+
name: chunk.name,
210+
input: data?.input?.input ? data.input?.input : undefined,
211+
output: data?.output.content ? data?.output.content : undefined,
212+
}],
213+
}
214+
},
215+
context: {
216+
sessionId,
217+
},
218+
};
219+
}
220+
221+
if (!responseChunk) {
222+
continue;
223+
}
215224

216225
// Format response chunks in Newline delimited JSON
217226
// see https://github.com/ndjson/ndjson-spec

0 commit comments

Comments
 (0)