Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions docs/realtime/backend/input-streams.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
---
title: Input Streams
sidebarTitle: Input Streams
description: Send data into running tasks from your backend code
---

The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them.

<Note>
To learn how to receive input stream data inside your tasks, see our [Input
Streams](/tasks/input-streams) documentation.
</Note>

## Sending data to a running task

### Using defined input streams (Recommended)

The recommended approach is to use [defined input streams](/tasks/input-streams#defining-input-streams) for full type safety:

```ts
import { cancelSignal, approval } from "./trigger/streams";

// Cancel a running AI stream
await cancelSignal.send(runId, { reason: "User clicked stop" });

// Approve a draft
await approval.send(runId, { approved: true, reviewer: "alice@example.com" });
```

The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream.

<Note>
`.send()` works the same regardless of how the task is listening — whether it uses `.wait()`
(suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know
how the task is consuming the data. See [Input Streams](/tasks/input-streams) for details on each
receiving method.
</Note>

## Practical examples

### Cancel from a Next.js API route

```ts app/api/cancel/route.ts
import { cancelStream } from "@/trigger/streams";

export async function POST(req: Request) {
const { runId } = await req.json();

await cancelStream.send(runId, { reason: "User clicked stop" });

return Response.json({ cancelled: true });
}
```

### Approval workflow API

```ts app/api/approve/route.ts
import { approval } from "@/trigger/streams";

export async function POST(req: Request) {
const { runId, approved, reviewer } = await req.json();

await approval.send(runId, {
approved,
reviewer,
});

return Response.json({ success: true });
}
```

### Remix action handler

```ts app/routes/api.approve.ts
import { json, type ActionFunctionArgs } from "@remix-run/node";
import { approval } from "~/trigger/streams";

export async function action({ request }: ActionFunctionArgs) {
const formData = await request.formData();
const runId = formData.get("runId") as string;
const approved = formData.get("approved") === "true";
const reviewer = formData.get("reviewer") as string;

await approval.send(runId, { approved, reviewer });

return json({ success: true });
}
```

### Express handler

```ts
import express from "express";
import { cancelSignal } from "./trigger/streams";

const app = express();
app.use(express.json());

app.post("/api/cancel", async (req, res) => {
const { runId, reason } = req.body;

await cancelSignal.send(runId, { reason });

res.json({ cancelled: true });
});
```

### Sending from another task

You can send input stream data from one task to another running task:

```ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const reviewerTask = task({
id: "auto-reviewer",
run: async (payload: { targetRunId: string }) => {
// Perform automated review logic...
const isApproved = await performReview();

// Send approval to the waiting task
await approval.send(payload.targetRunId, {
approved: isApproved,
reviewer: "auto-reviewer",
});
},
});
```

## Error handling

The `.send()` method will throw if:

- The run has already completed, failed, or been canceled
- The payload exceeds the 1MB size limit
- The run ID is invalid

```ts
import { cancelSignal } from "./trigger/streams";

try {
await cancelSignal.send(runId, { reason: "User clicked stop" });
} catch (error) {
console.error("Failed to send:", error);
// Handle the error — the run may have already completed
}
```

## Important notes

- Maximum payload size per `.send()` call is **1MB**
- You cannot send data to a completed, failed, or canceled run
- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches
- Input streams require [Realtime Streams v2](/tasks/streams#enabling-streams-v2) (enabled by default in SDK 4.1.0+)
Loading