42 changes: 42 additions & 0 deletions .changeset/ai-sdk-chat-transport.md
@@ -0,0 +1,42 @@
---
"@trigger.dev/sdk": minor
---

Add AI SDK chat transport integration via two new subpath exports:

**`@trigger.dev/sdk/chat`** (frontend, browser-safe):
- `TriggerChatTransport` — custom `ChatTransport` for the AI SDK's `useChat` hook that runs chat completions as durable Trigger.dev tasks
- `createChatTransport()` — factory function

```tsx
import { useChat } from "@ai-sdk/react";
import { TriggerChatTransport } from "@trigger.dev/sdk/chat";

const { messages, sendMessage } = useChat({
transport: new TriggerChatTransport({
task: "my-chat-task",
accessToken,
}),
});
```

**`@trigger.dev/sdk/ai`** (backend, extends existing `ai.tool`/`ai.currentToolOptions`):
- `chatTask()` — pre-typed task wrapper with auto-pipe support
- `pipeChat()` — pipe a `StreamTextResult` or stream to the frontend
- `CHAT_STREAM_KEY` — the default stream key constant
- `ChatTaskPayload` type

```ts
import { chatTask } from "@trigger.dev/sdk/ai";
import { streamText, convertToModelMessages } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChatTask = chatTask({
id: "my-chat-task",
run: async ({ messages }) => {
return streamText({
model: openai("gpt-4o"),
messages: convertToModelMessages(messages),
});
},
});
```
17 changes: 16 additions & 1 deletion packages/trigger-sdk/package.json
@@ -24,7 +24,8 @@
"./package.json": "./package.json",
".": "./src/v3/index.ts",
"./v3": "./src/v3/index.ts",
"./ai": "./src/v3/ai.ts"
"./ai": "./src/v3/ai.ts",
"./chat": "./src/v3/chat.ts"
},
"sourceDialects": [
"@triggerdotdev/source"
@@ -37,6 +38,9 @@
],
"ai": [
"dist/commonjs/v3/ai.d.ts"
],
"chat": [
"dist/commonjs/v3/chat.d.ts"
]
}
},
@@ -123,6 +127,17 @@
"types": "./dist/commonjs/v3/ai.d.ts",
"default": "./dist/commonjs/v3/ai.js"
}
},
"./chat": {
"import": {
"@triggerdotdev/source": "./src/v3/chat.ts",
"types": "./dist/esm/v3/chat.d.ts",
"default": "./dist/esm/v3/chat.js"
},
"require": {
"types": "./dist/commonjs/v3/chat.d.ts",
"default": "./dist/commonjs/v3/chat.js"
}
}
},
"main": "./dist/commonjs/v3/index.js",
242 changes: 242 additions & 0 deletions packages/trigger-sdk/src/v3/ai.ts
@@ -3,11 +3,16 @@ import {
isSchemaZodEsque,
Task,
type inferSchemaIn,
type PipeStreamOptions,
type TaskOptions,
type TaskSchema,
type TaskWithSchema,
} from "@trigger.dev/core/v3";
import type { UIMessage } from "ai";
import { dynamicTool, jsonSchema, JSONSchema7, Schema, Tool, ToolCallOptions, zodSchema } from "ai";
import { metadata } from "./metadata.js";
import { streams } from "./streams.js";
import { createTask } from "./shared.js";

const METADATA_KEY = "tool.execute.options";

@@ -116,3 +121,240 @@ export const ai = {
tool: toolFromTask,
currentToolOptions: getToolOptionsFromMetadata,
};

// ---------------------------------------------------------------------------
// Chat transport helpers — backend side
// ---------------------------------------------------------------------------

/**
* The default stream key used for chat transport communication.
* Both `TriggerChatTransport` (frontend) and `pipeChat`/`chatTask` (backend)
* use this key by default.
*/
export const CHAT_STREAM_KEY = "chat";

