Skip to content

Commit 4b0cbc8

Browse files
committed
feat: kv api
1 parent 825c812 commit 4b0cbc8

File tree

14 files changed

+354
-21
lines changed

14 files changed

+354
-21
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { actor, type ActorContext } from "rivetkit";
2+
3+
const textEncoder = new TextEncoder();
4+
const textDecoder = new TextDecoder();
5+
6+
export const kvActor = actor({
7+
actions: {
8+
putText: async (
9+
c: ActorContext<any, any, any, any, any, any>,
10+
key: string,
11+
value: string,
12+
) => {
13+
await c.kv.put(textEncoder.encode(key), value, { type: "text" });
14+
return true;
15+
},
16+
getText: async (
17+
c: ActorContext<any, any, any, any, any, any>,
18+
key: string,
19+
) => {
20+
return await c.kv.get(textEncoder.encode(key), { type: "text" });
21+
},
22+
listText: async (
23+
c: ActorContext<any, any, any, any, any, any>,
24+
prefix: string,
25+
) => {
26+
const results = await c.kv.list(textEncoder.encode(prefix), {
27+
type: "text",
28+
});
29+
return results.map(([key, value]) => {
30+
if (typeof value !== "string") {
31+
throw new TypeError("Expected text value");
32+
}
33+
return {
34+
key: textDecoder.decode(key),
35+
value,
36+
};
37+
});
38+
},
39+
roundtripArrayBuffer: async (
40+
c: ActorContext<any, any, any, any, any, any>,
41+
key: string,
42+
values: number[],
43+
) => {
44+
const buffer = new Uint8Array(values).buffer;
45+
await c.kv.put(textEncoder.encode(key), buffer, {
46+
type: "arrayBuffer",
47+
});
48+
const result = await c.kv.get(textEncoder.encode(key), {
49+
type: "arrayBuffer",
50+
});
51+
if (!result) {
52+
return null;
53+
}
54+
if (typeof result === "string") {
55+
throw new TypeError("Expected arrayBuffer result");
56+
}
57+
return Array.from(new Uint8Array(result));
58+
},
59+
},
60+
});

rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { destroyActor, destroyObserver } from "./destroy";
2222
import { customTimeoutActor, errorHandlingActor } from "./error-handling";
2323
import { hibernationActor } from "./hibernation";
2424
import { inlineClientActor } from "./inline-client";
25+
import { kvActor } from "./kv";
2526
import { largePayloadActor, largePayloadConnActor } from "./large-payloads";
2627
import { counterWithLifecycle } from "./lifecycle";
2728
import { metadataActor } from "./metadata";
@@ -72,6 +73,8 @@ export const registry = setup({
7273
customTimeoutActor,
7374
// From inline-client.ts
7475
inlineClientActor,
76+
// From kv.ts
77+
kvActor,
7578
// From action-inputs.ts
7679
inputActor,
7780
// From action-timeout.ts

rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { Conn, ConnId } from "../../conn/mod";
66
import type { AnyDatabaseProvider, InferDatabaseClient } from "../../database";
77
import type { ActorDefinition, AnyActorDefinition } from "../../definition";
88
import type { ActorInstance, SaveStateOptions } from "../../instance/mod";
9+
import { ActorKv } from "../../instance/kv";
910
import type { Schedule } from "../../schedule";
1011

1112
/**
@@ -27,6 +28,7 @@ export class ActorContext<
2728
TInput,
2829
TDatabase
2930
>;
31+
#kv: ActorKv | undefined;
3032

3133
constructor(
3234
actor: ActorInstance<
@@ -41,6 +43,16 @@ export class ActorContext<
4143
this.#actor = actor;
4244
}
4345

46+
/**
47+
* Gets the KV storage interface.
48+
*/
49+
get kv(): ActorKv {
50+
if (!this.#kv) {
51+
this.#kv = new ActorKv(this.#actor.driver, this.#actor.id);
52+
}
53+
return this.#kv;
54+
}
55+
4456
/**
4557
* Get the actor state
4658
*

rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import {
3131
import type { AnyDatabaseProvider } from "../database";
3232
import { CachedSerializer } from "../protocol/serde";
3333
import { deadline } from "../utils";
34-
import { makeConnKey } from "./kv";
34+
import { makeConnKey } from "./keys";
3535
import type { ActorInstance } from "./mod";
3636
/**
3737
* Manages all connection-related operations for an actor instance.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
export const KEYS = {
2+
PERSIST_DATA: Uint8Array.from([1]),
3+
CONN_PREFIX: Uint8Array.from([2]), // Prefix for connection keys
4+
INSPECTOR_TOKEN: Uint8Array.from([3]), // Inspector token key
5+
KV: Uint8Array.from([4]), // Prefix for user-facing KV storage
6+
};
7+
8+
// Helper to create a prefixed key for user-facing KV storage
9+
export function makePrefixedKey(key: Uint8Array): Uint8Array {
10+
const prefixed = new Uint8Array(KEYS.KV.length + key.length);
11+
prefixed.set(KEYS.KV, 0);
12+
prefixed.set(key, KEYS.KV.length);
13+
return prefixed;
14+
}
15+
16+
// Helper to remove the prefix from a key
17+
export function removePrefixFromKey(prefixedKey: Uint8Array): Uint8Array {
18+
return prefixedKey.slice(KEYS.KV.length);
19+
}
20+
21+
// Helper to create a connection key
22+
export function makeConnKey(connId: string): Uint8Array {
23+
const encoder = new TextEncoder();
24+
const connIdBytes = encoder.encode(connId);
25+
const key = new Uint8Array(KEYS.CONN_PREFIX.length + connIdBytes.length);
26+
key.set(KEYS.CONN_PREFIX, 0);
27+
key.set(connIdBytes, KEYS.CONN_PREFIX.length);
28+
return key;
29+
}
Lines changed: 193 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,196 @@
1-
export const KEYS = {
2-
PERSIST_DATA: Uint8Array.from([1]),
3-
CONN_PREFIX: Uint8Array.from([2]), // Prefix for connection keys
4-
INSPECTOR_TOKEN: Uint8Array.from([3]), // Inspector token key
1+
import type { ActorDriver } from "../driver";
2+
import { makePrefixedKey, removePrefixFromKey } from "./keys";
3+
4+
/**
5+
* User-facing KV storage interface exposed on ActorContext.
6+
*/
7+
type KvValueType = "text" | "arrayBuffer";
8+
9+
type KvOptions = {
10+
type?: KvValueType;
11+
};
12+
13+
const textEncoder = new TextEncoder();
14+
const textDecoder = new TextDecoder();
15+
16+
const encodeValue = (
17+
value: Uint8Array | string | ArrayBuffer,
18+
options?: KvOptions,
19+
): Uint8Array => {
20+
const type = options?.type;
21+
if (!type) {
22+
if (value instanceof Uint8Array) {
23+
return value;
24+
}
25+
if (typeof value === "string") {
26+
return textEncoder.encode(value);
27+
}
28+
if (value instanceof ArrayBuffer) {
29+
return new Uint8Array(value);
30+
}
31+
}
32+
33+
switch (type) {
34+
case "text":
35+
if (typeof value !== "string") {
36+
throw new TypeError("Expected a string when type is text");
37+
}
38+
return textEncoder.encode(value);
39+
case "arrayBuffer":
40+
if (!(value instanceof ArrayBuffer)) {
41+
throw new TypeError("Expected an ArrayBuffer when type is arrayBuffer");
42+
}
43+
return new Uint8Array(value);
44+
default:
45+
throw new TypeError("Invalid kv value type");
46+
}
47+
};
48+
49+
const decodeValue = (
50+
value: Uint8Array,
51+
options?: KvOptions,
52+
): Uint8Array | string | ArrayBuffer => {
53+
const type = options?.type;
54+
if (!type) {
55+
return value;
56+
}
57+
58+
switch (type) {
59+
case "text":
60+
return textDecoder.decode(value);
61+
case "arrayBuffer": {
62+
const copy = new Uint8Array(value.byteLength);
63+
copy.set(value);
64+
return copy.buffer;
65+
}
66+
default:
67+
throw new TypeError("Invalid kv value type");
68+
}
569
};
670

7-
// Helper to create a connection key
8-
export function makeConnKey(connId: string): Uint8Array {
9-
const encoder = new TextEncoder();
10-
const connIdBytes = encoder.encode(connId);
11-
const key = new Uint8Array(KEYS.CONN_PREFIX.length + connIdBytes.length);
12-
key.set(KEYS.CONN_PREFIX, 0);
13-
key.set(connIdBytes, KEYS.CONN_PREFIX.length);
14-
return key;
71+
export class ActorKv {
72+
#driver: ActorDriver;
73+
#actorId: string;
74+
75+
constructor(driver: ActorDriver, actorId: string) {
76+
this.#driver = driver;
77+
this.#actorId = actorId;
78+
}
79+
80+
/**
81+
* Get a single value by key.
82+
*/
83+
async get(key: Uint8Array): Promise<Uint8Array | null>;
84+
async get(key: Uint8Array, options: KvOptions): Promise<string | ArrayBuffer | null>;
85+
async get(
86+
key: Uint8Array,
87+
options?: KvOptions,
88+
): Promise<Uint8Array | string | ArrayBuffer | null> {
89+
const results = await this.#driver.kvBatchGet(this.#actorId, [
90+
makePrefixedKey(key),
91+
]);
92+
const result = results[0];
93+
if (!result) {
94+
return null;
95+
}
96+
return decodeValue(result, options);
97+
}
98+
99+
/**
100+
* Get multiple values by keys.
101+
*/
102+
async getBatch(keys: Uint8Array[]): Promise<(Uint8Array | null)[]>;
103+
async getBatch(
104+
keys: Uint8Array[],
105+
options: KvOptions,
106+
): Promise<(string | ArrayBuffer | null)[]>;
107+
async getBatch(
108+
keys: Uint8Array[],
109+
options?: KvOptions,
110+
): Promise<(Uint8Array | string | ArrayBuffer | null)[]> {
111+
const prefixedKeys = keys.map(makePrefixedKey);
112+
const results = await this.#driver.kvBatchGet(
113+
this.#actorId,
114+
prefixedKeys,
115+
);
116+
return results.map((result) =>
117+
result ? decodeValue(result, options) : null,
118+
);
119+
}
120+
121+
/**
122+
* Put a single key-value pair.
123+
*/
124+
async put(key: Uint8Array, value: Uint8Array): Promise<void>;
125+
async put(
126+
key: Uint8Array,
127+
value: string | ArrayBuffer,
128+
options: KvOptions,
129+
): Promise<void>;
130+
async put(
131+
key: Uint8Array,
132+
value: Uint8Array | string | ArrayBuffer,
133+
options?: KvOptions,
134+
): Promise<void> {
135+
await this.#driver.kvBatchPut(this.#actorId, [
136+
[makePrefixedKey(key), encodeValue(value, options)],
137+
]);
138+
}
139+
140+
/**
141+
* Put multiple key-value pairs.
142+
*/
143+
async putBatch(entries: [Uint8Array, Uint8Array][]): Promise<void>;
144+
async putBatch(
145+
entries: [Uint8Array, string | ArrayBuffer][],
146+
options: KvOptions,
147+
): Promise<void>;
148+
async putBatch(
149+
entries: [Uint8Array, Uint8Array | string | ArrayBuffer][],
150+
options?: KvOptions,
151+
): Promise<void> {
152+
const prefixedEntries: [Uint8Array, Uint8Array][] = entries.map(
153+
([key, value]) => [makePrefixedKey(key), encodeValue(value, options)],
154+
);
155+
await this.#driver.kvBatchPut(this.#actorId, prefixedEntries);
156+
}
157+
158+
/**
159+
* Delete a single key.
160+
*/
161+
async delete(key: Uint8Array): Promise<void> {
162+
await this.#driver.kvBatchDelete(this.#actorId, [makePrefixedKey(key)]);
163+
}
164+
165+
/**
166+
* Delete multiple keys.
167+
*/
168+
async deleteBatch(keys: Uint8Array[]): Promise<void> {
169+
const prefixedKeys = keys.map(makePrefixedKey);
170+
await this.#driver.kvBatchDelete(this.#actorId, prefixedKeys);
171+
}
172+
173+
/**
174+
* List all keys with a given prefix.
175+
* Returns key-value pairs where keys have the user prefix removed.
176+
*/
177+
async list(prefix: Uint8Array): Promise<[Uint8Array, Uint8Array][]>;
178+
async list(
179+
prefix: Uint8Array,
180+
options: KvOptions,
181+
): Promise<[Uint8Array, string | ArrayBuffer][]>;
182+
async list(
183+
prefix: Uint8Array,
184+
options?: KvOptions,
185+
): Promise<[Uint8Array, Uint8Array | string | ArrayBuffer][]> {
186+
const prefixedPrefix = makePrefixedKey(prefix);
187+
const results = await this.#driver.kvListPrefix(
188+
this.#actorId,
189+
prefixedPrefix,
190+
);
191+
return results.map(([key, value]) => [
192+
removePrefixFromKey(key),
193+
decodeValue(value, options),
194+
]);
195+
}
15196
}

rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import {
4444
} from "../utils";
4545
import { ConnectionManager } from "./connection-manager";
4646
import { EventManager } from "./event-manager";
47-
import { KEYS } from "./kv";
47+
import { KEYS } from "./keys";
4848
import {
4949
convertActorFromBarePersisted,
5050
type PersistedActor,

rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { convertConnToBarePersistedConn } from "../conn/persisted";
1212
import type { ActorDriver } from "../driver";
1313
import * as errors from "../errors";
1414
import { isConnStatePath, isStatePath } from "../utils";
15-
import { KEYS, makeConnKey } from "./kv";
15+
import { KEYS, makeConnKey } from "./keys";
1616
import type { ActorInstance } from "./mod";
1717
import { convertActorToBarePersisted, type PersistedActor } from "./persisted";
1818

rivetkit-typescript/packages/rivetkit/src/actor/mod.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ export type { AnyConn, Conn } from "./conn/mod";
7676
export type { ActorDefinition, AnyActorDefinition } from "./definition";
7777
export { lookupInRegistry } from "./definition";
7878
export { UserError, type UserErrorOptions } from "./errors";
79-
export { KEYS as KV_KEYS } from "./instance/kv";
79+
export { KEYS as KV_KEYS } from "./instance/keys";
80+
export { ActorKv } from "./instance/kv";
8081
export type { AnyActorInstance } from "./instance/mod";
8182
export {
8283
type ActorRouter,

rivetkit-typescript/packages/rivetkit/src/client/mod.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export {
2222
ManagerError,
2323
} from "@/client/errors";
2424
export type { CreateRequest } from "@/manager/protocol/query";
25-
export { KEYS as KV_KEYS } from "../actor/instance/kv";
25+
export { KEYS as KV_KEYS } from "../actor/instance/keys";
2626
export type { ActorActionFunction } from "./actor-common";
2727
export type {
2828
ActorConn,

0 commit comments

Comments
 (0)