Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions docs/realtime/backend/input-streams.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
---
title: Input Streams
sidebarTitle: Input Streams
description: Send data into running tasks from your backend code
---

The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them.

<Note>
To learn how to receive input stream data inside your tasks, see [Input
Streams](/tasks/streams#input-streams) in the Streams doc.
</Note>

## Sending data to a running task

### Using defined input streams (Recommended)

The recommended approach is to use [defined input streams](/tasks/streams#defining-input-streams) for full type safety:

```ts
import { cancelSignal, approval } from "./trigger/streams";

// Cancel a running AI stream
await cancelSignal.send(runId, { reason: "User clicked stop" });

// Approve a draft
await approval.send(runId, { approved: true, reviewer: "alice@example.com" });
```

The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream.

<Note>
`.send()` works the same regardless of how the task is listening — whether it uses `.wait()`
(suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know
how the task is consuming the data. See [Input Streams](/tasks/streams#input-streams) for details on each
receiving method.
</Note>

## Practical examples

### Cancel from a Next.js API route

```ts app/api/cancel/route.ts
import { cancelStream } from "@/trigger/streams";

export async function POST(req: Request) {
const { runId } = await req.json();

await cancelStream.send(runId, { reason: "User clicked stop" });

return Response.json({ cancelled: true });
}
```

### Approval workflow API

```ts app/api/approve/route.ts
import { approval } from "@/trigger/streams";

export async function POST(req: Request) {
const { runId, approved, reviewer } = await req.json();

await approval.send(runId, {
approved,
reviewer,
});

return Response.json({ success: true });
}
```

### Remix action handler

```ts app/routes/api.approve.ts
import { json, type ActionFunctionArgs } from "@remix-run/node";
import { approval } from "~/trigger/streams";

export async function action({ request }: ActionFunctionArgs) {
const formData = await request.formData();
const runId = formData.get("runId") as string;
const approved = formData.get("approved") === "true";
const reviewer = formData.get("reviewer") as string;

await approval.send(runId, { approved, reviewer });

return json({ success: true });
}
```

### Express handler

```ts
import express from "express";
import { cancelSignal } from "./trigger/streams";

const app = express();
app.use(express.json());

app.post("/api/cancel", async (req, res) => {
const { runId, reason } = req.body;

await cancelSignal.send(runId, { reason });

res.json({ cancelled: true });
});
```

### Sending from another task

You can send input stream data from one task to another running task:

```ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const reviewerTask = task({
id: "auto-reviewer",
run: async (payload: { targetRunId: string }) => {
// Perform automated review logic...
const isApproved = await performReview();

// Send approval to the waiting task
await approval.send(payload.targetRunId, {
approved: isApproved,
reviewer: "auto-reviewer",
});
},
});
```

## Error handling

The `.send()` method will throw if:

- The run has already completed, failed, or been canceled
- The payload exceeds the 1MB size limit
- The run ID is invalid

```ts
import { cancelSignal } from "./trigger/streams";

try {
await cancelSignal.send(runId, { reason: "User clicked stop" });
} catch (error) {
console.error("Failed to send:", error);
// Handle the error — the run may have already completed
}
```

## Important notes

- Maximum payload size per `.send()` call is **1MB**
- You cannot send data to a completed, failed, or canceled run
- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches
- Input streams require the current streams implementation (v2 is the default in SDK 4.1.0+). See [Streams](/tasks/streams) for details.
65 changes: 64 additions & 1 deletion docs/realtime/react-hooks/streams.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,70 @@ const { parts, error } = useRealtimeStream<string>(runId, {
});
```

For more information on defining and using streams, see the [Realtime Streams v2](/tasks/streams) documentation.
For more information on defining and using streams, see the [Realtime Streams](/tasks/streams) documentation.

## useInputStreamSend

The `useInputStreamSend` hook lets you send data from your frontend into a running task's [input stream](/tasks/streams#input-streams). Use it for cancel buttons, approval forms, or any UI that needs to push typed data into a running task.

### Basic usage

Pass the input stream's `id` (string), the run ID, and options such as `accessToken`. You typically get `runId` and `accessToken` from the object returned when you trigger the task (e.g. `handle.id`, `handle.publicAccessToken`). The hook returns `send`, `isLoading`, `error`, and `isReady`:

```tsx
"use client";

import { useInputStreamSend } from "@trigger.dev/react-hooks";
import { approval } from "@/trigger/streams";

export function ApprovalForm({
runId,
accessToken,
}: {
runId: string;
accessToken: string;
}) {
const { send, isLoading, isReady } = useInputStreamSend(
approval.id,
runId,
{ accessToken }
);

return (
<button
disabled={!isReady || isLoading}
onClick={() => send({ approved: true, reviewer: "alice" })}
>
Approve
</button>
);
}
```

With a generic for type-safe payloads when not using a defined stream:

```tsx
type ApprovalPayload = { approved: boolean; reviewer: string };
const { send } = useInputStreamSend<ApprovalPayload>("approval", runId, {
accessToken,
});
send({ approved: true, reviewer: "alice" });
```

### Options and return value

- **`streamId`**: The input stream identifier (string). Use the `id` from your defined stream (e.g. `approval.id`) or the same string you used in `streams.input<T>({ id: "approval" })`.
- **`runId`**: The run to send input to. When `runId` is undefined, `isReady` is false and `send` will not trigger.
- **`options`**: `accessToken` (required for client usage), `baseURL` (optional). See [Realtime auth](/realtime/auth) for generating a public access token with the right scopes (e.g. input streams write for that run).

Return value:

- **`send(data)`**: Sends typed data to the input stream. Uses SWR mutation under the hood.
- **`isLoading`**: True while a send is in progress.
- **`error`**: Set if the last send failed.
- **`isReady`**: True when both `runId` and access token are available.

For receiving input stream data inside a task (`.wait()`, `.once()`, `.on()`), see [Input Streams](/tasks/streams#input-streams) in the Streams doc.

## useRealtimeRunWithStreams

Expand Down
Loading