Skip to content
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1fe1e02
feat: add streams.input() for inbound realtime data to running tasks
claude Feb 22, 2026
05139d0
refactor: connect input stream tail lazily from listener side
claude Feb 22, 2026
2cbdb9c
fix: gate input stream tail connection on v2 realtime streams
claude Feb 22, 2026
66f7d3d
fix: throw error when input streams are used without v2 realtime streams
claude Feb 22, 2026
e17bf77
docs: add input streams documentation to realtime guides
claude Feb 22, 2026
19c77f4
docs: add API/SDK design proposal for temporal signals versioning
claude Feb 22, 2026
ec26e2c
chore: remove temporal signals versioning design doc
claude Feb 22, 2026
f9273e1
docs: add SDK design proposal for input stream .wait() method
claude Feb 22, 2026
04a1d6f
input stream waitpoints and tests in the hello world reference project
ericallam Feb 23, 2026
6a83fa4
No more input stream multiplexing, no naming collision risk, removed …
ericallam Feb 23, 2026
024426f
Better span experience, show input streams on dashboard, cleanup the …
ericallam Feb 23, 2026
3776d2f
adopt s2-lite, upgrade s2 package with support
ericallam Feb 23, 2026
732ead3
add input streams example
ericallam Feb 27, 2026
a5d104c
better changesets
ericallam Feb 27, 2026
0c9db46
Upgrade the deployment s2 client to 0.22.5 as well and fix some typec…
ericallam Feb 27, 2026
d768007
fix: address CodeRabbit review feedback on input streams PR
ericallam Feb 27, 2026
a08f44f
fix typecheck issue
ericallam Feb 27, 2026
4f379f6
Add result type and .unwrap() helper to inputStream.once().
ericallam Feb 27, 2026
29db992
small jsdocs tweak
ericallam Feb 27, 2026
bcbf874
Reconnect input stream tail after SSE timeout
ericallam Feb 27, 2026
4846fe0
automatically cleanup input stream on handlers and tail SSE requests …
ericallam Feb 27, 2026
6d1a29f
remove realtime rules update, we'll do this in a separate PR
ericallam Feb 28, 2026
e3cbe7d
undo changes to local claude realtime skill, we'll do that in another PR
ericallam Feb 28, 2026
1ba5cd7
delete design doc
ericallam Feb 28, 2026
480f967
Add mock agentRelay example
ericallam Feb 28, 2026
eb70542
remove unused function
ericallam Feb 28, 2026
9d9970d
prevent duplicate messages by reconnecting and passing in the last se…
ericallam Mar 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/input-stream-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/react-hooks": patch
---

Add input streams for bidirectional communication with running tasks. Define typed input streams with `streams.input<T>({ id })`, then consume inside tasks via `.wait()` (suspends the process), `.once()` (waits for next message), or `.on()` (subscribes to a continuous stream). Send data from backends with `.send(runId, data)` or from frontends with the new `useInputStreamSend` React hook.

