Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/wicked-ads-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/react-hooks": patch
"@trigger.dev/core": patch
---

Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended
19 changes: 17 additions & 2 deletions packages/core/src/v3/apiClient/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
},
});

let updatedKeys = new Set<string>();

// Create the transformed stream that processes messages and emits complete rows
this.#changeStream = createAsyncIterableStream(source, {
transform: (messages, controller) => {
Expand All @@ -122,9 +124,13 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
}

try {
const updatedKeys = new Set<string>();
let isUpToDate = false;

console.log(`Processing ${messages.length} messages`);

for (const message of messages) {
console.log("shape message", message);

if (isChangeMessage(message)) {
const key = message.key;
switch (message.headers.operation) {
Expand All @@ -147,18 +153,27 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
if (message.headers.control === "must-refetch") {
this.#currentState.clear();
this.#error = false;
} else if (message.headers.control === "up-to-date") {
console.log("Setting isUpToDate to true");
isUpToDate = true;
}
}
}

// Now enqueue only one updated row per key, after all messages have been processed.
if (!this.#isStreamClosed) {
// If the stream is not up to date, we don't want to enqueue any rows.
if (!this.#isStreamClosed && isUpToDate) {
for (const key of updatedKeys) {
const finalRow = this.#currentState.get(key);
if (finalRow) {
console.log("enqueueing finalRow", finalRow);
controller.enqueue(finalRow);
}
}

updatedKeys.clear();
} else {
console.log("Not enqueuing any rows because the stream is not up to date");
}
} catch (error) {
console.error("Error processing stream messages:", error);
Expand Down
27 changes: 27 additions & 0 deletions references/hello-world/src/trigger/realtime.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { logger, runs, task } from "@trigger.dev/sdk";
import { helloWorldTask } from "./example.js";
import { setTimeout } from "timers/promises";

export const realtimeByTagsTask = task({
id: "realtime-by-tags",
Expand Down Expand Up @@ -32,3 +33,29 @@ export const realtimeByTagsTask = task({
};
},
});

export const realtimeUpToDateTask = task({
id: "realtime-up-to-date",
run: async ({ runId }: { runId?: string }) => {
if (!runId) {
const handle = await helloWorldTask.trigger(
{ hello: "world" },
{
tags: ["hello-world", "realtime"],
}
);

runId = handle.id;
}

logger.info("runId", { runId });

for await (const run of runs.subscribeToRun(runId, { stopOnCompletion: true })) {
logger.info("run", { run });
}

return {
message: "Hello, world!",
};
},
});
Loading