Skip to content
5 changes: 5 additions & 0 deletions .changeset/modern-laws-happen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

Add double sharding for the Durable Object Tag Cache
10 changes: 9 additions & 1 deletion examples/e2e/app-router/open-next.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import doQueue from "@opennextjs/cloudflare/durable-queue";

export default defineCloudflareConfig({
incrementalCache: kvIncrementalCache,
tagCache: shardedTagCache({ numberOfShards: 12 }),
// With such a configuration, we could have up to 12 * 8 + 12 * 2 = 120 Durable Objects instances
tagCache: shardedTagCache({
numberOfShards: 12,
enableShardReplication: true,
shardReplicationOptions: {
numberOfSoftReplicas: 8,
numberOfHardReplicas: 2,
},
}),
queue: doQueue,
});
2 changes: 2 additions & 0 deletions packages/cloudflare/src/api/cloudflare-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ declare global {
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
// Durables object namespace to use for the sharded tag cache
NEXT_CACHE_D1_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
// Dead letter queue for the D1 sharded tag cache
NEXT_CACHE_D1_SHARDED_DLQ?: Queue;

// Asset binding
ASSETS?: Fetcher;
Expand Down
140 changes: 127 additions & 13 deletions packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import doShardedTagCache from "./do-sharded-tag-cache";
import doShardedTagCache, { DEFAULT_MAX_HARD_SHARDS, DEFAULT_MAX_SOFT_SHARDS } from "./do-sharded-tag-cache";

const hasBeenRevalidatedMock = vi.fn();
const writeTagsMock = vi.fn();
Expand All @@ -9,9 +9,15 @@ const getMock = vi
.fn()
.mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
const sendDLQMock = vi.fn();
vi.mock("./cloudflare-context", () => ({
getCloudflareContext: () => ({
env: { NEXT_CACHE_D1_SHARDED: { idFromName: idFromNameMock, get: getMock } },
env: {
NEXT_CACHE_D1_SHARDED: { idFromName: idFromNameMock, get: getMock },
NEXT_CACHE_D1_SHARDED_DLQ: {
send: sendDLQMock,
},
},
ctx: { waitUntil: waitUntilMock },
}),
}));
Expand All @@ -23,24 +29,67 @@ describe("DOShardedTagCache", () => {
it("should generate a shardId", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("shard-1", ["tag1"]);
expectedResult.set("shard-2", ["tag2"]);
expect(cache.generateShards(["tag1", "tag2"])).toEqual(expectedResult);
expectedResult.set("tag-hard;shard-1;replica-1", ["tag1"]);
expectedResult.set("tag-hard;shard-2;replica-1", ["tag2"]);
expect(cache.generateShards({ tags: ["tag1", "tag2"] })).toEqual(expectedResult);
});

it("should group tags by shard", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("shard-1", ["tag1", "tag6"]);
expect(cache.generateShards(["tag1", "tag6"])).toEqual(expectedResult);
expectedResult.set("tag-hard;shard-1;replica-1", ["tag1", "tag6"]);
expect(cache.generateShards({ tags: ["tag1", "tag6"] })).toEqual(expectedResult);
});

it("should generate the same shardId for the same tag", () => {
const cache = doShardedTagCache();
const firstResult = cache.generateShards(["tag1"]);
const secondResult = cache.generateShards(["tag1", "tag3", "tag4"]);
const firstResult = cache.generateShards({ tags: ["tag1"] });
const secondResult = cache.generateShards({ tags: ["tag1", "tag3", "tag4"] });
expect(firstResult.get("shard-1")).toEqual(secondResult.get("shard-1"));
});

it("should split hard and soft tags", () => {
const cache = doShardedTagCache();
const expectedResult = new Map();
expectedResult.set("tag-hard;shard-1;replica-1", ["tag1"]);
expectedResult.set("tag-soft;shard-3;replica-1", ["_N_T_/tag1"]);
expect(cache.generateShards({ tags: ["tag1", "_N_T_/tag1"] })).toEqual(expectedResult);
});

describe("with shard replication", () => {
it("should generate all shards if generateAllShards is true", () => {
const cache = doShardedTagCache({ numberOfShards: 4, enableShardReplication: true });
const expectedResult = new Map();
expectedResult.set("tag-hard;shard-1;replica-1", ["tag1"]);
expectedResult.set("tag-hard;shard-1;replica-2", ["tag1"]);
expectedResult.set("tag-soft;shard-3;replica-1", ["_N_T_/tag1"]);
expectedResult.set("tag-soft;shard-3;replica-2", ["_N_T_/tag1"]);
expectedResult.set("tag-soft;shard-3;replica-3", ["_N_T_/tag1"]);
expectedResult.set("tag-soft;shard-3;replica-4", ["_N_T_/tag1"]);
expect(cache.generateShards({ tags: ["tag1", "_N_T_/tag1"], generateAllShards: true })).toEqual(
expectedResult
);
});

it("should generate only one shard if generateAllShards is false", () => {
const cache = doShardedTagCache({ numberOfShards: 4, enableShardReplication: true });
const shardedMap = cache.generateShards({ tags: ["tag1", "_N_T_/tag1"], generateAllShards: false });
expect(shardedMap.size).toBe(2);
const shardIds = Array.from(shardedMap.keys());
// We can't test against a specific shard id because the last part is random
expect(shardIds[0]).toMatch(/tag-soft;shard-3;replica-\d/);
expect(shardIds[1]).toMatch(/tag-hard;shard-1;replica-\d/);

// We still need to check if the last part is between the correct boundaries
const shardId = shardIds[0]?.substring(shardIds[0].lastIndexOf("-") + 1) ?? "";
expect(parseInt(shardId)).toBeGreaterThanOrEqual(1);
expect(parseInt(shardId)).toBeLessThanOrEqual(DEFAULT_MAX_SOFT_SHARDS);

const shardId2 = shardIds[1]?.substring(shardIds[1].lastIndexOf("-") + 1) ?? "";
expect(parseInt(shardId2)).toBeGreaterThanOrEqual(1);
expect(parseInt(shardId2)).toBeLessThanOrEqual(DEFAULT_MAX_HARD_SHARDS);
});
});
});