/**
* The payload shape that the chat transport sends to the triggered task.
*
* When using `chatTask()`, the payload is automatically typed — you don't need
* to import this type. Use this type only if you're using `task()` directly
* with `pipeChat()`.
*/
export type ChatTaskPayload<TMessage extends UIMessage = UIMessage> = {
/** The conversation messages */
messages: TMessage[];

/** The unique identifier for the chat session */
chatId: string;

/**
* The trigger type:
* - `"submit-message"`: A new user message
* - `"regenerate-message"`: Regenerate the last assistant response
*/
trigger: "submit-message" | "regenerate-message";

/** The ID of the message to regenerate (only for `"regenerate-message"`) */
messageId?: string;

/** Custom metadata from the frontend */
metadata?: unknown;
};

/**
* Options for `pipeChat`.
*/
export type PipeChatOptions = {
/**
* Override the stream key. Must match the `streamKey` on `TriggerChatTransport`.
* @default "chat"
*/
streamKey?: string;

/** An AbortSignal to cancel the stream. */
signal?: AbortSignal;

/**
* The target run ID to pipe to.
* @default "self" (current run)
*/
target?: string;
};

/**
* An object with a `toUIMessageStream()` method (e.g. `StreamTextResult` from `streamText()`).
*/
type UIMessageStreamable = {
toUIMessageStream: (...args: any[]) => AsyncIterable<unknown> | ReadableStream<unknown>;
};

function isUIMessageStreamable(value: unknown): value is UIMessageStreamable {
return (
typeof value === "object" &&
value !== null &&
"toUIMessageStream" in value &&
typeof (value as any).toUIMessageStream === "function"
);
}

function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
return typeof value === "object" && value !== null && Symbol.asyncIterator in value;
}

function isReadableStream(value: unknown): value is ReadableStream<unknown> {
return typeof value === "object" && value !== null && typeof (value as any).getReader === "function";
}
Comment on lines +186 to +206
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```shell
find . -name "package.json" -path "*/trigger-sdk/*" | head -5
```


🌐 Web query:

Vercel AI SDK v6 streamText toUIMessageStream method documentation

💡 Result:

In AI SDK v6, toUIMessageStream is a method on the streamText() result (not streamText itself):

  • API reference (includes signature + options): the `streamText()` result exposes `toUIMessageStream(options?: UIMessageStreamOptions): AsyncIterableStream<UIMessageChunk>`. Options include `onFinish`, `messageMetadata`, `sendReasoning`, `sendSources`, `sendStart`, `sendFinish`, `onError`, and `consumeSseStream` (for abort handling). [1]
  • Related helpers on the same result object: toUIMessageStreamResponse() and pipeUIMessageStreamToResponse() (for returning/piping an HTTP response using the UI message stream protocol). [1]
  • Usage examples:
    • Fastify cookbook shows reply.send(result.toUIMessageStream()) and writer.merge(result.toUIMessageStream({ sendStart: false })). [2]
    • “Streaming Custom Data” shows merging a streamText() UI stream into a custom createUIMessageStream via writer.merge(result.toUIMessageStream()). [3]

Sources: [1] streamText() reference, [2] Fastify cookbook, [3] Streaming Custom Data docs.


🌐 Web query:

Vercel AI SDK method names toDataStream toAIStream history versions

💡 Result:

ai (Vercel AI SDK) streaming helper method name history:

| AI SDK version | What existed | What changed / what to use instead |
| --- | --- | --- |
| 2.x | `AIStream`, `StreamingTextResponse`, `streamToResponse` | These 2.x exports were removed; Vercel recommends `streamText(...).toDataStream()` / `.toDataStreamResponse()` instead. [1] |
| 3.4 | `streamText(...)` result had AI-stream helpers: `toAIStream()`, `pipeAIStreamToResponse(...)`, `toAIStreamResponse()` | These methods still existed in 3.4 but were later removed. [1] |
| 4.0+ | `streamText(...)` result uses data-stream helpers: `toDataStream()`, `pipeDataStreamToResponse(...)`, `toDataStreamResponse()` | In 4.0, the AI-stream methods were removed in favor of the data-stream methods. [1] |
| 4.0 (LangChain adapter) | `LangChainAdapter.toAIStream(...)` | Removed in 4.0; use `LangChainAdapter.toDataStream(...)`. [1] |
| 4.0+ (Express etc.) | `streamToResponse(...)` | Reported removed after version 4; migrate to piping the SDK data stream (`pipeDataStreamToResponse`). [2] |

