Skip to content
Merged
5 changes: 5 additions & 0 deletions .changeset/modern-laws-happen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

Add sharding replication for the Durable Object Tag Cache
10 changes: 9 additions & 1 deletion examples/e2e/app-router/open-next.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import doQueue from "@opennextjs/cloudflare/durable-queue";

export default defineCloudflareConfig({
incrementalCache: kvIncrementalCache,
tagCache: shardedTagCache({ numberOfShards: 12 }),
// With such a configuration, we could have up to 12 * (8 + 2) = 120 Durable Objects instances
tagCache: shardedTagCache({
numberOfShards: 12,
enableShardReplication: true,
shardReplicationOptions: {
numberOfSoftReplicas: 8,
numberOfHardReplicas: 2,
},
}),
queue: doQueue,
});
3 changes: 1 addition & 2 deletions examples/e2e/app-router/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
"start": "next start --port 3001",
"lint": "next lint",
"clean": "rm -rf .turbo node_modules .next .open-next",
"d1:clean": "wrangler d1 execute NEXT_CACHE_D1 --command \"DROP TABLE IF EXISTS tags; DROP TABLE IF EXISTS revalidations\"",
"build:worker": "pnpm d1:clean && pnpm opennextjs-cloudflare build",
"build:worker": "pnpm opennextjs-cloudflare build",
"preview:worker": "pnpm opennextjs-cloudflare preview",
"preview": "pnpm build:worker && pnpm preview:worker",
"e2e": "playwright test -c e2e/playwright.config.ts"
Expand Down
9 changes: 1 addition & 8 deletions examples/e2e/app-router/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"class_name": "DurableObjectQueueHandler"
},
{
"name": "NEXT_CACHE_D1_SHARDED",
"name": "NEXT_CACHE_DO_SHARDED",
"class_name": "DOShardedTagCache"
}
]
Expand All @@ -32,13 +32,6 @@
"id": "<BINDING_ID>"
}
],
"d1_databases": [
{
"binding": "NEXT_CACHE_D1",
"database_id": "NEXT_CACHE_D1",
"database_name": "NEXT_CACHE_D1"
}
],
"services": [
{
"binding": "NEXT_CACHE_REVALIDATION_WORKER",
Expand Down
6 changes: 5 additions & 1 deletion packages/cloudflare/src/api/cloudflare-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ declare global {
// Durable Object namespace to use for the durable object queue handler
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
// Durable Object namespace to use for the sharded tag cache
NEXT_CACHE_D1_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
NEXT_CACHE_DO_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
// Queue of failed tag writes
// It could be used for monitoring or to reprocess failed writes
// Entirely optional
NEXT_CACHE_DO_SHARDED_DLQ?: Queue;

// Asset binding
ASSETS?: Fetcher;
Expand Down
178 changes: 163 additions & 15 deletions packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import doShardedTagCache from "./do-sharded-tag-cache";
import doShardedTagCache, {
DEFAULT_HARD_REPLICAS,
DEFAULT_SOFT_REPLICAS,
TagCacheDOId,
} from "./do-sharded-tag-cache";

const hasBeenRevalidatedMock = vi.fn();
const writeTagsMock = vi.fn();
Expand All @@ -9,9 +13,15 @@ const getMock = vi
.fn()
.mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
const sendDLQMock = vi.fn();
vi.mock("./cloudflare-context", () => ({
getCloudflareContext: () => ({
env: { NEXT_CACHE_D1_SHARDED: { idFromName: idFromNameMock, get: getMock } },
env: {
NEXT_CACHE_DO_SHARDED: { idFromName: idFromNameMock, get: getMock },
NEXT_CACHE_DO_SHARDED_DLQ: {
send: sendDLQMock,
},
},
ctx: { waitUntil: waitUntilMock },
}),
}));
Expand All @@ -23,24 +33,67 @@ describe("DOShardedTagCache", () => {
it("should generate a shardId", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("shard-1", ["tag1"]);
expectedResult.set("shard-2", ["tag2"]);
expect(cache.generateShards(["tag1", "tag2"])).toEqual(expectedResult);
expectedResult.set("tag-hard;shard-1;replica-1", expect.objectContaining({ tags: ["tag1"] }));
expectedResult.set("tag-hard;shard-2;replica-1", expect.objectContaining({ tags: ["tag2"] }));
expect(cache.generateShards({ tags: ["tag1", "tag2"] })).toEqual(expectedResult);
});

it("should group tags by shard", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("shard-1", ["tag1", "tag6"]);
expect(cache.generateShards(["tag1", "tag6"])).toEqual(expectedResult);
expectedResult.set("tag-hard;shard-1;replica-1", expect.objectContaining({ tags: ["tag1", "tag6"] }));
expect(cache.generateShards({ tags: ["tag1", "tag6"] })).toEqual(expectedResult);
});

it("should generate the same shardId for the same tag", () => {
const cache = doShardedTagCache();
const firstResult = cache.generateShards(["tag1"]);
const secondResult = cache.generateShards(["tag1", "tag3", "tag4"]);
const firstResult = cache.generateShards({ tags: ["tag1"] });
const secondResult = cache.generateShards({ tags: ["tag1", "tag3", "tag4"] });
expect(firstResult.get("shard-1")).toEqual(secondResult.get("shard-1"));
});

it("should split hard and soft tags", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("tag-hard;shard-1;replica-1", expect.objectContaining({ tags: ["tag1"] }));
expectedResult.set("tag-soft;shard-3;replica-1", expect.objectContaining({ tags: ["_N_T_/tag1"] }));
expect(cache.generateShards({ tags: ["tag1", "_N_T_/tag1"] })).toEqual(expectedResult);
});

describe("with shard replication", () => {
it("should generate all shards if generateAllShards is true", () => {
const cache = doShardedTagCache({ numberOfShards: 4, enableShardReplication: true });
const expectedResult = new Map();
expectedResult.set("tag-hard;shard-1;replica-1", expect.objectContaining({ tags: ["tag1"] }));
expectedResult.set("tag-hard;shard-1;replica-2", expect.objectContaining({ tags: ["tag1"] }));
expectedResult.set("tag-soft;shard-3;replica-1", expect.objectContaining({ tags: ["_N_T_/tag1"] }));
expectedResult.set("tag-soft;shard-3;replica-2", expect.objectContaining({ tags: ["_N_T_/tag1"] }));
expectedResult.set("tag-soft;shard-3;replica-3", expect.objectContaining({ tags: ["_N_T_/tag1"] }));
expectedResult.set("tag-soft;shard-3;replica-4", expect.objectContaining({ tags: ["_N_T_/tag1"] }));
expect(cache.generateShards({ tags: ["tag1", "_N_T_/tag1"], generateAllShards: true })).toEqual(
expectedResult
);
});

it("should generate only one shard if generateAllShards is false", () => {
const cache = doShardedTagCache({ numberOfShards: 4, enableShardReplication: true });
const shardedMap = cache.generateShards({ tags: ["tag1", "_N_T_/tag1"], generateAllShards: false });
expect(shardedMap.size).toBe(2);
const shardIds = Array.from(shardedMap.keys());
// We can't test against a specific shard id because the last part is random
expect(shardIds[0]).toMatch(/tag-soft;shard-3;replica-\d/);
expect(shardIds[1]).toMatch(/tag-hard;shard-1;replica-\d/);

// We still need to check if the last part is between the correct boundaries
const shardId = shardIds[0]?.substring(shardIds[0].lastIndexOf("-") + 1) ?? "";
expect(parseInt(shardId)).toBeGreaterThanOrEqual(1);
expect(parseInt(shardId)).toBeLessThanOrEqual(DEFAULT_SOFT_REPLICAS);

const shardId2 = shardIds[1]?.substring(shardIds[1].lastIndexOf("-") + 1) ?? "";
expect(parseInt(shardId2)).toBeGreaterThanOrEqual(1);
expect(parseInt(shardId2)).toBeLessThanOrEqual(DEFAULT_HARD_REPLICAS);
});
});
});

