Skip to content

Commit bde0b0f

Browse files
committed
feat: actor messages & queues
1 parent b9334b3 commit bde0b0f

File tree

32 files changed

+1658
-73
lines changed

32 files changed

+1658
-73
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { actor } from "rivetkit";
2+
import type { registry } from "./registry";
3+
4+
export const queueActor = actor({
5+
state: {},
6+
actions: {
7+
receiveOne: async (
8+
c,
9+
name: string,
10+
opts?: { count?: number; timeout?: number },
11+
) => {
12+
const message = await c.queue.next(name, opts);
13+
if (!message) {
14+
return null;
15+
}
16+
return { name: message.name, body: message.body };
17+
},
18+
receiveMany: async (
19+
c,
20+
names: string[],
21+
opts?: { count?: number; timeout?: number },
22+
) => {
23+
const messages = await c.queue.next(names, opts);
24+
return (messages ?? []).map(
25+
(message: { name: string; body: unknown }) => ({
26+
name: message.name,
27+
body: message.body,
28+
}),
29+
);
30+
},
31+
receiveRequest: async (
32+
c,
33+
request: {
34+
name: string | string[];
35+
count?: number;
36+
timeout?: number;
37+
},
38+
) => {
39+
const messages = await c.queue.next(request);
40+
return (messages ?? []).map(
41+
(message: { name: string; body: unknown }) => ({
42+
name: message.name,
43+
body: message.body,
44+
}),
45+
);
46+
},
47+
sendToSelf: async (c, name: string, body: unknown) => {
48+
const client = c.client<typeof registry>();
49+
const handle = client.queueActor.getForId(c.actorId);
50+
await handle.queue[name].send(body);
51+
return true;
52+
},
53+
waitForAbort: async (c) => {
54+
setTimeout(() => {
55+
c.destroy();
56+
}, 10);
57+
await c.queue.next("abort", { timeout: 10_000 });
58+
return true;
59+
},
60+
},
61+
});
62+
63+
export const queueLimitedActor = actor({
64+
state: {},
65+
actions: {},
66+
options: {
67+
maxQueueSize: 1,
68+
maxQueueMessageSize: 64,
69+
},
70+
});

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { kvActor } from "./kv";
2626
import { largePayloadActor, largePayloadConnActor } from "./large-payloads";
2727
import { counterWithLifecycle } from "./lifecycle";
2828
import { metadataActor } from "./metadata";
29+
import { queueActor, queueLimitedActor } from "./queue";
2930
import {
3031
rawHttpActor,
3132
rawHttpHonoActor,
@@ -76,6 +77,9 @@ export const registry = setup({
7677
inlineClientActor,
7778
// From kv.ts
7879
kvActor,
80+
// From queue.ts
81+
queueActor,
82+
queueLimitedActor,
7983
// From action-inputs.ts
8084
inputActor,
8185
// From action-timeout.ts

rivetkit-typescript/packages/rivetkit/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@
163163
],
164164
"scripts": {
165165
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/serve-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts",
166-
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts",
166+
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts",
167167
"check-types": "tsc --noEmit",
168168
"lint": "biome check .",
169169
"lint:fix": "biome check --write .",

rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v1.bare

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type ToServerBody union {
3434
PatchStateRequest |
3535
StateRequest |
3636
ConnectionsRequest |
37-
ActionRequest |
37+
ActionRequest |
3838
EventsRequest |
3939
ClearEventsRequest |
4040
RpcsListRequest
@@ -159,4 +159,4 @@ type ToClientBody union {
159159

160160
type ToClient struct {
161161
body: ToClientBody
162-
}
162+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# MARK: Message To Server
2+
3+
type PatchStateRequest struct {
4+
state: data
5+
}
6+
7+
type ActionRequest struct {
8+
id: uint
9+
name: str
10+
args: data
11+
}
12+
13+
type StateRequest struct {
14+
id: uint
15+
}
16+
17+
type ConnectionsRequest struct {
18+
id: uint
19+
}
20+
21+
type EventsRequest struct {
22+
id: uint
23+
}
24+
25+
type ClearEventsRequest struct {
26+
id: uint
27+
}
28+
29+
type RpcsListRequest struct {
30+
id: uint
31+
}
32+
33+
type ToServerBody union {
34+
PatchStateRequest |
35+
StateRequest |
36+
ConnectionsRequest |
37+
ActionRequest |
38+
EventsRequest |
39+
ClearEventsRequest |
40+
RpcsListRequest
41+
}
42+
43+
type ToServer struct {
44+
body: ToServerBody
45+
}
46+
47+
# MARK: Message To Client
48+
49+
type State data
50+
51+
type Connection struct {
52+
id: str
53+
details: data
54+
}
55+
56+
type ActionEvent struct {
57+
name: str
58+
args: data
59+
connId: str
60+
}
61+
62+
type BroadcastEvent struct {
63+
eventName: str
64+
args: data
65+
}
66+
67+
type SubscribeEvent struct {
68+
eventName: str
69+
connId: str
70+
}
71+
72+
type UnSubscribeEvent struct {
73+
eventName: str
74+
connId: str
75+
}
76+
77+
type FiredEvent struct {
78+
eventName: str
79+
args: data
80+
connId: str
81+
}
82+
83+
type EventBody union {
84+
ActionEvent |
85+
BroadcastEvent |
86+
SubscribeEvent |
87+
UnSubscribeEvent |
88+
FiredEvent
89+
}
90+
91+
type Event struct {
92+
id: str
93+
timestamp: uint
94+
body: EventBody
95+
}
96+
97+
type Init struct {
98+
connections: list<Connection>
99+
events: list<Event>
100+
state: optional<State>
101+
isStateEnabled: bool
102+
rpcs: list<str>
103+
isDatabaseEnabled: bool
104+
queueSize: uint
105+
}
106+
107+
type ConnectionsResponse struct {
108+
rid: uint
109+
connections: list<Connection>
110+
}
111+
112+
type StateResponse struct {
113+
rid: uint
114+
state: optional<State>
115+
isStateEnabled: bool
116+
}
117+
118+
type EventsResponse struct {
119+
rid: uint
120+
events: list<Event>
121+
}
122+
123+
type ActionResponse struct {
124+
rid: uint
125+
output: data
126+
}
127+
128+
type StateUpdated struct {
129+
state: State
130+
}
131+
132+
type EventsUpdated struct {
133+
events: list<Event>
134+
}
135+
136+
type QueueUpdated struct {
137+
queueSize: uint
138+
}
139+
140+
type RpcsListResponse struct {
141+
rid: uint
142+
rpcs: list<str>
143+
}
144+
145+
type ConnectionsUpdated struct {
146+
connections: list<Connection>
147+
}
148+
type Error struct {
149+
message: str
150+
}
151+
152+
type ToClientBody union {
153+
StateResponse |
154+
ConnectionsResponse |
155+
EventsResponse |
156+
ActionResponse |
157+
ConnectionsUpdated |
158+
EventsUpdated |
159+
QueueUpdated |
160+
StateUpdated |
161+
RpcsListResponse |
162+
Error |
163+
Init
164+
}
165+
166+
type ToClient struct {
167+
body: ToClientBody
168+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
type GatewayId data[4]
2+
type RequestId data[4]
3+
type MessageIndex u16
4+
5+
type Cbor data
6+
7+
# MARK: Connection
8+
type Subscription struct {
9+
eventName: str
10+
}
11+
12+
# Connection associated with hibernatable WebSocket that should persist across lifecycles.
13+
type Conn struct {
14+
# Connection ID generated by RivetKit
15+
id: str
16+
parameters: Cbor
17+
state: Cbor
18+
subscriptions: list<Subscription>
19+
20+
gatewayId: GatewayId
21+
requestId: RequestId
22+
serverMessageIndex: u16
23+
clientMessageIndex: u16
24+
25+
requestPath: str
26+
requestHeaders: map<str><str>
27+
}
28+
29+
# MARK: Schedule Event
30+
type ScheduleEvent struct {
31+
eventId: str
32+
timestamp: i64
33+
action: str
34+
args: optional<Cbor>
35+
}
36+
37+
# MARK: Actor
38+
type Actor struct {
39+
# Input data passed to the actor on initialization
40+
input: optional<Cbor>
41+
hasInitialized: bool
42+
state: Cbor
43+
scheduledEvents: list<ScheduleEvent>
44+
}
45+
46+
# MARK: Queue
47+
type QueueMetadata struct {
48+
nextId: u64
49+
size: u32
50+
}
51+
52+
type QueueMessage struct {
53+
name: str
54+
body: Cbor
55+
createdAt: i64
56+
}

rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v1.bare

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,3 @@ type HttpResolveRequest void
8181
type HttpResolveResponse struct {
8282
actorId: str
8383
}
84-

0 commit comments

Comments
 (0)