Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ Data structures often include:

## Testing Guidelines
- When running tests, always pipe the test to a file in /tmp/ then grep it in a second step. You can grep test logs multiple times to search for different log lines.
- For RivetKit TypeScript tests, run from `rivetkit-typescript/packages/rivetkit` and use `pnpm test <filter>` with `-t` to narrow to specific suites. For example: `pnpm test driver-file-system -t ".*Actor KV.*"`.

## Optimizations

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { actor, type ActorContext } from "rivetkit";

export const kvActor = actor({
actions: {
putText: async (
c: ActorContext<any, any, any, any, any, any>,
key: string,
value: string,
) => {
await c.kv.put(key, value);
return true;
},
getText: async (
c: ActorContext<any, any, any, any, any, any>,
key: string,
) => {
return await c.kv.get(key);
},
listText: async (
c: ActorContext<any, any, any, any, any, any>,
prefix: string,
) => {
const results = await c.kv.list(prefix, { keyType: "text" });
return results.map(([key, value]) => ({
key,
value,
}));
},
roundtripArrayBuffer: async (
c: ActorContext<any, any, any, any, any, any>,
key: string,
values: number[],
) => {
const buffer = new Uint8Array(values).buffer;
await c.kv.put(key, buffer, { type: "arrayBuffer" });
const result = await c.kv.get(key, { type: "arrayBuffer" });
if (!result) {
return null;
}
return Array.from(new Uint8Array(result));
},
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { destroyActor, destroyObserver } from "./destroy";
import { customTimeoutActor, errorHandlingActor } from "./error-handling";
import { hibernationActor } from "./hibernation";
import { inlineClientActor } from "./inline-client";
import { kvActor } from "./kv";
import { largePayloadActor, largePayloadConnActor } from "./large-payloads";
import { counterWithLifecycle } from "./lifecycle";
import { metadataActor } from "./metadata";
Expand Down Expand Up @@ -72,6 +73,8 @@ export const registry = setup({
customTimeoutActor,
// From inline-client.ts
inlineClientActor,
// From kv.ts
kvActor,
// From action-inputs.ts
inputActor,
// From action-timeout.ts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { Conn, ConnId } from "../../conn/mod";
import type { AnyDatabaseProvider, InferDatabaseClient } from "../../database";
import type { ActorDefinition, AnyActorDefinition } from "../../definition";
import type { ActorInstance, SaveStateOptions } from "../../instance/mod";
import { ActorKv } from "../../instance/kv";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import for ActorKv is inserted in the middle of other imports, which violates import sorting rules. It should be moved to maintain alphabetical order with the other imports.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

import type { Schedule } from "../../schedule";

/**
Expand All @@ -27,6 +28,7 @@ export class ActorContext<
TInput,
TDatabase
>;
#kv: ActorKv | undefined;

constructor(
actor: ActorInstance<
Expand All @@ -41,6 +43,16 @@ export class ActorContext<
this.#actor = actor;
}

/**
* Gets the KV storage interface.
*/
get kv(): ActorKv {
if (!this.#kv) {
this.#kv = new ActorKv(this.#actor.driver, this.#actor.id);
}
return this.#kv;
}

/**
* Get the actor state
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
import type { AnyDatabaseProvider } from "../database";
import { CachedSerializer } from "../protocol/serde";
import { deadline } from "../utils";
import { makeConnKey } from "./kv";
import { makeConnKey } from "./keys";
import type { ActorInstance } from "./mod";
/**
* Manages all connection-related operations for an actor instance.
Expand Down
29 changes: 29 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
export const KEYS = {
PERSIST_DATA: Uint8Array.from([1]),
CONN_PREFIX: Uint8Array.from([2]), // Prefix for connection keys
INSPECTOR_TOKEN: Uint8Array.from([3]), // Inspector token key
KV: Uint8Array.from([4]), // Prefix for user-facing KV storage
};

// Helper to create a prefixed key for user-facing KV storage
export function makePrefixedKey(key: Uint8Array): Uint8Array {
const prefixed = new Uint8Array(KEYS.KV.length + key.length);
prefixed.set(KEYS.KV, 0);
prefixed.set(key, KEYS.KV.length);
return prefixed;
}

// Helper to remove the prefix from a key
export function removePrefixFromKey(prefixedKey: Uint8Array): Uint8Array {
return prefixedKey.slice(KEYS.KV.length);
}

// Helper to create a connection key
export function makeConnKey(connId: string): Uint8Array {
const encoder = new TextEncoder();
const connIdBytes = encoder.encode(connId);
const key = new Uint8Array(KEYS.CONN_PREFIX.length + connIdBytes.length);
key.set(KEYS.CONN_PREFIX, 0);
key.set(connIdBytes, KEYS.CONN_PREFIX.length);
return key;
}
254 changes: 240 additions & 14 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance/kv.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,241 @@
export const KEYS = {
PERSIST_DATA: Uint8Array.from([1]),
CONN_PREFIX: Uint8Array.from([2]), // Prefix for connection keys
INSPECTOR_TOKEN: Uint8Array.from([3]), // Inspector token key
};

// Helper to create a connection key
export function makeConnKey(connId: string): Uint8Array {
const encoder = new TextEncoder();
const connIdBytes = encoder.encode(connId);
const key = new Uint8Array(KEYS.CONN_PREFIX.length + connIdBytes.length);
key.set(KEYS.CONN_PREFIX, 0);
key.set(connIdBytes, KEYS.CONN_PREFIX.length);
return key;
import type { ActorDriver } from "../driver";
import { makePrefixedKey, removePrefixFromKey } from "./keys";

/**
* User-facing KV storage interface exposed on ActorContext.
*/
type KvValueType = "text" | "arrayBuffer" | "binary";
type KvKeyType = "text" | "binary";
type KvKey = Uint8Array | string;

type KvValueTypeMap = {
text: string;
arrayBuffer: ArrayBuffer;
binary: Uint8Array;
};

type KvKeyTypeMap = {
text: string;
binary: Uint8Array;
};

type KvValueOptions<T extends KvValueType = "text"> = {
type?: T;
};

type KvListOptions<
T extends KvValueType = "text",
K extends KvKeyType = "text",
> = KvValueOptions<T> & {
keyType?: K;
};

const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

function encodeKey<K extends KvKeyType = KvKeyType>(
key: KvKeyTypeMap[K],
keyType?: K,
): Uint8Array {
if (key instanceof Uint8Array) {
return key;
}
const resolvedKeyType = keyType ?? "text";
if (resolvedKeyType === "binary") {
throw new TypeError("Expected a Uint8Array when keyType is binary");
}
return textEncoder.encode(key);
}

function decodeKey<K extends KvKeyType = "text">(
key: Uint8Array,
keyType?: K,
): KvKeyTypeMap[K] {
const resolvedKeyType = keyType ?? "text";
switch (resolvedKeyType) {
case "text":
return textDecoder.decode(key) as KvKeyTypeMap[K];
case "binary":
return key as KvKeyTypeMap[K];
default:
throw new TypeError("Invalid kv key type");
}
}

function resolveValueType(
value: string | Uint8Array | ArrayBuffer,
): KvValueType {
if (typeof value === "string") {
return "text";
}
if (value instanceof Uint8Array) {
return "binary";
}
if (value instanceof ArrayBuffer) {
return "arrayBuffer";
}
throw new TypeError("Invalid kv value");
}

function encodeValue<T extends KvValueType = KvValueType>(
value: KvValueTypeMap[T],
options?: KvValueOptions<T>,
): Uint8Array {
const type =
options?.type ??
resolveValueType(value as string | Uint8Array | ArrayBuffer);
switch (type) {
case "text":
if (typeof value !== "string") {
throw new TypeError("Expected a string when type is text");
}
return textEncoder.encode(value);
case "arrayBuffer":
if (!(value instanceof ArrayBuffer)) {
throw new TypeError("Expected an ArrayBuffer when type is arrayBuffer");
}
return new Uint8Array(value);
case "binary":
if (!(value instanceof Uint8Array)) {
throw new TypeError("Expected a Uint8Array when type is binary");
}
return value;
default:
throw new TypeError("Invalid kv value type");
}
}

function decodeValue<T extends KvValueType = "text">(
value: Uint8Array,
options?: KvValueOptions<T>,
): KvValueTypeMap[T] {
const type = options?.type ?? "text";
switch (type) {
case "text":
return textDecoder.decode(value) as KvValueTypeMap[T];
case "arrayBuffer": {
const copy = new Uint8Array(value.byteLength);
copy.set(value);
return copy.buffer as KvValueTypeMap[T];
}
case "binary":
return value as KvValueTypeMap[T];
default:
throw new TypeError("Invalid kv value type");
}
}

export class ActorKv {
#driver: ActorDriver;
#actorId: string;

constructor(driver: ActorDriver, actorId: string) {
this.#driver = driver;
this.#actorId = actorId;
}

/**
* Get a single value by key.
*/
async get<T extends KvValueType = "text">(
key: KvKey,
options?: KvValueOptions<T>,
): Promise<KvValueTypeMap[T] | null> {
const results = await this.#driver.kvBatchGet(this.#actorId, [
makePrefixedKey(encodeKey(key)),
]);
const result = results[0];
if (!result) {
return null;
}
return decodeValue(result, options);
}

/**
* Get multiple values by keys.
*/
async getBatch<T extends KvValueType = "text">(
keys: KvKey[],
options?: KvValueOptions<T>,
): Promise<(KvValueTypeMap[T] | null)[]> {
const prefixedKeys = keys.map((key) =>
makePrefixedKey(encodeKey(key)),
);
const results = await this.#driver.kvBatchGet(
this.#actorId,
prefixedKeys,
);
return results.map((result) =>
result ? decodeValue(result, options) : null,
);
}

/**
* Put a single key-value pair.
*/
async put<T extends KvValueType = KvValueType>(
key: KvKey,
value: KvValueTypeMap[T],
options?: KvValueOptions<T>,
): Promise<void> {
await this.#driver.kvBatchPut(this.#actorId, [
[makePrefixedKey(encodeKey(key)), encodeValue(value, options)],
]);
}

/**
* Put multiple key-value pairs.
*/
async putBatch<T extends KvValueType = KvValueType>(
entries: [KvKey, KvValueTypeMap[T]][],
options?: KvValueOptions<T>,
): Promise<void> {
const prefixedEntries: [Uint8Array, Uint8Array][] = entries.map(
([key, value]) => [
makePrefixedKey(encodeKey(key)),
encodeValue(value, options),
],
);
await this.#driver.kvBatchPut(this.#actorId, prefixedEntries);
}

/**
* Delete a single key.
*/
async delete(key: KvKey): Promise<void> {
await this.#driver.kvBatchDelete(this.#actorId, [
makePrefixedKey(encodeKey(key)),
]);
}

/**
* Delete multiple keys.
*/
async deleteBatch(keys: KvKey[]): Promise<void> {
const prefixedKeys = keys.map((key) =>
makePrefixedKey(encodeKey(key)),
);
await this.#driver.kvBatchDelete(this.#actorId, prefixedKeys);
}

/**
* List all keys with a given prefix.
* Returns key-value pairs where keys have the user prefix removed.
*/
async list<T extends KvValueType = "text", K extends KvKeyType = "text">(
prefix: KvKeyTypeMap[K],
options?: KvListOptions<T, K>,
): Promise<[KvKeyTypeMap[K], KvValueTypeMap[T]][]> {
const prefixedPrefix = makePrefixedKey(
encodeKey(prefix, options?.keyType),
);
const results = await this.#driver.kvListPrefix(
this.#actorId,
prefixedPrefix,
);
return results.map(([key, value]) => [
decodeKey<K>(removePrefixFromKey(key), options?.keyType),
decodeValue<T>(value, options),
]);
}
}
Loading
Loading