Skip to content

Commit 20d84dd

Browse files
committed
feat: actor messages & queues
1 parent 6ce365e commit 20d84dd

File tree

25 files changed

+1139
-40
lines changed

25 files changed

+1139
-40
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { actor } from "rivetkit";
2+
import type { registry } from "./registry";
3+
4+
export const queueActor = actor({
5+
state: {},
6+
actions: {
7+
receiveOne: async (
8+
c,
9+
name: string,
10+
opts?: { count?: number; timeout?: number },
11+
) => {
12+
const message = await c.queue.next(name, opts);
13+
if (!message) {
14+
return null;
15+
}
16+
return { name: message.name, body: message.body };
17+
},
18+
receiveMany: async (
19+
c,
20+
names: string[],
21+
opts?: { count?: number; timeout?: number },
22+
) => {
23+
const messages = await c.queue.next(names, opts);
24+
return (messages ?? []).map(
25+
(message: { name: string; body: unknown }) => ({
26+
name: message.name,
27+
body: message.body,
28+
}),
29+
);
30+
},
31+
receiveRequest: async (
32+
c,
33+
request: {
34+
name: string | string[];
35+
count?: number;
36+
timeout?: number;
37+
},
38+
) => {
39+
const messages = await c.queue.next(request);
40+
return (messages ?? []).map(
41+
(message: { name: string; body: unknown }) => ({
42+
name: message.name,
43+
body: message.body,
44+
}),
45+
);
46+
},
47+
sendToSelf: async (c, name: string, body: unknown) => {
48+
const client = c.client<typeof registry>();
49+
const handle = client.queueActor.getForId(c.actorId);
50+
await handle.queue[name].send(body);
51+
return true;
52+
},
53+
waitForAbort: async (c) => {
54+
setTimeout(() => {
55+
c.destroy();
56+
}, 10);
57+
await c.queue.next("abort", { timeout: 10_000 });
58+
return true;
59+
},
60+
},
61+
});
62+
63+
export const queueLimitedActor = actor({
64+
state: {},
65+
actions: {},
66+
options: {
67+
maxQueueSize: 1,
68+
maxQueueMessageSize: 64,
69+
},
70+
});

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { kvActor } from "./kv";
2626
import { largePayloadActor, largePayloadConnActor } from "./large-payloads";
2727
import { counterWithLifecycle } from "./lifecycle";
2828
import { metadataActor } from "./metadata";
29+
import { queueActor, queueLimitedActor } from "./queue";
2930
import {
3031
rawHttpActor,
3132
rawHttpHonoActor,
@@ -75,6 +76,9 @@ export const registry = setup({
7576
inlineClientActor,
7677
// From kv.ts
7778
kvActor,
79+
// From queue.ts
80+
queueActor,
81+
queueLimitedActor,
7882
// From action-inputs.ts
7983
inputActor,
8084
// From action-timeout.ts

rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v1.bare

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ type Init struct {
101101
isStateEnabled: bool
102102
rpcs: list<str>
103103
isDatabaseEnabled: bool
104+
queueSize: uint
104105
}
105106

106107
type ConnectionsResponse struct {
@@ -132,6 +133,10 @@ type EventsUpdated struct {
132133
events: list<Event>
133134
}
134135

136+
type QueueUpdated struct {
137+
queueSize: uint
138+
}
139+
135140
type RpcsListResponse struct {
136141
rid: uint
137142
rpcs: list<str>
@@ -151,6 +156,7 @@ type ToClientBody union {
151156
ActionResponse |
152157
ConnectionsUpdated |
153158
EventsUpdated |
159+
QueueUpdated |
154160
StateUpdated |
155161
RpcsListResponse |
156162
Error |

rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v1.bare

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,17 @@ type HttpActionResponse struct {
6767
output: data
6868
}
6969

70+
# MARK: HTTP Queue
71+
72+
type HttpQueueSendRequest struct {
73+
name: str
74+
body: data
75+
}
76+
77+
type HttpQueueSendResponse struct {
78+
ok: bool
79+
}
80+
7081
# MARK: HTTP Error
7182
type HttpResponseError struct {
7283
group: str

rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v2.bare

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,17 @@ type HttpActionResponse struct {
6666
output: data
6767
}
6868

69+
# MARK: HTTP Queue
70+
71+
type HttpQueueSendRequest struct {
72+
name: str
73+
body: data
74+
}
75+
76+
type HttpQueueSendResponse struct {
77+
ok: bool
78+
}
79+
6980
# MARK: HTTP Error
7081
type HttpResponseError struct {
7182
group: str

rivetkit-typescript/packages/rivetkit/src/actor/config.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ export const ActorConfigSchema = z
8383
connectionLivenessInterval: z.number().positive().default(5000),
8484
noSleep: z.boolean().default(false),
8585
sleepTimeout: z.number().positive().default(30_000),
86+
maxQueueSize: z.number().positive().default(1000),
87+
maxQueueMessageSize: z
88+
.number()
89+
.positive()
90+
.default(1024 * 1024),
8691
/**
8792
* Can hibernate WebSockets for onWebSocket.
8893
*
@@ -689,6 +694,18 @@ export const DocActorOptionsSchema = z
689694
.describe(
690695
"Time in ms of inactivity before the actor sleeps. Default: 30000",
691696
),
697+
maxQueueSize: z
698+
.number()
699+
.optional()
700+
.describe(
701+
"Maximum number of queue messages before rejecting new messages. Default: 1000",
702+
),
703+
maxQueueMessageSize: z
704+
.number()
705+
.optional()
706+
.describe(
707+
"Maximum size of each queue message in bytes. Default: 1048576",
708+
),
692709
canHibernateWebSocket: z
693710
.boolean()
694711
.optional()

rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type { ActorDefinition, AnyActorDefinition } from "../../definition";
88
import type { ActorInstance, SaveStateOptions } from "../../instance/mod";
99
import { ActorKv } from "../../instance/kv";
1010
import type { Schedule } from "../../schedule";
11+
import type { QueueContext } from "../queue";
1112

1213
/**
1314
* ActorContext class that provides access to actor methods and state
@@ -91,6 +92,20 @@ export class ActorContext<
9192
return this.#actor.log;
9293
}
9394

95+
/**
96+
* Access to queue receive helpers.
97+
*/
98+
get queue(): QueueContext<
99+
TState,
100+
TConnParams,
101+
TConnState,
102+
TVars,
103+
TInput,
104+
TDatabase
105+
> {
106+
return this.#actor.queue;
107+
}
108+
94109
/**
95110
* Gets actor ID.
96111
*/

rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
// Base contexts
2-
export { ActorContext, type ActorContextOf } from "./base/actor";
3-
export { ConnContext, type ConnContextOf } from "./base/conn";
4-
export { ConnInitContext, type ConnInitContextOf } from "./base/conn-init";
52

63
// Lifecycle contexts
74
export { ActionContext, type ActionContextOf } from "./action";
5+
export { ActorContext, type ActorContextOf } from "./base/actor";
6+
export { ConnContext, type ConnContextOf } from "./base/conn";
7+
export { ConnInitContext, type ConnInitContextOf } from "./base/conn-init";
88
export {
99
BeforeActionResponseContext,
1010
type BeforeActionResponseContextOf,
@@ -22,6 +22,7 @@ export {
2222
export { CreateVarsContext, type CreateVarsContextOf } from "./create-vars";
2323
export { DestroyContext, type DestroyContextOf } from "./destroy";
2424
export { DisconnectContext, type DisconnectContextOf } from "./disconnect";
25+
export { QueueContext, type QueueContextOf } from "./queue";
2526
export { RequestContext, type RequestContextOf } from "./request";
2627
export { SleepContext, type SleepContextOf } from "./sleep";
2728
export {
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import type { AnyDatabaseProvider } from "../database";
2+
import type { ActorDefinition, AnyActorDefinition } from "../definition";
3+
import type { ActorInstance } from "../instance/mod";
4+
import type { QueueMessage } from "../instance/queue-manager";
5+
6+
export interface QueueReceiveOptions {
7+
count?: number;
8+
timeout?: number;
9+
}
10+
11+
export interface QueueReceiveRequest extends QueueReceiveOptions {
12+
name: string | string[];
13+
}
14+
15+
export class QueueContext<
16+
TState,
17+
TConnParams,
18+
TConnState,
19+
TVars,
20+
TInput,
21+
TDatabase extends AnyDatabaseProvider,
22+
> {
23+
#actor: ActorInstance<
24+
TState,
25+
TConnParams,
26+
TConnState,
27+
TVars,
28+
TInput,
29+
TDatabase
30+
>;
31+
32+
constructor(
33+
actor: ActorInstance<
34+
TState,
35+
TConnParams,
36+
TConnState,
37+
TVars,
38+
TInput,
39+
TDatabase
40+
>,
41+
) {
42+
this.#actor = actor;
43+
}
44+
45+
next(
46+
name: string,
47+
opts?: QueueReceiveOptions,
48+
): Promise<QueueMessage | undefined>;
49+
next(
50+
name: string[],
51+
opts?: QueueReceiveOptions,
52+
): Promise<QueueMessage[] | undefined>;
53+
next(request: QueueReceiveRequest): Promise<QueueMessage[] | undefined>;
54+
async next(
55+
nameOrRequest: string | string[] | QueueReceiveRequest,
56+
opts: QueueReceiveOptions = {},
57+
): Promise<QueueMessage | QueueMessage[] | undefined> {
58+
const request =
59+
typeof nameOrRequest === "object" && !Array.isArray(nameOrRequest)
60+
? nameOrRequest
61+
: { name: nameOrRequest };
62+
const mergedOptions = request === nameOrRequest ? request : opts;
63+
const names = Array.isArray(request.name)
64+
? request.name
65+
: [request.name];
66+
const count = mergedOptions.count ?? 1;
67+
68+
const messages = await this.#actor.queueManager.receive(
69+
names,
70+
count,
71+
mergedOptions.timeout,
72+
this.#actor.abortSignal,
73+
);
74+
75+
if (Array.isArray(request.name)) {
76+
return messages;
77+
}
78+
79+
if (!messages || messages.length === 0) {
80+
return undefined;
81+
}
82+
83+
return messages[0];
84+
}
85+
}
86+
87+
export type QueueContextOf<AD extends AnyActorDefinition> =
88+
AD extends ActorDefinition<
89+
infer S,
90+
infer CP,
91+
infer CS,
92+
infer V,
93+
infer I,
94+
infer DB extends AnyDatabaseProvider,
95+
any
96+
>
97+
? QueueContext<S, CP, CS, V, I, DB>
98+
: never;

rivetkit-typescript/packages/rivetkit/src/actor/errors.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,45 @@ export class Unsupported extends ActorError {
188188
}
189189
}
190190

191+
export class QueueFull extends ActorError {
192+
constructor(limit: number) {
193+
super("queue", "full", `Queue is full. Limit is ${limit} messages.`, {
194+
public: true,
195+
metadata: { limit },
196+
});
197+
}
198+
}
199+
200+
export class QueueMessageTooLarge extends ActorError {
201+
constructor(size: number, limit: number) {
202+
super(
203+
"queue",
204+
"message_too_large",
205+
`Queue message too large (${size} bytes). Limit is ${limit} bytes.`,
206+
{ public: true, metadata: { size, limit } },
207+
);
208+
}
209+
}
210+
211+
export class QueueMessageInvalid extends ActorError {
212+
constructor(path?: string) {
213+
super(
214+
"queue",
215+
"message_invalid",
216+
path
217+
? `Queue message body contains unsupported type at ${path}.`
218+
: "Queue message body contains unsupported type.",
219+
{ public: true, metadata: path ? { path } : undefined },
220+
);
221+
}
222+
}
223+
224+
export class ActorAborted extends ActorError {
225+
constructor() {
226+
super("actor", "aborted", "Actor aborted.", { public: true });
227+
}
228+
}
229+
191230
/**
192231
* Options for the UserError class.
193232
*/

0 commit comments

Comments
 (0)