Skip to content
71 changes: 69 additions & 2 deletions packages/miniflare/src/workers/kv/namespace.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import {
DELETE,
GET,
HttpError,
KeyValueEntry,
KeyValueStorage,
maybeApply,
MiniflareDurableObject,
POST,
PUT,
RouteHandler,
} from "miniflare:shared";
Expand Down Expand Up @@ -73,6 +75,41 @@ function secondsToMillis(seconds: number): number {
return seconds * 1000;
}

async function processKeyValue(
obj: KeyValueEntry<unknown> | null,
type: string = "text",
withMetadata: boolean = false
) {
const decoder = new TextDecoder();
let r = "";
if (obj?.value) {
for await (const chunk of obj?.value) {
r += decoder.decode(chunk, { stream: true });
}
r += decoder.decode();
}

let val = null;
try {
val = obj?.value == null ? null : type === "json" ? JSON.parse(r) : r;
} catch (err: any) {
throw new HttpError(
400,
"At least of of the requested keys corresponds to a non-JSON value"
);
}
if (val == null) {
return null;
}
if (withMetadata) {
return {
value: val,
metadata: obj?.metadata ? JSON.stringify(obj?.metadata) : null,
};
}
return val;
}

export class KVNamespaceObject extends MiniflareDurableObject {
#storage?: KeyValueStorage;
get storage() {
Expand All @@ -81,13 +118,44 @@ export class KVNamespaceObject extends MiniflareDurableObject {
}

@GET("/:key")
@POST("/bulk/get")
get: RouteHandler<KVParams> = async (req, params, url) => {
if (req.method === "POST" && req.body != null) {
let r = "";
const decoder = new TextDecoder();
for await (const chunk of req.body) {
r += decoder.decode(chunk, { stream: true });
}
r += decoder.decode();
const parsedBody = JSON.parse(r);
const keys: string[] = parsedBody.keys;
const type = parsedBody?.type;
if (type && type !== "text" && type !== "json") {
return new Response("", { status: 400 });
}
const obj: { [key: string]: any } = {};
if (keys.length > 100) {
return new Response("", { status: 400 });
}
for (const key of keys) {
validateGetOptions(key, { cacheTtl: parsedBody?.cacheTtl });
const entry = await this.storage.get(key);
const value = await processKeyValue(
entry,
parsedBody?.type,
parsedBody?.withMetadata
);
obj[key] = value;
}

return new Response(JSON.stringify(obj));
}

// Decode URL parameters
const key = decodeKey(params, url.searchParams);
const cacheTtlParam = url.searchParams.get(KVParams.CACHE_TTL);
const cacheTtl =
cacheTtlParam === null ? undefined : parseInt(cacheTtlParam);

// Get value from storage
validateGetOptions(key, { cacheTtl });
const entry = await this.storage.get(key);
Expand All @@ -114,7 +182,6 @@ export class KVNamespaceObject extends MiniflareDurableObject {
const rawExpiration = url.searchParams.get(KVParams.EXPIRATION);
const rawExpirationTtl = url.searchParams.get(KVParams.EXPIRATION_TTL);
const rawMetadata = req.headers.get(KVHeaders.METADATA);

// Validate key, expiration and metadata
const now = millisToSeconds(this.timers.now());
const { expiration, metadata } = validatePutOptions(key, {
Expand Down
50 changes: 50 additions & 0 deletions packages/miniflare/test/plugins/kv/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,56 @@ test("get: returns value", async (t) => {
const result = await kv.get("key");
t.is(result, "value");
});

test("bulk get: returns value", async (t) => {
const { kv } = t.context;
await kv.put("key1", "value1");
const result: any = await kv.get(["key1", "key2"]);
const expectedResult = new Map([
["key1", "value1"],
["key2", null],
]);

t.deepEqual(result, expectedResult);
});

test("bulk get: check max keys", async (t) => {
const { kv } = t.context;
await kv.put("key1", "value1");
const keyArray = [];
for (let i = 0; i <= 100; i++) {
keyArray.push(`key${i}`);
}
try {
await kv.get(keyArray);
} catch (error: any) {
t.is(error.message, "KV GET_BULK failed: 400 Bad Request");
}
});

test("bulk get: request json type", async (t) => {
const { kv } = t.context;
await kv.put("key1", '{"example": "ex"}');
await kv.put("key2", "example");
let result: any = await kv.get(["key1"]);
let expectedResult: any = new Map([["key1", '{"example": "ex"}']]);
expectedResult = new Map([["key1", '{"example": "ex"}']]);
t.deepEqual(result, expectedResult);

result = await kv.get(["key1"], "json");
expectedResult = new Map([["key1", { example: "ex" }]]);
t.deepEqual(result, expectedResult);

try {
await kv.get(["key1", "key2"], "json");
} catch (error: any) {
t.is(
error.message,
"KV GET_BULK failed: 400 At least of of the requested keys corresponds to a non-JSON value"
);
}
});

test("get: returns null for non-existent keys", async (t) => {
const { kv } = t.context;
t.is(await kv.get("key"), null);
Expand Down
20 changes: 18 additions & 2 deletions packages/miniflare/test/test-shared/miniflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,23 @@ export function namespace<T>(ns: string, binding: T): Namespaced<T> {
return (keys: unknown, ...args: unknown[]) => {
if (typeof keys === "string") keys = ns + keys;
if (Array.isArray(keys)) keys = keys.map((key) => ns + key);
return (value as (...args: unknown[]) => unknown)(keys, ...args);
const result = (value as (...args: unknown[]) => unknown)(
keys,
...args
);
if (result instanceof Promise) {
return result.then((res) => {
if (res instanceof Map) {
const newResult = new Map<string, unknown>();
for (const [key, value] of res) {
newResult.set(key.slice(ns.length), value);
}
return newResult;
}
return res;
});
}
return result;
};
}
return value;
Expand Down Expand Up @@ -83,7 +99,7 @@ export function miniflareTest<
status: 500,
headers: { "MF-Experimental-Error-Stack": "true" },
});
}
}
}
}
`;
Expand Down
Loading