describe("hasBeenRevalidated", () => {
Expand Down Expand Up @@ -130,6 +179,11 @@ describe("DOShardedTagCache", () => {
globalThis.openNextConfig = {
dangerous: { disableTagCache: false },
};
vi.useFakeTimers();
vi.setSystemTime(1000);
});
afterEach(() => {
vi.useRealTimers();
});
it("should return early if the cache is disabled", async () => {
globalThis.openNextConfig = {
Expand All @@ -146,24 +200,33 @@ describe("DOShardedTagCache", () => {
await cache.writeTags(["tag1"]);
expect(idFromNameMock).toHaveBeenCalled();
expect(writeTagsMock).toHaveBeenCalled();
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
});

it("should write the tags to the cache for multiple shards", async () => {
const cache = doShardedTagCache();
await cache.writeTags(["tag1", "tag2"]);
expect(idFromNameMock).toHaveBeenCalledTimes(2);
expect(writeTagsMock).toHaveBeenCalledTimes(2);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"]);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"], 1000);
});

it('should write to all the double sharded shards if "generateAllShards" is true', async () => {
const cache = doShardedTagCache({ numberOfShards: 4, enableShardReplication: true });
await cache.writeTags(["tag1", "_N_T_/tag1"]);
expect(idFromNameMock).toHaveBeenCalledTimes(6);
expect(writeTagsMock).toHaveBeenCalledTimes(6);
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
expect(writeTagsMock).toHaveBeenCalledWith(["_N_T_/tag1"], 1000);
});

it("should call deleteRegionalCache", async () => {
const cache = doShardedTagCache();
cache.deleteRegionalCache = vi.fn();
await cache.writeTags(["tag1"]);
expect(cache.deleteRegionalCache).toHaveBeenCalled();
expect(cache.deleteRegionalCache).toHaveBeenCalledWith("shard-1", ["tag1"]);
expect(cache.deleteRegionalCache).toHaveBeenCalledWith("tag-hard;shard-1;replica-1", ["tag1"]);
});
});

Expand Down Expand Up @@ -206,4 +269,55 @@ describe("DOShardedTagCache", () => {
globalThis.caches = undefined;
});
});

describe("getCacheKey", () => {
it("should return the cache key without the random part", async () => {
const cache = doShardedTagCache();
const reqKey = await cache.getCacheKey("shard-soft-1-1", ["_N_T_/tag1"]);
expect(reqKey.url).toBe("http://local.cache/shard/shard-soft-1?tags=_N_T_%2Ftag1");

const reqKey2 = await cache.getCacheKey("shard-hard-1-18", ["tag1"]);
expect(reqKey2.url).toBe("http://local.cache/shard/shard-hard-1?tags=tag1");
});
});

describe("performWriteTagsWithRetry", () => {
it("should retry if it fails", async () => {
vi.useFakeTimers();
vi.setSystemTime(1000);
const cache = doShardedTagCache();
writeTagsMock.mockImplementationOnce(() => {
throw new Error("error");
});
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
await cache.performWriteTagsWithRetry("shard", ["tag1"], Date.now());
expect(writeTagsMock).toHaveBeenCalledTimes(2);
expect(spiedFn).toHaveBeenCalledTimes(2);
expect(spiedFn).toHaveBeenCalledWith("shard", ["tag1"], 1000, 1);
expect(sendDLQMock).not.toHaveBeenCalled();

vi.useRealTimers();
});

it("should stop retrying after 3 times", async () => {
vi.useFakeTimers();
vi.setSystemTime(1000);
const cache = doShardedTagCache();
writeTagsMock.mockImplementationOnce(() => {
throw new Error("error");
});
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
await cache.performWriteTagsWithRetry("shard", ["tag1"], Date.now(), 3);
expect(writeTagsMock).toHaveBeenCalledTimes(1);
expect(spiedFn).toHaveBeenCalledTimes(1);

expect(sendDLQMock).toHaveBeenCalledWith({
failingShardId: "shard",
failingTags: ["tag1"],
lastModified: 1000,
});

vi.useRealTimers();
});
});
});
Loading