Upgrade S2 SDK from 0.17 to 0.22 with support for custom endpoints (s2-lite) via the new `endpoints` configuration, `AppendRecord.string()` API, and `maxInflightBytes` session option.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ apps/**/public/build
**/.claude/settings.local.json
.mcp.log
.mcp.json
.cursor/debug.log
.cursor/debug.log
ailogger-output.log
6 changes: 6 additions & 0 deletions .server-changes/input-stream-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add input streams with API routes for sending data to running tasks, SSE reading, and waitpoint creation. Includes Redis cache for fast `.send()` to `.wait()` bridging, dashboard span support for input stream operations, and s2-lite support with configurable S2 endpoint, access token skipping, and S2-Basin headers for self-hosted deployments. Adds s2-lite to Docker Compose for local development.
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const S2EnvSchema = z.preprocess(
S2_ENABLED: z.literal("1"),
S2_ACCESS_TOKEN: z.string(),
S2_DEPLOYMENT_LOGS_BASIN_NAME: z.string(),
S2_DEPLOYMENT_STREAMS_LOCAL: z.string().default("0"),
}),
z.object({
S2_ENABLED: z.literal("0"),
Expand Down Expand Up @@ -1344,6 +1345,8 @@ const EnvironmentSchema = z

REALTIME_STREAMS_S2_BASIN: z.string().optional(),
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
REALTIME_STREAMS_S2_ENDPOINT: z.string().optional(),
REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS: z.enum(["true", "false"]).default("false"),
REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce
.number()
.int()
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ export class DeploymentPresenter {
let eventStream = undefined;
if (
env.S2_ENABLED === "1" &&
(buildServerMetadata || gitMetadata?.source === "trigger_github_app")
(buildServerMetadata || gitMetadata?.source === "trigger_github_app" || env.S2_DEPLOYMENT_STREAMS_LOCAL === "1")
) {
const [error, accessToken] = await tryCatch(this.getS2AccessToken(project.externalRef));

Expand Down Expand Up @@ -290,9 +290,9 @@ export class DeploymentPresenter {
return cachedToken;
}

const { access_token: accessToken } = await s2.accessTokens.issue({
const { accessToken } = await s2.accessTokens.issue({
id: `${projectRef}-${new Date().getTime()}`,
expires_at: new Date(Date.now() + 60 * 60 * 1000).toISOString(), // 1 hour
expiresAt: new Date(Date.now() + 60 * 60 * 1000), // 1 hour
scope: {
ops: ["read"],
basins: {
Expand Down
35 changes: 35 additions & 0 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,41 @@ export class SpanPresenter extends BasePresenter {
},
};
}
case "input-stream": {
if (!span.entity.id) {
logger.error(`SpanPresenter: No input stream id`, {
spanId,
inputStreamId: span.entity.id,
});
return { ...data, entity: null };
}

const [runId, streamId] = span.entity.id.split(":");

if (!runId || !streamId) {
logger.error(`SpanPresenter: Invalid input stream id`, {
spanId,
inputStreamId: span.entity.id,
});
return { ...data, entity: null };
}

// Translate user-facing stream ID to internal S2 stream name
const s2StreamKey = `$trigger.input:${streamId}`;

return {
...data,
entity: {
type: "realtime-stream" as const,
object: {
runId,
streamKey: s2StreamKey,
displayName: streamId,
metadata: undefined,
},
},
};
}
default:
return { ...data, entity: null };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,14 @@ export default function Page() {

const readSession = await stream.readSession(
{
seq_num: 0,
wait: 60,
as: "bytes",
start: { from: { seqNum: 0 }, clamp: true },
stop: { waitSecs: 60 },
},
{ signal: abortController.signal }
);

const decoder = new TextDecoder();

for await (const record of readSession) {
const decoded = decoder.decode(record.body);
const decoded = record.body;
const result = DeploymentEventFromString.safeParse(decoded);

if (!result.success) {
Expand All @@ -217,8 +214,8 @@ export default function Page() {
const headers: Record<string, string> = {};

if (record.headers) {
for (const [nameBytes, valueBytes] of record.headers) {
headers[decoder.decode(nameBytes)] = decoder.decode(valueBytes);
for (const [name, value] of record.headers) {
headers[name] = value;
}
}
const level = (headers["level"]?.toLowerCase() as LogEntry["level"]) ?? "info";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import {
CreateInputStreamWaitpointRequestBody,
type CreateInputStreamWaitpointResponseBody,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { $replica } from "~/db.server";
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
import {
deleteInputStreamWaitpoint,
setInputStreamWaitpoint,
} from "~/services/inputStreamWaitpointCache.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { engine } from "~/v3/runEngine.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";

const ParamsSchema = z.object({
runFriendlyId: z.string(),
});

const { action, loader } = createActionApiRoute(
{
params: ParamsSchema,
body: CreateInputStreamWaitpointRequestBody,
maxContentLength: 1024 * 10, // 10KB
method: "POST",
},
async ({ authentication, body, params }) => {
try {
const run = await $replica.taskRun.findFirst({
where: {
friendlyId: params.runFriendlyId,
runtimeEnvironmentId: authentication.environment.id,
},
select: {
id: true,
friendlyId: true,
realtimeStreamsVersion: true,
},
});

if (!run) {
return json({ error: "Run not found" }, { status: 404 });
}

const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
: undefined;

const timeout = await parseDelay(body.timeout);

// Process tags (same pattern as api.v1.waitpoints.tokens.ts)
const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;

if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
throw new ServiceValidationError(
`Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
);
}

if (bodyTags && bodyTags.length > 0) {
for (const tag of bodyTags) {
await createWaitpointTag({
tag,
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
});
}
}

// Step 1: Create the waitpoint
const result = await engine.createManualWaitpoint({
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
idempotencyKey: body.idempotencyKey,
idempotencyKeyExpiresAt,
timeout,
tags: bodyTags,
});

// Step 2: Cache the mapping in Redis for fast lookup from .send()
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await setInputStreamWaitpoint(
run.friendlyId,
body.streamId,
result.waitpoint.id,
ttlMs && ttlMs > 0 ? ttlMs : undefined
);

// Step 3: Check if data was already sent to this input stream (race condition handling).
// If .send() landed before .wait(), the data is in the S2 stream but no waitpoint
// existed to complete. We check from the client's last known position.
if (!result.isCached) {
try {
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
);

const records = await realtimeStream.readRecords(
run.friendlyId,
`$trigger.input:${body.streamId}`,
body.lastSeqNum
);

if (records.length > 0) {
const record = records[0]!;

// Record data is the raw user payload — no wrapper to unwrap
await engine.completeWaitpoint({
id: result.waitpoint.id,
output: {
value: record.data,
type: "application/json",
isError: false,
},
});

// Clean up the Redis cache since we completed it ourselves
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
}
} catch {
// Non-fatal: if the S2 check fails, the waitpoint is still PENDING.
// The next .send() will complete it via the Redis cache path.
}
}

return json<CreateInputStreamWaitpointResponseBody>({
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
isCached: result.isCached,
});
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 500 });
}

return json({ error: "Something went wrong" }, { status: 500 });
}
}
);

export { action, loader };
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async function responseHeaders(
const claims = {
sub: environment.id,
pub: true,
scopes: [`read:runs:${run.friendlyId}`],
scopes: [`read:runs:${run.friendlyId}`, `write:inputStreams:${run.friendlyId}`],
realtime,
};

Expand Down
Loading