Sources: AI SDK 3.4 → 4.0 migration guide (breaking changes) [1], Vercel GitHub discussion on streamToResponse removal [2].


🏁 Script executed:

```shell
cat packages/trigger-sdk/package.json | grep -A 10 -B 2 "peerDependencies"
```


🏁 Script executed:

```shell
cat -n packages/trigger-sdk/src/v3/ai.ts | head -230 | tail -50
```


🏁 Script executed:

```shell
rg "isUIMessageStreamable|toUIMessageStream|toDataStream" packages/trigger-sdk/src/v3/ai.ts
```


Update `isUIMessageStreamable` to support AI SDK v4/v5, or declare the v6+ requirement in the peer dependency.

The code only checks for `toUIMessageStream()` (the v6 method), but the peer dependency explicitly includes `^4.2.0` and `^5.0.0`, which use `toDataStream()` instead. Users on v4 or v5 will experience silent failures when `pipeChat()` or auto-piping encounters a `StreamTextResult`: the type guard returns false, causing the stream to be rejected.

Either:

  1. Support both: add a fallback check for `toDataStream()` and abstract the method call
  2. Update the peer dependency to `^6.0.0` only if v6 is the minimum supported version
  3. Document the implicit v6+ requirement in comments (least preferred)
🤖 Prompt for AI Agents

In `@packages/trigger-sdk/src/v3/ai.ts` around lines 186-206, the type guard `isUIMessageStreamable` only detects the v6 method `toUIMessageStream`, so v4/v5 `StreamTextResult` objects (which expose `toDataStream`) are rejected. Update `isUIMessageStreamable` to accept either method (check for `toUIMessageStream` OR `toDataStream`), and ensure calling code that uses `toUIMessageStream` (e.g. the `pipeChat`/auto-piping logic) calls whichever method is available, preferring `toUIMessageStream` when present, so both v4/v5 and v6+ SDKs work. Alternatively, if v4/v5 support is dropped, update the peer dependency to `^6.0.0` and add a clear comment; the recommended change is to extend `isUIMessageStreamable` and dispatch on the existing method names.


/**
* Pipes a chat stream to the realtime stream, making it available to the
* `TriggerChatTransport` on the frontend.
*
* Accepts:
* - A `StreamTextResult` from `streamText()` (has `.toUIMessageStream()`)
* - An `AsyncIterable` of `UIMessageChunk`s
* - A `ReadableStream` of `UIMessageChunk`s
*
* Must be called from inside a Trigger.dev task's `run` function.
*
* @example
* ```ts
* import { task } from "@trigger.dev/sdk";
* import { pipeChat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
* import { streamText, convertToModelMessages } from "ai";
*
* export const myChatTask = task({
* id: "my-chat-task",
* run: async (payload: ChatTaskPayload) => {
* const result = streamText({
* model: openai("gpt-4o"),
* messages: convertToModelMessages(payload.messages),
* });
*
* await pipeChat(result);
* },
* });
* ```
*
* @example
* ```ts
* // Works from anywhere inside a task — even deep in your agent code
* async function runAgentLoop(messages: CoreMessage[]) {
* const result = streamText({ model, messages });
* await pipeChat(result);
* }
* ```
*/
export async function pipeChat(
source: UIMessageStreamable | AsyncIterable<unknown> | ReadableStream<unknown>,
options?: PipeChatOptions
): Promise<void> {
const streamKey = options?.streamKey ?? CHAT_STREAM_KEY;

let stream: AsyncIterable<unknown> | ReadableStream<unknown>;

if (isUIMessageStreamable(source)) {
stream = source.toUIMessageStream();
} else if (isAsyncIterable(source) || isReadableStream(source)) {
stream = source;
} else {
throw new Error(
"pipeChat: source must be a StreamTextResult (with .toUIMessageStream()), " +
"an AsyncIterable, or a ReadableStream"
);
}

const pipeOptions: PipeStreamOptions = {};
if (options?.signal) {
pipeOptions.signal = options.signal;
}
if (options?.target) {
pipeOptions.target = options.target;
}

const { waitUntilComplete } = streams.pipe(streamKey, stream, pipeOptions);
await waitUntilComplete();
}

