Merged
5 changes: 5 additions & 0 deletions .changeset/modern-laws-happen.md
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

Add shard replication for the Durable Object Tag Cache
10 changes: 9 additions & 1 deletion examples/e2e/app-router/open-next.config.ts
@@ -5,6 +5,14 @@ import doQueue from "@opennextjs/cloudflare/durable-queue";

export default defineCloudflareConfig({
incrementalCache: kvIncrementalCache,
tagCache: shardedTagCache({ numberOfShards: 12 }),
// With this configuration, there can be up to 12 * (8 + 2) = 120 Durable Object instances
tagCache: shardedTagCache({
numberOfShards: 12,
enableShardReplication: true,
shardReplicationOptions: {
numberOfSoftReplicas: 8,
numberOfHardReplicas: 2,
},
}),
queue: doQueue,
});
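For reference, here is a rough sketch of how a tag could map to sharded Durable Object keys under such a configuration. The option names, hash function, and key format below are assumptions used only to illustrate the replication scheme, not the actual @opennextjs/cloudflare implementation:

```ts
// Illustrative sketch only: names, hash, and key format are assumptions.
type ShardType = "soft" | "hard";

interface ReplicationOptions {
  numberOfShards: number;
  replicasByType: Record<ShardType, number>; // e.g. { soft: 8, hard: 2 }
}

// Internal Next.js tags are prefixed with "_N_T_/" and go to "soft" shards;
// user tags go to "hard" shards.
function shardTypeFor(tag: string): ShardType {
  return tag.startsWith("_N_T_/") ? "soft" : "hard";
}

// Toy hash, only to make the sketch deterministic.
function toyHash(value: string): number {
  let hash = 0;
  for (const char of value) hash = (hash * 31 + char.charCodeAt(0)) >>> 0;
  return hash;
}

// Writes fan out to every replica of a tag's shard; reads pick one replica at
// random, spreading load across up to
// numberOfShards * (softReplicas + hardReplicas) Durable Object instances.
function doKeysForTag(tag: string, options: ReplicationOptions, allReplicas: boolean): string[] {
  const type = shardTypeFor(tag);
  const shard = toyHash(tag) % options.numberOfShards;
  const replicas = options.replicasByType[type];
  const base = `tag-${type};shard-${shard}`;
  if (allReplicas) {
    return Array.from({ length: replicas }, (_, i) => `${base};replica-${i + 1}`);
  }
  return [`${base};replica-${1 + Math.floor(Math.random() * replicas)}`];
}
```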
3 changes: 1 addition & 2 deletions examples/e2e/app-router/package.json
@@ -9,8 +9,7 @@
"start": "next start --port 3001",
"lint": "next lint",
"clean": "rm -rf .turbo node_modules .next .open-next",
"d1:clean": "wrangler d1 execute NEXT_CACHE_D1 --command \"DROP TABLE IF EXISTS tags; DROP TABLE IF EXISTS revalidations\"",
"build:worker": "pnpm d1:clean && pnpm opennextjs-cloudflare build",
"build:worker": "pnpm opennextjs-cloudflare build",
"preview:worker": "pnpm opennextjs-cloudflare preview",
"preview": "pnpm build:worker && pnpm preview:worker",
"e2e": "playwright test -c e2e/playwright.config.ts"
9 changes: 1 addition & 8 deletions examples/e2e/app-router/wrangler.jsonc
@@ -15,7 +15,7 @@
"class_name": "DurableObjectQueueHandler"
},
{
"name": "NEXT_CACHE_D1_SHARDED",
"name": "NEXT_CACHE_DO_SHARDED",
"class_name": "DOShardedTagCache"
}
]
@@ -32,13 +32,6 @@
"id": "<BINDING_ID>"
}
],
"d1_databases": [
{
"binding": "NEXT_CACHE_D1",
"database_id": "NEXT_CACHE_D1",
"database_name": "NEXT_CACHE_D1"
}
],
"services": [
{
"binding": "NEXT_CACHE_REVALIDATION_WORKER",
6 changes: 5 additions & 1 deletion packages/cloudflare/src/api/cloudflare-context.ts
@@ -22,7 +22,7 @@ declare global {
// Durable Object namespace to use for the durable object queue handler
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
// Durable Object namespace to use for the sharded tag cache
NEXT_CACHE_D1_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
NEXT_CACHE_DO_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
// Queue of failed tag writes
// It could be used for monitoring or to reprocess failed writes
// Entirely optional
NEXT_CACHE_DO_SHARDED_DLQ?: Queue;

// Asset binding
ASSETS?: Fetcher;
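The `NEXT_CACHE_DO_SHARDED_DLQ` binding is optional. Based on the payload asserted in the spec below, a monitoring-only consumer might look roughly like this; the consumer itself, its types, and the wiring are assumptions, not part of this PR:

```ts
// Sketch of an optional consumer for the dead-letter queue, assuming the
// message shape asserted in do-sharded-tag-cache.spec.ts and the ambient
// types from @cloudflare/workers-types.
interface FailedTagWrite {
  failingShardId: string; // e.g. "tag-hard;shard-1;replica-1"
  failingTags: string[];
  lastModified: number; // epoch millis of the failed write
}

export default {
  async queue(batch: MessageBatch<FailedTagWrite>): Promise<void> {
    for (const message of batch.messages) {
      const { failingShardId, failingTags, lastModified } = message.body;
      // Monitoring-only: log and acknowledge. A consumer could instead retry
      // the write against the corresponding Durable Object shard.
      console.warn(`Tag write failed for ${failingShardId}`, { failingTags, lastModified });
      message.ack();
    }
  },
};
```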
204 changes: 183 additions & 21 deletions packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts
@@ -1,6 +1,10 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import doShardedTagCache from "./do-sharded-tag-cache";
import doShardedTagCache, {
DEFAULT_HARD_REPLICAS,
DEFAULT_SOFT_REPLICAS,
TagCacheDOId,
} from "./do-sharded-tag-cache";

const hasBeenRevalidatedMock = vi.fn();
const writeTagsMock = vi.fn();
@@ -9,9 +9,15 @@ const getMock = vi
.fn()
.mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
const sendDLQMock = vi.fn();
vi.mock("./cloudflare-context", () => ({
getCloudflareContext: () => ({
env: { NEXT_CACHE_D1_SHARDED: { idFromName: idFromNameMock, get: getMock } },
env: {
NEXT_CACHE_DO_SHARDED: { idFromName: idFromNameMock, get: getMock },
NEXT_CACHE_DO_SHARDED_DLQ: {
send: sendDLQMock,
},
},
ctx: { waitUntil: waitUntilMock },
}),
}));
@@ -22,24 +22,81 @@ describe("DOShardedTagCache", () => {
describe("generateShardId", () => {
it("should generate a shardId", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("shard-1", ["tag1"]);
expectedResult.set("shard-2", ["tag2"]);
expect(cache.generateShards(["tag1", "tag2"])).toEqual(expectedResult);
const expectedResult = [
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-2" }), tags: ["tag2"] },
];
const result = cache.groupTagsByDO({ tags: ["tag1", "tag2"] });
expect(result).toEqual(expectedResult);
expect(result[0]?.doId.key).toBe("tag-hard;shard-1;replica-1");
expect(result[1]?.doId.key).toBe("tag-hard;shard-2;replica-1");
});

it("should group tags by shard", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("shard-1", ["tag1", "tag6"]);
expect(cache.generateShards(["tag1", "tag6"])).toEqual(expectedResult);
const expectedResult = [
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1", "tag6"] },
];
const result = cache.groupTagsByDO({ tags: ["tag1", "tag6"] });
expect(result).toEqual(expectedResult);
expect(result[0]?.doId.key).toBe("tag-hard;shard-1;replica-1");
});

it("should generate the same shardId for the same tag", () => {
const cache = doShardedTagCache();
const firstResult = cache.generateShards(["tag1"]);
const secondResult = cache.generateShards(["tag1", "tag3", "tag4"]);
expect(firstResult.get("shard-1")).toEqual(secondResult.get("shard-1"));
const firstResult = cache.groupTagsByDO({ tags: ["tag1"] });
const secondResult = cache.groupTagsByDO({ tags: ["tag1", "tag3", "tag4"] });
expect(firstResult[0]).toEqual(secondResult[0]);
});

it("should split hard and soft tags", () => {
const cache = doShardedTagCache();
const expectedResult = [
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1", replicaId: 1 }), tags: ["tag1"] },
];
const result = cache.groupTagsByDO({ tags: ["tag1", "_N_T_/tag1"] });
expect(result).toEqual(expectedResult);
expect(result[1]?.doId.key).toBe("tag-hard;shard-1;replica-1");
expect(result[0]?.doId.key).toBe("tag-soft;shard-3;replica-1");
});

describe("with shard replication", () => {
it("should generate all doIds if generateAllReplicas is true", () => {
const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
const expectedResult = [
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
];
const result = cache.groupTagsByDO({ tags: ["tag1", "_N_T_/tag1"], generateAllReplicas: true });
expect(result).toEqual(expectedResult);
});

it("should generate only one doIds by tag type if generateAllReplicas is false", () => {
const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
const shardedTagCollection = cache.groupTagsByDO({
tags: ["tag1", "_N_T_/tag1"],
generateAllReplicas: false,
});
expect(shardedTagCollection.length).toBe(2);
const firstDOId = shardedTagCollection[0]?.doId;
const secondDOId = shardedTagCollection[1]?.doId;

expect(firstDOId?.shardId).toBe("tag-soft;shard-3");
expect(secondDOId?.shardId).toBe("tag-hard;shard-1");

// We still need to check that the replica id falls within the expected range
expect(firstDOId?.replicaId).toBeGreaterThanOrEqual(1);
expect(firstDOId?.replicaId).toBeLessThanOrEqual(DEFAULT_SOFT_REPLICAS);

expect(secondDOId?.replicaId).toBeGreaterThanOrEqual(1);
expect(secondDOId?.replicaId).toBeLessThanOrEqual(DEFAULT_HARD_REPLICAS);
});
});
});

@@ -115,7 +182,7 @@ describe("DOShardedTagCache", () => {
expect(cache.putToRegionalCache).toHaveBeenCalled();
});

it("should call all the shards", async () => {
it("should call all the durable object instance", async () => {
const cache = doShardedTagCache();
cache.getFromRegionalCache = vi.fn();
const result = await cache.hasBeenRevalidated(["tag1", "tag2"], 123456);
@@ -130,6 +197,11 @@
globalThis.openNextConfig = {
dangerous: { disableTagCache: false },
};
vi.useFakeTimers();
vi.setSystemTime(1000);
});
afterEach(() => {
vi.useRealTimers();
});
it("should return early if the cache is disabled", async () => {
globalThis.openNextConfig = {
@@ -146,24 +218,37 @@
await cache.writeTags(["tag1"]);
expect(idFromNameMock).toHaveBeenCalled();
expect(writeTagsMock).toHaveBeenCalled();
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
});

it("should write the tags to the cache for multiple shards", async () => {
const cache = doShardedTagCache();
await cache.writeTags(["tag1", "tag2"]);
expect(idFromNameMock).toHaveBeenCalledTimes(2);
expect(writeTagsMock).toHaveBeenCalledTimes(2);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"], 1000);
});

it('should write to all the replicated shards if "generateAllReplicas" is true', async () => {
const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
await cache.writeTags(["tag1", "_N_T_/tag1"]);
expect(idFromNameMock).toHaveBeenCalledTimes(6);
expect(writeTagsMock).toHaveBeenCalledTimes(6);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
expect(writeTagsMock).toHaveBeenCalledWith(["_N_T_/tag1"], 1000);
});

it("should call deleteRegionalCache", async () => {
const cache = doShardedTagCache();
cache.deleteRegionalCache = vi.fn();
await cache.writeTags(["tag1"]);
expect(cache.deleteRegionalCache).toHaveBeenCalled();
expect(cache.deleteRegionalCache).toHaveBeenCalledWith("shard-1", ["tag1"]);
expect(cache.deleteRegionalCache).toHaveBeenCalledWith(
expect.objectContaining({ key: "tag-hard;shard-1;replica-1" }),
["tag1"]
);
});
});

@@ -178,7 +263,7 @@ describe("DOShardedTagCache", () => {
globalThis.caches = {
open: vi.fn().mockResolvedValue("cache"),
};
const cache = doShardedTagCache({ numberOfShards: 4, regionalCache: true });
const cache = doShardedTagCache({ baseShardSize: 4, regionalCache: true });
expect(cache.localCache).toBeUndefined();
expect(await cache.getCacheInstance()).toBe("cache");
expect(cache.localCache).toBe("cache");
@@ -190,7 +275,12 @@
describe("getFromRegionalCache", () => {
it("should return undefined if regional cache is disabled", async () => {
const cache = doShardedTagCache();
expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBeUndefined();
const doId = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBeUndefined();
});

it("should call .match on the cache", async () => {
@@ -200,10 +290,82 @@
match: vi.fn().mockResolvedValue("response"),
}),
};
const cache = doShardedTagCache({ numberOfShards: 4, regionalCache: true });
expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBe("response");
const cache = doShardedTagCache({ baseShardSize: 4, regionalCache: true });
const doId = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBe("response");
// @ts-expect-error - Defined on cloudflare context
globalThis.caches = undefined;
});
});

describe("getCacheKey", () => {
it("should return the cache key without the random part", async () => {
const cache = doShardedTagCache();
const doId1 = new TagCacheDOId({ baseShardId: "shard-0", numberOfReplicas: 1, shardType: "hard" });
const reqKey = await cache.getCacheKey(doId1, ["_N_T_/tag1"]);
expect(reqKey.url).toBe("http://local.cache/shard/tag-hard;shard-0?tags=_N_T_%2Ftag1");

const doId2 = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
const reqKey2 = await cache.getCacheKey(doId2, ["tag1"]);
expect(reqKey2.url).toBe("http://local.cache/shard/tag-hard;shard-1?tags=tag1");
});
});

describe("performWriteTagsWithRetry", () => {
it("should retry if it fails", async () => {
vi.useFakeTimers();
vi.setSystemTime(1000);
const cache = doShardedTagCache();
writeTagsMock.mockImplementationOnce(() => {
throw new Error("error");
});
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
const doId = new TagCacheDOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
await cache.performWriteTagsWithRetry(doId, ["tag1"], Date.now());
expect(writeTagsMock).toHaveBeenCalledTimes(2);
expect(spiedFn).toHaveBeenCalledTimes(2);
expect(spiedFn).toHaveBeenCalledWith(doId, ["tag1"], 1000, 1);
expect(sendDLQMock).not.toHaveBeenCalled();

vi.useRealTimers();
});

it("should stop retrying after 3 times", async () => {
vi.useFakeTimers();
vi.setSystemTime(1000);
const cache = doShardedTagCache();
writeTagsMock.mockImplementationOnce(() => {
throw new Error("error");
});
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
await cache.performWriteTagsWithRetry(
new TagCacheDOId({ baseShardId: "shard-1", numberOfReplicas: 1, shardType: "hard" }),
["tag1"],
Date.now(),
3
);
expect(writeTagsMock).toHaveBeenCalledTimes(1);
expect(spiedFn).toHaveBeenCalledTimes(1);

expect(sendDLQMock).toHaveBeenCalledWith({
failingShardId: "tag-hard;shard-1;replica-1",
failingTags: ["tag1"],
lastModified: 1000,
});

vi.useRealTimers();
});
});
});
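For clarity, the retry behaviour exercised by these tests can be summarised with a small sketch. The retry limit of 3 and the DLQ payload mirror the spec above, but the function shape and names are assumptions rather than the actual implementation:

```ts
// Illustrative sketch of the retry-then-DLQ flow covered by the spec above.
interface FailedTagWrite {
  failingShardId: string;
  failingTags: string[];
  lastModified: number;
}

async function performWriteTagsWithRetry(
  writeTags: (tags: string[], lastModified: number) => Promise<void>,
  sendToDLQ: (message: FailedTagWrite) => Promise<void>,
  doKey: string, // e.g. "tag-hard;shard-1;replica-1"
  tags: string[],
  lastModified: number,
  retryNumber = 0
): Promise<void> {
  try {
    await writeTags(tags, lastModified);
  } catch (error) {
    if (retryNumber >= 3) {
      // Give up after 3 retries and hand the failed write to the optional DLQ.
      await sendToDLQ({ failingShardId: doKey, failingTags: tags, lastModified });
      return;
    }
    // Retry the same shard with an incremented retry counter.
    await performWriteTagsWithRetry(writeTags, sendToDLQ, doKey, tags, lastModified, retryNumber + 1);
  }
}
```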