Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit f4d54f1

Browse files
authored
Support remote KV storage (#430)
This PR adds experimental support for reading/writing from real KV namespaces. The user's computer is treated like another "colo", and values are cached there as they are in KV, respecting `cacheTtl`s passed to `get`. :warning: We're still exploring how this feature will work, so it's likely it'll change in the future.
1 parent 77ce029 commit f4d54f1

File tree

12 files changed

+394
-24
lines changed

12 files changed

+394
-24
lines changed

packages/tre/src/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,13 @@ export class Miniflare {
187187
// Initialise plugin gateway factories and routers
188188
this.#gatewayFactories = {} as PluginGatewayFactories;
189189
this.#routers = {} as PluginRouters;
190-
this.#initPlugins();
191190

192191
// Split and validate options
193192
const [sharedOpts, workerOpts] = validateOptions(opts);
194193
this.#optionsVersion = 1;
195194
this.#sharedOpts = sharedOpts;
196195
this.#workerOpts = workerOpts;
196+
this.#initPlugins();
197197

198198
// Get supported shell for executing runtime binary
199199
// TODO: allow this to be configured if necessary
@@ -213,7 +213,12 @@ export class Miniflare {
213213
#initPlugins() {
214214
for (const [key, plugin] of PLUGIN_ENTRIES) {
215215
if (plugin.gateway !== undefined && plugin.router !== undefined) {
216-
const gatewayFactory = new GatewayFactory<any>(key, plugin.gateway);
216+
const gatewayFactory = new GatewayFactory<any>(
217+
this.#sharedOpts.core.cloudflareFetch,
218+
key,
219+
plugin.gateway,
220+
plugin.remoteStorage
221+
);
217222
const router = new plugin.router(gatewayFactory);
218223
// @ts-expect-error this.#gatewayFactories[key] could be any plugin's
219224
this.#gatewayFactories[key] = gatewayFactory;

packages/tre/src/plugins/core/index.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,17 @@ import {
1111
kVoid,
1212
supportedCompatibilityDate,
1313
} from "../../runtime";
14-
import { Awaitable, JsonSchema, MiniflareCoreError } from "../../shared";
15-
import { BINDING_SERVICE_LOOPBACK, Plugin } from "../shared";
14+
import {
15+
Awaitable,
16+
JsonSchema,
17+
MiniflareCoreError,
18+
zAwaitable,
19+
} from "../../shared";
20+
import {
21+
BINDING_SERVICE_LOOPBACK,
22+
CloudflareFetchSchema,
23+
Plugin,
24+
} from "../shared";
1625
import {
1726
ModuleDefinitionSchema,
1827
ModuleLocator,
@@ -25,10 +34,10 @@ const encoder = new TextEncoder();
2534
const numericCompare = new Intl.Collator(undefined, { numeric: true }).compare;
2635

2736
// (request: Request) => Awaitable<Response>
28-
export const ServiceFetch = z
37+
export const ServiceFetchSchema = z
2938
.function()
3039
.args(z.instanceof(Request))
31-
.returns(z.instanceof(Response).or(z.promise(z.instanceof(Response))));
40+
.returns(zAwaitable(z.instanceof(Response)));
3241
export const CoreOptionsSchema = z.object({
3342
name: z.string().optional(),
3443
script: z.string().optional(),
@@ -52,7 +61,9 @@ export const CoreOptionsSchema = z.object({
5261
textBlobBindings: z.record(z.string()).optional(),
5362
dataBlobBindings: z.record(z.string()).optional(),
5463
// TODO: add support for workerd network/external/disk services here
55-
serviceBindings: z.record(z.union([z.string(), ServiceFetch])).optional(),
64+
serviceBindings: z
65+
.record(z.union([z.string(), ServiceFetchSchema]))
66+
.optional(),
5667
});
5768

5869
export const CoreSharedOptionsSchema = z.object({
@@ -62,6 +73,8 @@ export const CoreSharedOptionsSchema = z.object({
6273
inspectorPort: z.number().optional(),
6374
verbose: z.boolean().optional(),
6475

76+
cloudflareFetch: CloudflareFetchSchema.optional(),
77+
6578
// TODO: add back validation of cf object
6679
cf: z.union([z.boolean(), z.string(), z.record(z.any())]).optional(),
6780

packages/tre/src/plugins/kv/gateway.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ export class KVGateway {
9191
`Invalid ${PARAM_CACHE_TTL} of ${cacheTtl}. Cache TTL must be at least ${MIN_CACHE_TTL}.`
9292
);
9393
}
94-
return this.storage.get(key);
94+
return this.storage.get(key, false, cacheTtl);
9595
}
9696

9797
async put(
@@ -187,7 +187,12 @@ export class KVGateway {
187187
const cursor = options.cursor;
188188
const res = await this.storage.list({ limit, prefix, cursor });
189189
return {
190-
keys: res.keys,
190+
keys: res.keys.map((key) => ({
191+
...key,
192+
// workerd expects metadata to be a JSON-serialised string
193+
metadata:
194+
key.metadata === undefined ? undefined : JSON.stringify(key.metadata),
195+
})),
191196
cursor: res.cursor === "" ? undefined : res.cursor,
192197
list_complete: res.cursor === "",
193198
};

packages/tre/src/plugins/kv/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import {
1212
} from "../shared";
1313
import { KV_PLUGIN_NAME } from "./constants";
1414
import { KVGateway } from "./gateway";
15+
import { KVRemoteStorage } from "./remote";
1516
import { KVRouter } from "./router";
1617
import { SitesOptions, getSitesBindings, getSitesService } from "./sites";
1718

1819
export const KVOptionsSchema = z.object({
20+
// TODO: also allow array like Miniflare 2
1921
kvNamespaces: z.record(z.string()).optional(),
2022

2123
// Workers Sites
@@ -42,6 +44,7 @@ export const KV_PLUGIN: Plugin<
4244
> = {
4345
gateway: KVGateway,
4446
router: KVRouter,
47+
remoteStorage: KVRemoteStorage,
4548
options: KVOptionsSchema,
4649
sharedOptions: KVSharedOptionsSchema,
4750
async getBindings(options) {
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
import assert from "assert";
2+
import { Blob } from "buffer";
3+
import { BodyInit, FormData, Response } from "undici";
4+
import { z } from "zod";
5+
import { Clock, millisToSeconds } from "../../shared";
6+
import {
7+
RemoteStorage,
8+
StorageListOptions,
9+
StorageListResult,
10+
StoredValueMeta,
11+
} from "../../storage";
12+
import { KVError } from "./gateway";
13+
14+
interface RemoteCacheMetadata {
15+
// UNIX timestamp in seconds when this key was last modified. KV allows you
16+
// to reduce the cache TTL if it was previously set too high
17+
// (https://developers.cloudflare.com/workers/runtime-apis/kv/#cache-ttl).
18+
// This is used to check if the data should be revalidated from the remote.
19+
storedAt: number;
20+
// Whether this key represents a deleted value and should be treated as
21+
// `undefined`.
22+
tombstone?: true;
23+
// The actual user-specified expiration of this key, if any. We want to return
24+
// this to users instead of our cache expiration.
25+
actualExpiration?: number;
26+
// The actual user-specified metadata of this key, if any. We want to return
27+
// this to users instead of this object.
28+
actualMetadata?: unknown;
29+
}
30+
31+
const APIEnvelopeSchema = z.object({
32+
success: z.boolean(),
33+
errors: z.array(z.object({ code: z.number(), message: z.string() })),
34+
messages: z.array(z.object({ code: z.number(), message: z.string() })),
35+
});
36+
37+
const KVGetMetadataResponseSchema = z.intersection(
38+
APIEnvelopeSchema,
39+
z.object({ result: z.unknown() })
40+
);
41+
42+
const KVListResponseSchema = z.intersection(
43+
APIEnvelopeSchema,
44+
z.object({
45+
result: z.array(
46+
z.object({
47+
name: z.string(),
48+
expiration: z.onumber(),
49+
metadata: z.unknown(),
50+
})
51+
),
52+
result_info: z.optional(
53+
z.object({
54+
count: z.onumber(),
55+
cursor: z.ostring(),
56+
})
57+
),
58+
})
59+
);
60+
61+
async function assertSuccessfulResponse(response: Response) {
62+
if (response.ok) return;
63+
64+
// If this wasn't a successful response, throw a KVError
65+
const contentType = response.headers.get("Content-Type");
66+
if (contentType?.toLowerCase().includes("application/json")) {
67+
const envelope = APIEnvelopeSchema.parse(await response.json());
68+
throw new KVError(
69+
response.status,
70+
envelope.errors.map(({ message }) => message).join("\n")
71+
);
72+
} else {
73+
throw new KVError(response.status, await response.text());
74+
}
75+
}
76+
77+
const DEFAULT_CACHE_TTL = 60;
78+
// Returns seconds since UNIX epoch key should expire, using the specified
79+
// expiration only if it is sooner than the cache TTL
80+
function getCacheExpiration(
81+
clock: Clock,
82+
expiration?: number,
83+
cacheTtl = DEFAULT_CACHE_TTL
84+
): number {
85+
// Return minimum expiration
86+
const cacheExpiration = millisToSeconds(clock()) + cacheTtl;
87+
if (expiration === undefined || isNaN(expiration)) return cacheExpiration;
88+
else return Math.min(cacheExpiration, expiration);
89+
}
90+
91+
export class KVRemoteStorage extends RemoteStorage {
92+
async get(
93+
key: string,
94+
skipMetadata?: boolean,
95+
cacheTtl = DEFAULT_CACHE_TTL
96+
): Promise<StoredValueMeta | undefined> {
97+
// If this key is cached, return it
98+
const cachedValue = await this.cache.get<RemoteCacheMetadata>(key);
99+
if (cachedValue?.metadata?.storedAt !== undefined) {
100+
// cacheTtl may have changed between the original get call that cached
101+
// this value and now, so check the cache is still fresh with the new TTL
102+
const newExpiration = cachedValue.metadata.storedAt + cacheTtl;
103+
if (newExpiration >= millisToSeconds(this.clock())) {
104+
// If the cache is still fresh, update the expiration and return
105+
await this.cache.put<RemoteCacheMetadata>(key, {
106+
value: cachedValue.value,
107+
expiration: newExpiration,
108+
// Intentionally not updating storedAt here, future get()s should
109+
// compare their cacheTtl against the original
110+
metadata: cachedValue.metadata,
111+
});
112+
113+
// If we recently deleted this key, we'll cache a tombstone instead,
114+
// and want to return undefined in that case
115+
if (cachedValue.metadata.tombstone) return undefined;
116+
return {
117+
value: cachedValue.value,
118+
expiration: cachedValue.metadata.actualExpiration,
119+
metadata: cachedValue.metadata.actualMetadata,
120+
};
121+
}
122+
// Otherwise, revalidate...
123+
}
124+
125+
// Otherwise, fetch the key...
126+
const encodedKey = encodeURIComponent(key);
127+
const valueResource = `storage/kv/namespaces/${this.namespace}/values/${encodedKey}`;
128+
const metadataResource = `storage/kv/namespaces/${this.namespace}/metadata/${encodedKey}`;
129+
const [valueResponse, metadataResponse] = await Promise.all([
130+
this.cloudflareFetch(valueResource),
131+
this.cloudflareFetch(metadataResource),
132+
]);
133+
if (valueResponse.status === 404) {
134+
// Don't cache not founds, so new keys always returned instantly
135+
return undefined;
136+
}
137+
await assertSuccessfulResponse(valueResponse);
138+
await assertSuccessfulResponse(metadataResponse);
139+
140+
const value = new Uint8Array(await valueResponse.arrayBuffer());
141+
const metadataEnvelope = KVGetMetadataResponseSchema.parse(
142+
await metadataResponse.json()
143+
);
144+
assert(metadataEnvelope.success);
145+
// The API will return null if there's no metadata, but we treat this as
146+
// undefined
147+
const metadata = metadataEnvelope.result ?? undefined;
148+
149+
const expirationHeader = valueResponse.headers.get("Expiration");
150+
let expiration: number | undefined;
151+
if (expirationHeader !== null) {
152+
const maybeExpiration = parseInt(expirationHeader);
153+
if (!isNaN(maybeExpiration)) expiration = maybeExpiration;
154+
}
155+
156+
// ...and cache it for the specified TTL, then return it
157+
const result: StoredValueMeta = { value, expiration, metadata };
158+
await this.cache.put<RemoteCacheMetadata>(key, {
159+
value: result.value,
160+
expiration: getCacheExpiration(this.clock, expiration, cacheTtl),
161+
metadata: {
162+
storedAt: millisToSeconds(this.clock()),
163+
actualExpiration: result.expiration,
164+
actualMetadata: result.metadata,
165+
},
166+
});
167+
return result;
168+
}
169+
170+
async put(key: string, value: StoredValueMeta): Promise<void> {
171+
// Store new value, expiration and metadata in remote
172+
const encodedKey = encodeURIComponent(key);
173+
const resource = `storage/kv/namespaces/${this.namespace}/values/${encodedKey}`;
174+
175+
const searchParams = new URLSearchParams();
176+
if (value.expiration !== undefined) {
177+
// Send expiration as TTL to avoid "expiration times must be at least 60s
178+
// in the future" issues from clock skew when setting `expirationTtl: 60`.
179+
const desiredTtl = value.expiration - millisToSeconds(this.clock());
180+
const ttl = Math.max(desiredTtl, 60);
181+
searchParams.set("expiration_ttl", ttl.toString());
182+
}
183+
184+
let body: BodyInit = value.value;
185+
if (value.metadata !== undefined) {
186+
body = new FormData();
187+
body.set("value", new Blob([value.value]));
188+
body.set("metadata", JSON.stringify(value.metadata));
189+
}
190+
191+
const response = await this.cloudflareFetch(resource, searchParams, {
192+
method: "PUT",
193+
body,
194+
});
195+
await assertSuccessfulResponse(response);
196+
197+
// Store this value in the cache
198+
await this.cache.put<RemoteCacheMetadata>(key, {
199+
value: value.value,
200+
expiration: getCacheExpiration(this.clock, value.expiration),
201+
metadata: {
202+
storedAt: millisToSeconds(this.clock()),
203+
actualExpiration: value.expiration,
204+
actualMetadata: value.metadata,
205+
},
206+
});
207+
}
208+
209+
async delete(key: string): Promise<boolean> {
210+
// Delete key from remote
211+
const encodedKey = encodeURIComponent(key);
212+
const resource = `storage/kv/namespaces/${this.namespace}/values/${encodedKey}`;
213+
214+
const response = await this.cloudflareFetch(resource, undefined, {
215+
method: "DELETE",
216+
});
217+
await assertSuccessfulResponse(response);
218+
219+
// "Store" delete in cache as tombstone
220+
await this.cache.put<RemoteCacheMetadata>(key, {
221+
value: new Uint8Array(),
222+
expiration: getCacheExpiration(this.clock),
223+
metadata: { storedAt: millisToSeconds(this.clock()), tombstone: true },
224+
});
225+
226+
// Technically, it's incorrect to always say we deleted the key by returning
227+
// true here, as the value may not exist in the remote. However, `KVGateway`
228+
// ignores this result anyway.
229+
return true;
230+
}
231+
232+
async list(options: StorageListOptions): Promise<StorageListResult> {
233+
// Always list from remote, ignore cache
234+
const resource = `storage/kv/namespaces/${this.namespace}/keys`;
235+
const searchParams = new URLSearchParams();
236+
if (options.limit !== undefined) {
237+
searchParams.set("limit", options.limit.toString());
238+
}
239+
if (options.cursor !== undefined) {
240+
searchParams.set("cursor", options.cursor.toString());
241+
}
242+
if (options.prefix !== undefined) {
243+
searchParams.set("prefix", options.prefix.toString());
244+
}
245+
246+
// Make sure unsupported options aren't specified
247+
assert.strictEqual(options.start, undefined);
248+
assert.strictEqual(options.end, undefined);
249+
assert.strictEqual(options.reverse, undefined);
250+
assert.strictEqual(options.delimiter, undefined);
251+
252+
const response = await this.cloudflareFetch(resource, searchParams);
253+
await assertSuccessfulResponse(response);
254+
const value = KVListResponseSchema.parse(await response.json());
255+
assert(value.success);
256+
257+
return {
258+
keys: value.result,
259+
cursor: value.result_info?.cursor ?? "",
260+
};
261+
}
262+
263+
has(): never {
264+
assert.fail("KVGateway should not call has()");
265+
}
266+
head(): never {
267+
assert.fail("KVGateway should not call head()");
268+
}
269+
getRange(): never {
270+
assert.fail("KVGateway should not call getRange()");
271+
}
272+
}

0 commit comments

Comments
 (0)