Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/consistent-stream-targets.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Aligned the SDK's `getRunIdForOptions` logic with the Core package to handle semantic targets (`root`, `parent`) in root tasks.
32 changes: 20 additions & 12 deletions apps/webapp/app/utils/throttle.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
//From: https://kettanaito.com/blog/debounce-vs-throttle

/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */
/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */
export function throttle(
func: (...args: any[]) => void,
durationMs: number
): (...args: any[]) => void {
let isPrimedToFire = false;

return (...args: any[]) => {
if (!isPrimedToFire) {
isPrimedToFire = true;
let timeoutId: NodeJS.Timeout | null = null;
let nextArgs: any[] | null = null;

setTimeout(() => {
func(...args);
isPrimedToFire = false;
}, durationMs);
const wrapped = (...args: any[]) => {
if (timeoutId) {
nextArgs = args;
return;
}

func(...args);

timeoutId = setTimeout(() => {
timeoutId = null;
if (nextArgs) {
const argsToUse = nextArgs;
nextArgs = null;
wrapped(...argsToUse);
}
}, durationMs);
};

return wrapped;
}
64 changes: 64 additions & 0 deletions packages/trigger-sdk/src/v3/streams.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { streams } from "./streams.js";
import { taskContext, realtimeStreams } from "@trigger.dev/core/v3";

vi.mock("@trigger.dev/core/v3", async (importOriginal) => {
const original = await importOriginal<typeof import("@trigger.dev/core/v3")>();
return {
...original,
taskContext: {
ctx: {
run: {
id: "run_123",
// parentTaskRunId and rootTaskRunId are undefined for root tasks
},
},
},
realtimeStreams: {
pipe: vi.fn().mockReturnValue({
wait: () => Promise.resolve(),
stream: new ReadableStream(),
}),
},
};
});

describe("streams.pipe consistency", () => {
beforeEach(() => {
vi.clearAllMocks();
});

it("should not throw and should use self runId when target is 'root' in a root task", async () => {
const mockStream = new ReadableStream();

// This should not throw anymore
const { waitUntilComplete } = streams.pipe("test-key", mockStream, {
target: "root",
});

expect(realtimeStreams.pipe).toHaveBeenCalledWith(
"test-key",
mockStream,
expect.objectContaining({
target: "run_123",
})
);
});

it("should not throw and should use self runId when target is 'parent' in a root task", async () => {
const mockStream = new ReadableStream();

// This should not throw anymore
const { waitUntilComplete } = streams.pipe("test-key", mockStream, {
target: "parent",
});

expect(realtimeStreams.pipe).toHaveBeenCalledWith(
"test-key",
mockStream,
expect.objectContaining({
target: "run_123",
})
);
});
});
4 changes: 2 additions & 2 deletions packages/trigger-sdk/src/v3/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -665,11 +665,11 @@ export const streams = {
function getRunIdForOptions(options?: RealtimeStreamOperationOptions): string | undefined {
if (options?.target) {
if (options.target === "parent") {
return taskContext.ctx?.run?.parentTaskRunId;
return taskContext.ctx?.run?.parentTaskRunId ?? taskContext.ctx?.run?.id;
}

if (options.target === "root") {
return taskContext.ctx?.run?.rootTaskRunId;
return taskContext.ctx?.run?.rootTaskRunId ?? taskContext.ctx?.run?.id;
}

if (options.target === "self") {
Expand Down