Commit d7952c4

πŸ“¦ NEW: Stream support in tool call (#101)
* πŸ“¦ NEW: Stream support when tools are present
* πŸ“¦ NEW: Stream tool call example
* πŸ‘Œ IMPROVE: Code
* πŸ› FIX: Anthropic tool call stream support
* πŸ‘Œ IMPROVE: Code
* πŸ‘Œ IMPROVE: Code
* πŸ“¦ NEW: Anthropic check
Parent: 386b9f8

File tree: 8 files changed (+262 −13 lines)

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
import pipeWithToolsStream from '@/baseai/pipes/pipe-with-tool-stream';
import {Pipe, RunResponseStream} from '@baseai/core';
import {NextRequest} from 'next/server';

export async function POST(req: NextRequest) {
	const runOptions = await req.json();

	// 1. Initiate the Pipe.
	const pipe = new Pipe(pipeWithToolsStream());

	// 2. Run the pipe with user messages and other run options.
	let {stream, threadId} = (await pipe.run(
		runOptions,
	)) as unknown as RunResponseStream;

	// 3. Stream the response.
	return new Response(stream, {
		status: 200,
		headers: {
			'lb-thread-id': threadId ?? '',
		},
	});
}
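
The handler above returns the raw stream to the browser and surfaces the pipe's thread id in the `lb-thread-id` response header (both the endpoint and header name come from this commit). A client can read that header to keep follow-up requests on the same thread — a minimal sketch, assuming a `threadId` field in the run options continues the thread (that shape is an assumption, not confirmed by this diff):

// Client-side sketch: capture the thread id set by the POST handler above.
async function askPipe(prompt: string, threadId?: string) {
	const res = await fetch('/api/langbase/pipes/run-tool-stream', {
		method: 'POST',
		headers: {'Content-Type': 'application/json'},
		body: JSON.stringify({
			messages: [{role: 'user', content: prompt}],
			// Assumption: echoing threadId back continues the same thread.
			...(threadId ? {threadId} : {}),
		}),
	});
	// 'lb-thread-id' is the header set in the route handler.
	return {stream: res.body, threadId: res.headers.get('lb-thread-id')};
}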
Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
import PipeRunToolStreamExample from '@/components/pipe-run-with-tool-stream';
import GoHome from '@/components/ui/go-home';

export default function Page() {
	return (
		<div className="w-full max-w-md">
			<GoHome />

			<h1 className="text-2xl font-light text-gray-800 mb-1 text-center">
				AI Agent Pipes: Tool Calling
			</h1>
			<p className="text-muted-foreground text-base font-light mb-20 text-center">
				Run a pipe with tool calling.
			</p>
			<PipeRunToolStreamExample />
		</div>
	);
}

examples/nextjs/app/page.tsx

Lines changed: 7 additions & 6 deletions

@@ -2,16 +2,17 @@ import Link from 'next/link';
 
 export default function Page() {
 	const examples = [
-		{title: 'Pipe Run', href: '/demo/pipe-run'},
-		{title: 'Pipe Run Stream', href: '/demo/pipe-run-stream'},
-		{title: 'Chat Simple', href: '/demo/chat-simple'},
-		{title: 'Chat Advanced', href: '/demo/chat-advanced'},
-		{title: 'Tool Calling', href: '/demo/tool-calling'},
+		{ title: 'Pipe Run', href: '/demo/pipe-run' },
+		{ title: 'Pipe Run Stream', href: '/demo/pipe-run-stream' },
+		{ title: 'Chat Simple', href: '/demo/chat-simple' },
+		{ title: 'Chat Advanced', href: '/demo/chat-advanced' },
+		{ title: 'Tool Calling', href: '/demo/tool-calling' },
+		{ title: 'Tool Calling Stream', href: '/demo/tool-calling-stream' },
 		{
 			title: 'Tool Calling: Pipes as Tools',
 			href: '/demo/pipe-run-pipes-as-tools',
 		},
-		{title: 'Memory', href: '/demo/memory'},
+		{ title: 'Memory', href: '/demo/memory' },
 	];
 
 	return (
Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
import {PipeI} from '@baseai/core';
import toolCalculator from '../tools/calculator';
import toolGetWeather from '../tools/weather';

const pipeWithToolsStream = (): PipeI => ({
	apiKey: process.env.LANGBASE_API_KEY!,
	name: 'pipe-with-tool',
	description: 'An AI agent pipe that can call tools',
	status: 'public',
	model: 'openai:gpt-4o-mini',
	stream: true,
	json: false,
	store: true,
	moderate: true,
	top_p: 1,
	max_tokens: 1000,
	temperature: 0.7,
	presence_penalty: 1,
	frequency_penalty: 1,
	stop: [],
	tool_choice: 'auto',
	parallel_tool_calls: true,
	messages: [{role: 'system', content: `You are a helpful AI assistant.`}],
	variables: [],
	memory: [],
	tools: [toolGetWeather(), toolCalculator()],
});
export default pipeWithToolsStream;

examples/nextjs/baseai/pipes/pipe-with-tool.ts

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@ const pipeWithTools = (): PipeI => ({
 	description: 'An AI agent pipe that can call tools',
 	status: 'public',
 	model: 'openai:gpt-4o-mini',
-	stream: true,
+	stream: false,
 	json: false,
 	store: true,
 	moderate: true,
Lines changed: 75 additions & 0 deletions

@@ -0,0 +1,75 @@
'use client';

import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { getRunner, getTextContent } from '@baseai/core';
import { useState } from 'react';

export default function PipeRunToolStreamExample() {
	const [prompt, setPrompt] = useState(
		'What is the weather in SF. Square root of 9 and then add 7?',
	);
	const [completion, setCompletion] = useState('');
	const [loading, setLoading] = useState(false);

	const handleSubmit = async (e: any) => {
		e.preventDefault();
		if (!prompt.trim()) return;

		setLoading(true);
		try {
			const response = await fetch('/api/langbase/pipes/run-tool-stream', {
				method: 'POST',
				headers: { 'Content-Type': 'application/json' },
				body: JSON.stringify({
					messages: [{ role: 'user', content: prompt }],
				}),
			});

			if (!response.ok) {
				throw new Error('Network response was not ok');
			}

			const runner = getRunner(response.body as ReadableStream<Uint8Array>);

			let localCompletion = '';
			for await (const chunk of runner) {
				const textPart = getTextContent(chunk);
				localCompletion += textPart;
				setCompletion(localCompletion);
			}
		} catch (error) {
			console.error('Error:', error);
			setCompletion('An error occurred while generating the completion.');
		} finally {
			setLoading(false);
		}
	};

	return (
		<div className="bg-neutral-200 rounded-md p-2 flex flex-col gap-2 w-full">
			<form
				onSubmit={handleSubmit}
				className="flex flex-col w-full items-center gap-2"
			>
				<Input
					type="text"
					placeholder="Enter prompt message here"
					value={prompt}
					onChange={e => setPrompt(e.target.value)}
					required
				/>

				<Button type="submit" className="w-full" disabled={loading}>
					{loading ? 'AI is thinking...' : 'Ask AI'}
				</Button>
			</form>

			{!loading && completion && (
				<p className="mt-4">
					<strong>AI:</strong> {completion}
				</p>
			)}
		</div>
	);
}

packages/core/src/helpers/stream.ts

Lines changed: 15 additions & 0 deletions

@@ -1,6 +1,7 @@
 import {ChatCompletionStream} from 'openai/lib/ChatCompletionStream';
 import {ChunkStream} from 'src/pipes';
 import {Stream} from 'openai/streaming';
+import {ToolCall} from 'types/pipes';
 
 export interface Runner extends ChatCompletionStream<null> {}
 
@@ -81,3 +82,17 @@ export function handleResponseStream({
 	}
 	return result;
 }
+
+/**
+ * Retrieves tool calls from a given readable stream.
+ *
+ * @param stream - The readable stream from which to extract tool calls.
+ * @returns A promise that resolves to an array of `ToolCall` objects.
+ */
+export async function getToolsFromStream(
+	stream: ReadableStream<any>,
+): Promise<ToolCall[]> {
+	let run = getRunner(stream);
+	const {choices} = await run.finalChatCompletion();
+	return choices[0].message.tool_calls;
+}
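
Note that `getToolsFromStream` drains the stream it is given — `finalChatCompletion()` reads it to the end — and a `ReadableStream` can only be consumed once. Callers therefore `tee()` the response stream first and hand only one branch to this helper, which is exactly what `pipes.ts` below does. A minimal usage sketch (the wrapper function and its names are illustrative, not part of the commit):

// Illustrative wrapper: split the response stream so one branch can be
// drained for tool calls while the other is preserved for the caller.
async function splitForToolCalls(response: RunResponseStream) {
	const [branchForTools, branchForCaller] = response.stream.tee();

	// Reads branchForTools to completion and collects any tool calls.
	const toolCalls = await getToolsFromStream(branchForTools);

	return {toolCalls, stream: branchForCaller};
}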

packages/core/src/pipes/pipes.ts

Lines changed: 95 additions & 6 deletions

@@ -5,6 +5,8 @@ import {getLLMApiKey} from '../utils/get-llm-api-key';
 import {getApiUrl, isProd} from '../utils/is-prod';
 import {toOldPipeFormat} from '../utils/to-old-pipe-format';
 import {isLocalServerRunning} from 'src/utils/local-server-running';
+import {getToolsFromStream} from 'src/helpers';
+import {ANTHROPIC} from 'src/data/models';
 
 export interface Variable {
 	name: string;
@@ -140,17 +142,89 @@ export class Pipe {
 	}
 
 	private isStreamRequested(options: RunOptions | RunOptionsStream): boolean {
-		return 'stream' in options && options.stream === true;
+		return (
+			('stream' in options && options.stream === true) ||
+			this.pipe.meta.stream
+		);
 	}
 
 	private warnIfToolsWithStream(requestedStream: boolean): void {
 		if (this.hasTools && requestedStream) {
 			console.warn(
-				'Warning: Streaming is not yet supported when tools are present in the pipe. Falling back to non-streaming mode.',
+				'Warning: Streaming is not yet supported in Anthropic models when tools are present in the pipe. Falling back to non-streaming mode.',
 			);
 		}
 	}
 
+	private async handleStreamResponse(
+		options: RunOptionsStream,
+		response: RunResponseStream,
+	): Promise<RunResponseStream> {
+		const endpoint = '/beta/pipes/run';
+		const stream = this.isStreamRequested(options);
+		const body = {...options, stream};
+
+		const [streamForToolCall, streamForReturn] = response.stream.tee();
+		const tools = await getToolsFromStream(streamForToolCall);
+
+		if (tools.length) {
+			let messages = options.messages || [];
+
+			let currentResponse: RunResponseStream = {
+				stream: streamForReturn,
+				threadId: response.threadId,
+				rawResponse: response.rawResponse,
+			};
+
+			let callCount = 0;
+
+			while (callCount < this.maxCalls) {
+				const [streamForToolCall, streamForReturn] =
+					currentResponse.stream.tee();
+
+				const tools = await getToolsFromStream(streamForToolCall);
+
+				if (tools.length === 0) {
+					return {
+						stream: streamForReturn,
+						threadId: currentResponse.threadId,
+						rawResponse: response.rawResponse,
+					};
+				}
+
+				const toolResults = await this.runTools(tools);
+
+				const responseMessage = {
+					role: 'assistant',
+					content: null,
+					tool_calls: tools,
+				} as Message;
+
+				messages = this.getMessagesToSend(
+					messages,
+					responseMessage,
+					toolResults,
+				);
+
+				currentResponse = await this.createRequest<RunResponseStream>(
+					endpoint,
+					{
+						...body,
+						messages,
+						threadId: currentResponse.threadId,
+					},
+				);
+
+				callCount++;
+			}
+		}
+
+		return {
+			...response,
+			stream: streamForReturn,
+		} as RunResponseStream;
+	}
+
 	public async run(options: RunOptionsStream): Promise<RunResponseStream>;
 	public async run(options: RunOptions): Promise<RunResponse>;
 	public async run(
@@ -163,9 +237,16 @@ export class Pipe {
 		// logger('pipe.run.options');
 		// logger(options, {depth: null, colors: true});
 
-		const requestedStream = this.isStreamRequested(options);
-		const stream = this.hasTools ? false : requestedStream;
-		this.warnIfToolsWithStream(requestedStream);
+		const isAnthropic = this.pipe.model.provider === ANTHROPIC;
+		const hasTools = this.pipe.tools.length > 0;
+
+		let stream = this.isStreamRequested(options);
+
+		// Anthropic models don't support streaming with tools.
+		if (isAnthropic && hasTools && stream) {
+			this.warnIfToolsWithStream(stream);
+			stream = false;
+		}
 
 		const runTools = options.runTools ?? true;
 		delete options.runTools;
@@ -179,10 +260,18 @@ export class Pipe {
 			return {} as RunResponse | RunResponseStream;
 		}
 
-		if (stream || !runTools) {
+		if (!runTools) {
 			return response as RunResponseStream;
 		}
 
+		if (stream) {
+			return await this.handleStreamResponse(
+				options as RunOptionsStream,
+				response as RunResponseStream,
+			);
+		}
+
+		// STREAM IS OFF
 		let messages = options.messages || [];
 		let currentResponse = response as RunResponse;
 		let callCount = 0;