describe("hasBeenRevalidated", () => {
Expand Down Expand Up @@ -130,6 +183,11 @@ describe("DOShardedTagCache", () => {
globalThis.openNextConfig = {
dangerous: { disableTagCache: false },
};
vi.useFakeTimers();
vi.setSystemTime(1000);
});
afterEach(() => {
vi.useRealTimers();
});
it("should return early if the cache is disabled", async () => {
globalThis.openNextConfig = {
Expand All @@ -146,24 +204,37 @@ describe("DOShardedTagCache", () => {
await cache.writeTags(["tag1"]);
expect(idFromNameMock).toHaveBeenCalled();
expect(writeTagsMock).toHaveBeenCalled();
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
});

it("should write the tags to the cache for multiple shards", async () => {
const cache = doShardedTagCache();
await cache.writeTags(["tag1", "tag2"]);
expect(idFromNameMock).toHaveBeenCalledTimes(2);
expect(writeTagsMock).toHaveBeenCalledTimes(2);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"], 1000);
});

it('should write to all the double sharded shards if "generateAllShards" is true', async () => {
const cache = doShardedTagCache({ numberOfShards: 4, enableShardReplication: true });
await cache.writeTags(["tag1", "_N_T_/tag1"]);
expect(idFromNameMock).toHaveBeenCalledTimes(6);
expect(writeTagsMock).toHaveBeenCalledTimes(6);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
expect(writeTagsMock).toHaveBeenCalledWith(["_N_T_/tag1"], 1000);
});

it("should call deleteRegionalCache", async () => {
const cache = doShardedTagCache();
cache.deleteRegionalCache = vi.fn();
await cache.writeTags(["tag1"]);
expect(cache.deleteRegionalCache).toHaveBeenCalled();
expect(cache.deleteRegionalCache).toHaveBeenCalledWith("shard-1", ["tag1"]);
expect(cache.deleteRegionalCache).toHaveBeenCalledWith(
expect.objectContaining({ key: "tag-hard;shard-1;replica-1" }),
["tag1"]
);
// expect(cache.deleteRegionalCache).toHaveBeenCalledWith("tag-hard;shard-1;replica-1", ["tag1"]);
});
});

Expand All @@ -190,7 +261,12 @@ describe("DOShardedTagCache", () => {
describe("getFromRegionalCache", () => {
it("should return undefined if regional cache is disabled", async () => {
const cache = doShardedTagCache();
expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBeUndefined();
const doId = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBeUndefined();
});

it("should call .match on the cache", async () => {
Expand All @@ -201,9 +277,81 @@ describe("DOShardedTagCache", () => {
}),
};
const cache = doShardedTagCache({ numberOfShards: 4, regionalCache: true });
expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBe("response");
const doId = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBe("response");
// @ts-expect-error - Defined on cloudflare context
globalThis.caches = undefined;
});
});

describe("getCacheKey", () => {
it("should return the cache key without the random part", async () => {
const cache = doShardedTagCache();
const doId1 = new TagCacheDOId({ baseShardId: "shard-0", numberOfReplicas: 1, shardType: "hard" });
const reqKey = await cache.getCacheKey(doId1, ["_N_T_/tag1"]);
expect(reqKey.url).toBe("http://local.cache/shard/tag-hard;shard-0?tags=_N_T_%2Ftag1");

const doId2 = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
const reqKey2 = await cache.getCacheKey(doId2, ["tag1"]);
expect(reqKey2.url).toBe("http://local.cache/shard/tag-hard;shard-1?tags=tag1");
});
});

describe("performWriteTagsWithRetry", () => {
it("should retry if it fails", async () => {
vi.useFakeTimers();
vi.setSystemTime(1000);
const cache = doShardedTagCache();
writeTagsMock.mockImplementationOnce(() => {
throw new Error("error");
});
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
const doId = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
await cache.performWriteTagsWithRetry(doId, ["tag1"], Date.now());
expect(writeTagsMock).toHaveBeenCalledTimes(2);
expect(spiedFn).toHaveBeenCalledTimes(2);
expect(spiedFn).toHaveBeenCalledWith(doId, ["tag1"], 1000, 1);
expect(sendDLQMock).not.toHaveBeenCalled();

vi.useRealTimers();
});

it("should stop retrying after 3 times", async () => {
vi.useFakeTimers();
vi.setSystemTime(1000);
const cache = doShardedTagCache();
writeTagsMock.mockImplementationOnce(() => {
throw new Error("error");
});
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
await cache.performWriteTagsWithRetry(
new TagCacheDOId({ baseShardId: "shard-1", numberOfReplicas: 1, shardType: "hard" }),
["tag1"],
Date.now(),
3
);
expect(writeTagsMock).toHaveBeenCalledTimes(1);
expect(spiedFn).toHaveBeenCalledTimes(1);

expect(sendDLQMock).toHaveBeenCalledWith({
failingShardId: "tag-hard;shard-1;replica-1",
failingTags: ["tag1"],
lastModified: 1000,
});

vi.useRealTimers();
});
});
});
Loading