/**
* Options for defining a chat task.
*
* Extends the standard `TaskOptions` but pre-types the payload as `ChatTaskPayload`
* and overrides `run` to accept `ChatTaskPayload` directly.
*
* **Auto-piping:** If the `run` function returns a value with `.toUIMessageStream()`
* (like a `StreamTextResult`), the stream is automatically piped to the frontend.
* For complex flows, use `pipeChat()` manually from anywhere in your code.
*/
export type ChatTaskOptions<TIdentifier extends string> = Omit<
TaskOptions<TIdentifier, ChatTaskPayload, unknown>,
"run"
> & {
/**
* The run function for the chat task.
*
* Receives a `ChatTaskPayload` with the conversation messages, chat session ID,
* and trigger type.
*
* **Auto-piping:** If this function returns a value with `.toUIMessageStream()`,
* the stream is automatically piped to the frontend.
*/
run: (payload: ChatTaskPayload) => Promise<unknown>;
};

/**
* Creates a Trigger.dev task pre-configured for AI SDK chat.
*
* - **Pre-types the payload** as `ChatTaskPayload` — no manual typing needed
* - **Auto-pipes the stream** if `run` returns a `StreamTextResult`
* - For complex flows, use `pipeChat()` from anywhere inside your task code
*
* @example
* ```ts
* import { chatTask } from "@trigger.dev/sdk/ai";
* import { streamText, convertToModelMessages } from "ai";
* import { openai } from "@ai-sdk/openai";
*
* // Simple: return streamText result — auto-piped to the frontend
* export const myChatTask = chatTask({
* id: "my-chat-task",
* run: async ({ messages }) => {
* return streamText({
* model: openai("gpt-4o"),
* messages: convertToModelMessages(messages),
* });
* },
* });
* ```
*
* @example
* ```ts
* import { chatTask, pipeChat } from "@trigger.dev/sdk/ai";
*
* // Complex: pipeChat() from deep in your agent code
* export const myAgentTask = chatTask({
* id: "my-agent-task",
* run: async ({ messages }) => {
* await runComplexAgentLoop(messages);
* },
* });
* ```
*/
export function chatTask<TIdentifier extends string>(
options: ChatTaskOptions<TIdentifier>
): Task<TIdentifier, ChatTaskPayload, unknown> {
const { run: userRun, ...restOptions } = options;

return createTask<TIdentifier, ChatTaskPayload, unknown>({
...restOptions,
run: async (payload: ChatTaskPayload) => {
const result = await userRun(payload);

// Auto-pipe if the run function returned a StreamTextResult or similar
if (isUIMessageStreamable(result)) {
await pipeChat(result);
}

return result;
},
});
}
Comment on lines +342 to +360
⚠️ Potential issue | 🟡 Minor

Potential double-pipe if the user calls `pipeChat()` inside `run` and also returns a streamable.

If a user both calls `pipeChat(result)` manually inside their `run` function and returns the `StreamTextResult`, the auto-pipe logic on line 353 would attempt to pipe a second time to the same stream key. Consider either guarding against this (e.g. tracking whether `pipeChat` was already called for the current run) or adding a clear warning in the JSDoc that returning a streamable result and calling `pipeChat()` manually are mutually exclusive.

🤖 Prompt for AI Agents

In `@packages/trigger-sdk/src/v3/ai.ts` around lines 342-360, the `chatTask` wrapper can double-pipe if `userRun` both calls `pipeChat(result)` and returns a streamable. Guard against this by tracking whether a stream has already been piped for the current run: for example, add a local `WeakSet`/`Set`, or attach a unique `Symbol` flag to the `StreamTextResult` inside `pipeChat`, and check it before piping again in `chatTask`. Specifically, modify the `run` wrapper in `chatTask` to check `isUIMessageStreamable(result)` and then verify the "already piped" marker on the result before calling `pipeChat(result)`; alternatively, export a `Symbol` that `pipeChat` sets so `chatTask` can skip piping when it is present. If you opt for the simpler documentation approach instead, add a JSDoc note on `chatTask`/`pipeChat` stating that returning a streamable and manually calling `pipeChat` are mutually exclusive.
