Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { actor } from "rivetkit";
import type { registry } from "./registry";

export const queueActor = actor({
state: {},
actions: {
receiveOne: async (
c,
name: string,
opts?: { count?: number; timeout?: number },
) => {
const message = await c.queue.next(name, opts);
if (!message) {
return null;
}
return { name: message.name, body: message.body };
},
receiveMany: async (
c,
names: string[],
opts?: { count?: number; timeout?: number },
) => {
const messages = await c.queue.next(names, opts);
return (messages ?? []).map(
(message: { name: string; body: unknown }) => ({
name: message.name,
body: message.body,
}),
);
},
receiveRequest: async (
c,
request: {
name: string | string[];
count?: number;
timeout?: number;
},
) => {
const messages = await c.queue.next(request);
return (messages ?? []).map(
(message: { name: string; body: unknown }) => ({
name: message.name,
body: message.body,
}),
);
},
sendToSelf: async (c, name: string, body: unknown) => {
const client = c.client<typeof registry>();
const handle = client.queueActor.getForId(c.actorId);
await handle.queue[name].send(body);
return true;
},
waitForAbort: async (c) => {
setTimeout(() => {
c.destroy();
}, 10);
await c.queue.next("abort", { timeout: 10_000 });
return true;
},
},
});

export const queueLimitedActor = actor({
state: {},
actions: {},
options: {
maxQueueSize: 1,
maxQueueMessageSize: 64,
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { kvActor } from "./kv";
import { largePayloadActor, largePayloadConnActor } from "./large-payloads";
import { counterWithLifecycle } from "./lifecycle";
import { metadataActor } from "./metadata";
import { queueActor, queueLimitedActor } from "./queue";
import {
rawHttpActor,
rawHttpHonoActor,
Expand Down Expand Up @@ -75,6 +76,9 @@ export const registry = setup({
inlineClientActor,
// From kv.ts
kvActor,
// From queue.ts
queueActor,
queueLimitedActor,
// From action-inputs.ts
inputActor,
// From action-timeout.ts
Expand Down
2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/rivetkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
],
"scripts": {
"build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts",
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts",
"build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts",
"check-types": "tsc --noEmit",
"lint": "biome check .",
"lint:fix": "biome check --write .",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ToServerBody union {
PatchStateRequest |
StateRequest |
ConnectionsRequest |
ActionRequest |
ActionRequest |
EventsRequest |
ClearEventsRequest |
RpcsListRequest
Expand Down Expand Up @@ -159,4 +159,4 @@ type ToClientBody union {

type ToClient struct {
body: ToClientBody
}
}
168 changes: 168 additions & 0 deletions rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# MARK: Message To Server

type PatchStateRequest struct {
state: data
}

type ActionRequest struct {
id: uint
name: str
args: data
}

type StateRequest struct {
id: uint
}

type ConnectionsRequest struct {
id: uint
}

type EventsRequest struct {
id: uint
}

type ClearEventsRequest struct {
id: uint
}

type RpcsListRequest struct {
id: uint
}

type ToServerBody union {
PatchStateRequest |
StateRequest |
ConnectionsRequest |
ActionRequest |
EventsRequest |
ClearEventsRequest |
RpcsListRequest
}

type ToServer struct {
body: ToServerBody
}

# MARK: Message To Client

type State data

type Connection struct {
id: str
details: data
}

type ActionEvent struct {
name: str
args: data
connId: str
}

type BroadcastEvent struct {
eventName: str
args: data
}

type SubscribeEvent struct {
eventName: str
connId: str
}

type UnSubscribeEvent struct {
eventName: str
connId: str
}

type FiredEvent struct {
eventName: str
args: data
connId: str
}

type EventBody union {
ActionEvent |
BroadcastEvent |
SubscribeEvent |
UnSubscribeEvent |
FiredEvent
}

type Event struct {
id: str
timestamp: uint
body: EventBody
}

type Init struct {
connections: list<Connection>
events: list<Event>
state: optional<State>
isStateEnabled: bool
rpcs: list<str>
isDatabaseEnabled: bool
queueSize: uint
}

type ConnectionsResponse struct {
rid: uint
connections: list<Connection>
}

type StateResponse struct {
rid: uint
state: optional<State>
isStateEnabled: bool
}

type EventsResponse struct {
rid: uint
events: list<Event>
}

type ActionResponse struct {
rid: uint
output: data
}

type StateUpdated struct {
state: State
}

type EventsUpdated struct {
events: list<Event>
}

type QueueUpdated struct {
queueSize: uint
}

type RpcsListResponse struct {
rid: uint
rpcs: list<str>
}

type ConnectionsUpdated struct {
connections: list<Connection>
}
type Error struct {
message: str
}

type ToClientBody union {
StateResponse |
ConnectionsResponse |
EventsResponse |
ActionResponse |
ConnectionsUpdated |
EventsUpdated |
QueueUpdated |
StateUpdated |
RpcsListResponse |
Error |
Init
}

type ToClient struct {
body: ToClientBody
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
type GatewayId data[4]
type RequestId data[4]
type MessageIndex u16

type Cbor data

# MARK: Connection
type Subscription struct {
eventName: str
}

# Connection associated with hibernatable WebSocket that should persist across lifecycles.
type Conn struct {
# Connection ID generated by RivetKit
id: str
parameters: Cbor
state: Cbor
subscriptions: list<Subscription>

gatewayId: GatewayId
requestId: RequestId
serverMessageIndex: u16
clientMessageIndex: u16

requestPath: str
requestHeaders: map<str><str>
}

# MARK: Schedule Event
type ScheduleEvent struct {
eventId: str
timestamp: i64
action: str
args: optional<Cbor>
}

# MARK: Actor
type Actor struct {
# Input data passed to the actor on initialization
input: optional<Cbor>
hasInitialized: bool
state: Cbor
scheduledEvents: list<ScheduleEvent>
}

# MARK: Queue
type QueueMetadata struct {
nextId: u64
size: u32
}

type QueueMessage struct {
name: str
body: Cbor
createdAt: i64
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,3 @@ type HttpResolveRequest void
type HttpResolveResponse struct {
actorId: str
}

Loading
Loading