Skip to content

Commit 33b1455

Browse files
committed
fix: add missing workflow fucntionality for queues
1 parent f45f5d5 commit 33b1455

File tree

14 files changed

+541
-64
lines changed

14 files changed

+541
-64
lines changed

examples/sandbox-vercel/src/actors/workflow/workflow-fixtures.ts

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/sandbox/src/actors/workflow/workflow-fixtures.ts

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import {
6363
uniqueVarActor,
6464
} from "./vars";
6565
import {
66+
workflowAccessActor,
6667
workflowCounterActor,
6768
workflowQueueActor,
6869
workflowSleepActor,
@@ -151,6 +152,7 @@ export const registry = setup({
151152
// From workflow.ts
152153
workflowCounterActor,
153154
workflowQueueActor,
155+
workflowAccessActor,
154156
workflowSleepActor,
155157
// From actor-db-raw.ts
156158
dbActorRaw,

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/workflow.ts

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { Loop } from "@rivetkit/workflow-engine";
22
import { actor } from "@/actor/mod";
3+
import { db } from "@/db/mod";
34
import { WORKFLOW_GUARD_KV_KEY } from "@/workflow/constants";
45
import { workflow, workflowQueueName } from "@/workflow/mod";
6+
import type { registry } from "./registry";
57

68
const WORKFLOW_QUEUE_NAME = "workflow-default";
79

@@ -55,19 +57,95 @@ export const workflowQueueActor = actor({
5557
name: "queue",
5658
run: async (loopCtx) => {
5759
const actorLoopCtx = loopCtx as any;
58-
const payload = await loopCtx.listen(
60+
const message = await loopCtx.listen(
5961
"queue-wait",
6062
WORKFLOW_QUEUE_NAME,
6163
);
6264
await loopCtx.step("store-message", async () => {
63-
actorLoopCtx.state.received.push(payload);
65+
actorLoopCtx.state.received.push(message.body);
66+
await message.complete({ echo: message.body });
6467
});
6568
return Loop.continue(undefined);
6669
},
6770
});
6871
}),
6972
actions: {
7073
getMessages: (c) => c.state.received,
74+
sendAndWait: async (c, payload: unknown) => {
75+
const client = c.client<typeof registry>();
76+
const handle = client.workflowQueueActor.getForId(c.actorId);
77+
return await handle.queue[workflowQueueName(WORKFLOW_QUEUE_NAME)].send(
78+
payload,
79+
{ wait: true, timeout: 1_000 },
80+
);
81+
},
82+
},
83+
});
84+
85+
export const workflowAccessActor = actor({
86+
db: db({
87+
onMigrate: async (rawDb) => {
88+
await rawDb.execute(`
89+
CREATE TABLE IF NOT EXISTS workflow_access_log (
90+
id INTEGER PRIMARY KEY AUTOINCREMENT,
91+
created_at INTEGER NOT NULL
92+
)
93+
`);
94+
},
95+
}),
96+
state: {
97+
outsideDbError: null as string | null,
98+
outsideClientError: null as string | null,
99+
insideDbCount: 0,
100+
insideClientAvailable: false,
101+
},
102+
run: workflow(async (ctx) => {
103+
await ctx.loop({
104+
name: "access",
105+
run: async (loopCtx) => {
106+
const actorLoopCtx = loopCtx as any;
107+
let outsideDbError: string | null = null;
108+
let outsideClientError: string | null = null;
109+
110+
try {
111+
// Accessing db outside a step should throw.
112+
// biome-ignore lint/style/noUnusedExpressions: intentionally checking accessor.
113+
actorLoopCtx.db;
114+
} catch (error) {
115+
outsideDbError =
116+
error instanceof Error ? error.message : String(error);
117+
}
118+
119+
try {
120+
actorLoopCtx.client<typeof registry>();
121+
} catch (error) {
122+
outsideClientError =
123+
error instanceof Error ? error.message : String(error);
124+
}
125+
126+
await loopCtx.step("access-step", async () => {
127+
await actorLoopCtx.db.execute(
128+
`INSERT INTO workflow_access_log (created_at) VALUES (${Date.now()})`,
129+
);
130+
const counts = (await actorLoopCtx.db.execute(
131+
`SELECT COUNT(*) as count FROM workflow_access_log`,
132+
)) as Array<{ count: number }>;
133+
const client = actorLoopCtx.client<typeof registry>();
134+
135+
actorLoopCtx.state.outsideDbError = outsideDbError;
136+
actorLoopCtx.state.outsideClientError = outsideClientError;
137+
actorLoopCtx.state.insideDbCount = counts[0]?.count ?? 0;
138+
actorLoopCtx.state.insideClientAvailable =
139+
typeof client.workflowQueueActor.getForId === "function";
140+
});
141+
142+
await loopCtx.sleep("idle", 25);
143+
return Loop.continue(undefined);
144+
},
145+
});
146+
}),
147+
actions: {
148+
getState: (c) => c.state,
71149
},
72150
});
73151

rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,10 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
419419
}
420420

421421
/** Deletes messages matching the provided IDs. Returns the IDs that were removed. */
422-
async deleteMessagesById(ids: bigint[]): Promise<bigint[]> {
422+
async deleteMessagesById(
423+
ids: bigint[],
424+
options: { resolveWaiters?: boolean } = {},
425+
): Promise<bigint[]> {
423426
if (ids.length === 0) {
424427
return [];
425428
}
@@ -431,10 +434,20 @@ export class QueueManager<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
431434
if (toRemove.length === 0) {
432435
return [];
433436
}
434-
await this.#removeMessages(toRemove, { resolveWaiters: true });
437+
await this.#removeMessages(toRemove, {
438+
resolveWaiters: options.resolveWaiters ?? true,
439+
});
435440
return toRemove.map((entry) => entry.id);
436441
}
437442

443+
/** Completes a previously removed message by resolving its waiter, if one exists. */
444+
async completeById(messageId: bigint, response?: unknown): Promise<void> {
445+
this.#resolveCompletionWaiter(messageId, {
446+
status: "completed",
447+
response,
448+
});
449+
}
450+
438451
async #drainMessages(
439452
nameSet: Set<string>,
440453
count: number,

rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-workflow.ts

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,55 @@ export function runActorWorkflowTests(driverTestConfig: DriverTestConfig) {
2323

2424
test("consumes queue messages via workflow listen", async (c) => {
2525
const { client } = await setupDriverTest(c, driverTestConfig);
26-
const actor = client.workflowQueueActor.getOrCreate([
27-
"workflow-queue",
28-
]);
26+
const actor = client.workflowQueueActor.getOrCreate(["workflow-queue"]);
2927

30-
const queueHandle =
31-
actor.queue[workflowQueueName(WORKFLOW_QUEUE_NAME)];
28+
const queueHandle = actor.queue[workflowQueueName(WORKFLOW_QUEUE_NAME)];
3229
await queueHandle.send({ hello: "world" });
3330

3431
await waitFor(driverTestConfig, 200);
3532
const messages = await actor.getMessages();
3633
expect(messages).toEqual([{ hello: "world" }]);
3734
});
3835

39-
test("sleeps and resumes between ticks", async (c) => {
36+
test("workflow listen supports completing wait sends", async (c) => {
4037
const { client } = await setupDriverTest(c, driverTestConfig);
41-
const actor = client.workflowSleepActor.getOrCreate([
42-
"workflow-sleep",
38+
const actor = client.workflowQueueActor.getOrCreate([
39+
"workflow-queue-wait",
4340
]);
4441

42+
const result = await actor.sendAndWait({ value: 123 });
43+
expect(result).toEqual({
44+
status: "completed",
45+
response: { echo: { value: 123 } },
46+
});
47+
});
48+
49+
test("db and client are step-only in workflow context", async (c) => {
50+
const { client } = await setupDriverTest(c, driverTestConfig);
51+
const actor = client.workflowAccessActor.getOrCreate([
52+
"workflow-access",
53+
]);
54+
55+
let state = await actor.getState();
56+
for (let i = 0; i < 20 && state.insideDbCount === 0; i++) {
57+
await waitFor(driverTestConfig, 50);
58+
state = await actor.getState();
59+
}
60+
61+
expect(state.outsideDbError).toBe(
62+
"db is only available inside workflow steps",
63+
);
64+
expect(state.outsideClientError).toBe(
65+
"client is only available inside workflow steps",
66+
);
67+
expect(state.insideDbCount).toBeGreaterThan(0);
68+
expect(state.insideClientAvailable).toBe(true);
69+
});
70+
71+
test("sleeps and resumes between ticks", async (c) => {
72+
const { client } = await setupDriverTest(c, driverTestConfig);
73+
const actor = client.workflowSleepActor.getOrCreate(["workflow-sleep"]);
74+
4575
const initial = await actor.getState();
4676
await waitFor(driverTestConfig, 200);
4777
const next = await actor.getState();

rivetkit-typescript/packages/rivetkit/src/workflow/context.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import type { RunContext } from "@/actor/contexts/run";
2-
import type { AnyDatabaseProvider } from "@/actor/database";
2+
import type { Client } from "@/client/client";
3+
import type { Registry } from "@/registry";
4+
import type { AnyDatabaseProvider, InferDatabaseClient } from "@/actor/database";
35
import type { WorkflowContextInterface } from "@rivetkit/workflow-engine";
46
import type {
57
BranchConfig,
@@ -8,6 +10,7 @@ import type {
810
LoopConfig,
911
LoopResult,
1012
StepConfig,
13+
WorkflowListenMessage,
1114
} from "@rivetkit/workflow-engine";
1215
import { WORKFLOW_GUARD_KV_KEY } from "./constants";
1316

@@ -42,27 +45,27 @@ export class ActorWorkflowContext<
4245
return this.#inner.abortSignal;
4346
}
4447

45-
async step<T>(
46-
nameOrConfig: string | Parameters<WorkflowContextInterface["step"]>[0],
47-
run?: () => Promise<T>,
48-
): Promise<T> {
48+
async step<T>(
49+
nameOrConfig: string | Parameters<WorkflowContextInterface["step"]>[0],
50+
run?: () => Promise<T>,
51+
): Promise<T> {
4952
if (typeof nameOrConfig === "string") {
5053
if (!run) {
5154
throw new Error("Step run function missing");
5255
}
53-
return await this.#wrapActive(() =>
54-
this.#inner.step(nameOrConfig, () =>
55-
this.#withActorAccess(run),
56-
),
57-
);
58-
}
56+
return await this.#wrapActive(() =>
57+
this.#inner.step(nameOrConfig, () =>
58+
this.#withActorAccess(run),
59+
),
60+
);
61+
}
5962
const stepConfig = nameOrConfig as StepConfig<T>;
6063
const config: StepConfig<T> = {
6164
...stepConfig,
6265
run: () => this.#withActorAccess(stepConfig.run),
6366
};
6467
return await this.#wrapActive(() => this.#inner.step(config));
65-
}
68+
}
6669

6770
async loop<T>(
6871
name: string,
@@ -103,7 +106,10 @@ export class ActorWorkflowContext<
103106
return this.#inner.sleepUntil(name, timestampMs);
104107
}
105108

106-
listen<T>(name: string, messageName: string): Promise<T> {
109+
listen<T>(
110+
name: string,
111+
messageName: string | string[],
112+
): Promise<WorkflowListenMessage<T>> {
107113
return this.#inner.listen(name, messageName);
108114
}
109115

@@ -212,6 +218,18 @@ export class ActorWorkflowContext<
212218
return this.#runCtx.vars as TVars extends never ? never : TVars;
213219
}
214220

221+
client<R extends Registry<any>>(): Client<R> {
222+
this.#ensureActorAccess("client");
223+
return this.#runCtx.client<R>();
224+
}
225+
226+
get db(): TDatabase extends never ? never : InferDatabaseClient<TDatabase> {
227+
this.#ensureActorAccess("db");
228+
return this.#runCtx.db as TDatabase extends never
229+
? never
230+
: InferDatabaseClient<TDatabase>;
231+
}
232+
215233
get log() {
216234
return this.#runCtx.log;
217235
}

0 commit comments

Comments
 (0)