diff --git a/.changeset/modern-laws-happen.md b/.changeset/modern-laws-happen.md
new file mode 100644
index 00000000..dd7af609
--- /dev/null
+++ b/.changeset/modern-laws-happen.md
@@ -0,0 +1,5 @@
+---
+"@opennextjs/cloudflare": patch
+---
+
+Add sharding replication for the Durable Object Tag Cache
diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts
index 60c80399..01e59f15 100644
--- a/examples/e2e/app-router/open-next.config.ts
+++ b/examples/e2e/app-router/open-next.config.ts
@@ -5,6 +5,14 @@ import doQueue from "@opennextjs/cloudflare/durable-queue";

 export default defineCloudflareConfig({
   incrementalCache: kvIncrementalCache,
-  tagCache: shardedTagCache({ numberOfShards: 12 }),
+  // With such a configuration, we could have up to 12 * (8 + 2) = 120 Durable Object instances
+  tagCache: shardedTagCache({
+    baseShardSize: 12,
+    enableShardReplication: true,
+    shardReplicationOptions: {
+      numberOfSoftReplicas: 8,
+      numberOfHardReplicas: 2,
+    },
+  }),
   queue: doQueue,
 });
diff --git a/examples/e2e/app-router/package.json b/examples/e2e/app-router/package.json
index 5c71e9a5..d8b58f80 100644
--- a/examples/e2e/app-router/package.json
+++ b/examples/e2e/app-router/package.json
@@ -9,8 +9,7 @@
     "start": "next start --port 3001",
     "lint": "next lint",
     "clean": "rm -rf .turbo node_modules .next .open-next",
-    "d1:clean": "wrangler d1 execute NEXT_CACHE_D1 --command \"DROP TABLE IF EXISTS tags; DROP TABLE IF EXISTS revalidations\"",
-    "build:worker": "pnpm d1:clean && pnpm opennextjs-cloudflare build",
+    "build:worker": "pnpm opennextjs-cloudflare build",
     "preview:worker": "pnpm opennextjs-cloudflare preview",
     "preview": "pnpm build:worker && pnpm preview:worker",
     "e2e": "playwright test -c e2e/playwright.config.ts"
diff --git a/examples/e2e/app-router/wrangler.jsonc b/examples/e2e/app-router/wrangler.jsonc
index abb7ff27..18c8e4ad 100644
--- a/examples/e2e/app-router/wrangler.jsonc
+++ b/examples/e2e/app-router/wrangler.jsonc
@@ -15,7 +15,7 @@
         "class_name": "DurableObjectQueueHandler"
       },
       {
-        "name": "NEXT_CACHE_D1_SHARDED",
+        "name": "NEXT_CACHE_DO_SHARDED",
         "class_name": "DOShardedTagCache"
       }
     ]
@@ -32,13 +32,6 @@
       "id": ""
     }
   ],
-  "d1_databases": [
-    {
-      "binding": "NEXT_CACHE_D1",
-      "database_id": "NEXT_CACHE_D1",
-      "database_name": "NEXT_CACHE_D1"
-    }
-  ],
   "services": [
     {
       "binding": "NEXT_CACHE_REVALIDATION_WORKER",
diff --git a/packages/cloudflare/src/api/cloudflare-context.ts b/packages/cloudflare/src/api/cloudflare-context.ts
index f0d7cdc0..b3614350 100644
--- a/packages/cloudflare/src/api/cloudflare-context.ts
+++ b/packages/cloudflare/src/api/cloudflare-context.ts
@@ -22,7 +22,11 @@ declare global {
     // Durable Object namespace to use for the durable object queue handler
     NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace;
     // Durables object namespace to use for the sharded tag cache
-    NEXT_CACHE_D1_SHARDED?: DurableObjectNamespace;
+    NEXT_CACHE_DO_SHARDED?: DurableObjectNamespace;
+    // Queue of failed tag writes
+    // It could be used for monitoring or to reprocess failed writes
+    // Entirely optional
+    NEXT_CACHE_DO_SHARDED_DLQ?: Queue;

     // Asset binding
     ASSETS?: Fetcher;
diff --git a/packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts b/packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts
index c7572787..4476e88c 100644
--- a/packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts
+++ b/packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts
@@ -1,6 +1,10 @@
 import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
-import doShardedTagCache from "./do-sharded-tag-cache";
+import doShardedTagCache, {
+  DEFAULT_HARD_REPLICAS,
+  DEFAULT_SOFT_REPLICAS,
+  TagCacheDOId,
+} from "./do-sharded-tag-cache";

 const hasBeenRevalidatedMock = vi.fn();
 const writeTagsMock = vi.fn();
@@ -9,9 +13,15 @@ const getMock = vi
   .fn()
   .mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
 const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
+const sendDLQMock = vi.fn();
 vi.mock("./cloudflare-context", () => ({
   getCloudflareContext: () => ({
-    env: { NEXT_CACHE_D1_SHARDED: { idFromName: idFromNameMock, get: getMock } },
+    env: {
+      NEXT_CACHE_DO_SHARDED: { idFromName: idFromNameMock, get: getMock },
+      NEXT_CACHE_DO_SHARDED_DLQ: {
+        send: sendDLQMock,
+      },
+    },
     ctx: { waitUntil: waitUntilMock },
   }),
 }));
@@ -22,24 +32,81 @@ describe("DOShardedTagCache", () => {
   describe("generateShardId", () => {
     it("should generate a shardId", () => {
       const cache = doShardedTagCache();
-      const expectedResult = new Map();
-      expectedResult.set("shard-1", ["tag1"]);
-      expectedResult.set("shard-2", ["tag2"]);
-      expect(cache.generateShards(["tag1", "tag2"])).toEqual(expectedResult);
+      const expectedResult = [
+        { doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
+        { doId: expect.objectContaining({ shardId: "tag-hard;shard-2" }), tags: ["tag2"] },
+      ];
+      const result = cache.groupTagsByDO({ tags: ["tag1", "tag2"] });
+      expect(result).toEqual(expectedResult);
+      expect(result[0]?.doId.key).toBe("tag-hard;shard-1;replica-1");
+      expect(result[1]?.doId.key).toBe("tag-hard;shard-2;replica-1");
     });

     it("should group tags by shard", () => {
       const cache = doShardedTagCache();
-      const expectedResult = new Map();
-      expectedResult.set("shard-1", ["tag1", "tag6"]);
-      expect(cache.generateShards(["tag1", "tag6"])).toEqual(expectedResult);
+      const expectedResult = [
+        { doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1", "tag6"] },
+      ];
+      const result = cache.groupTagsByDO({ tags: ["tag1", "tag6"] });
+      expect(result).toEqual(expectedResult);
+      expect(result[0]?.doId.key).toBe("tag-hard;shard-1;replica-1");
     });

     it("should generate the same shardId for the same tag", () => {
       const cache = doShardedTagCache();
-      const firstResult = cache.generateShards(["tag1"]);
-      const secondResult = cache.generateShards(["tag1", "tag3", "tag4"]);
-      expect(firstResult.get("shard-1")).toEqual(secondResult.get("shard-1"));
+      const firstResult = cache.groupTagsByDO({ tags: ["tag1"] });
+      const secondResult = cache.groupTagsByDO({ tags: ["tag1", "tag3", "tag4"] });
+      expect(firstResult[0]).toEqual(secondResult[0]);
+    });
+
+    it("should split hard and soft tags", () => {
+      const cache = doShardedTagCache();
+      const expectedResult = [
+        { doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
+        { doId: expect.objectContaining({ shardId: "tag-hard;shard-1", replicaId: 1 }), tags: ["tag1"] },
+      ];
+      const result = cache.groupTagsByDO({ tags: ["tag1", "_N_T_/tag1"] });
+      expect(result).toEqual(expectedResult);
+      expect(result[1]?.doId.key).toBe("tag-hard;shard-1;replica-1");
+      expect(result[0]?.doId.key).toBe("tag-soft;shard-3;replica-1");
+    });
+
+    describe("with shard replication", () => {
+      it("should generate all doIds if generateAllReplicas is true", () => {
+        const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
+        const expectedResult = [
+          { doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
+          { doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
+          { doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
+          { doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
+          { doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
+          { doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
+        ];
+        const result = cache.groupTagsByDO({ tags: ["tag1", "_N_T_/tag1"], generateAllReplicas: true });
+        console.log(result);
+        expect(result).toEqual(expectedResult);
+      });
+
+      it("should generate only one doId per tag type if generateAllReplicas is false", () => {
+        const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
+        const shardedTagCollection = cache.groupTagsByDO({
+          tags: ["tag1", "_N_T_/tag1"],
+          generateAllReplicas: false,
+        });
+        expect(shardedTagCollection.length).toBe(2);
+        const firstDOId = shardedTagCollection[0]?.doId;
+        const secondDOId = shardedTagCollection[1]?.doId;
+
+        expect(firstDOId?.shardId).toBe("tag-soft;shard-3");
+        expect(secondDOId?.shardId).toBe("tag-hard;shard-1");
+
+        // We still need to check if the last part is between the correct boundaries
+        expect(firstDOId?.replicaId).toBeGreaterThanOrEqual(1);
+        expect(firstDOId?.replicaId).toBeLessThanOrEqual(DEFAULT_SOFT_REPLICAS);
+
+        expect(secondDOId?.replicaId).toBeGreaterThanOrEqual(1);
+        expect(secondDOId?.replicaId).toBeLessThanOrEqual(DEFAULT_HARD_REPLICAS);
+      });
     });
   });

@@ -115,7 +182,7 @@ describe("DOShardedTagCache", () => {
       expect(cache.putToRegionalCache).toHaveBeenCalled();
     });

-    it("should call all the shards", async () => {
+    it("should call all the durable object instances", async () => {
       const cache = doShardedTagCache();
       cache.getFromRegionalCache = vi.fn();
       const result = await cache.hasBeenRevalidated(["tag1", "tag2"], 123456);
@@ -130,6 +197,11 @@ describe("DOShardedTagCache", () => {
       globalThis.openNextConfig = {
         dangerous: { disableTagCache: false },
       };
+      vi.useFakeTimers();
+      vi.setSystemTime(1000);
+    });
+    afterEach(() => {
+      vi.useRealTimers();
     });
     it("should return early if the cache is disabled", async () => {
       globalThis.openNextConfig = {
@@ -146,7 +218,7 @@ describe("DOShardedTagCache", () => {
       await cache.writeTags(["tag1"]);
       expect(idFromNameMock).toHaveBeenCalled();
       expect(writeTagsMock).toHaveBeenCalled();
-      expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
+      expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
     });

     it("should write the tags to the cache for multiple shards", async () => {
@@ -154,8 +226,17 @@ describe("DOShardedTagCache", () => {
       await cache.writeTags(["tag1", "tag2"]);
       expect(idFromNameMock).toHaveBeenCalledTimes(2);
       expect(writeTagsMock).toHaveBeenCalledTimes(2);
-      expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
-      expect(writeTagsMock).toHaveBeenCalledWith(["tag2"]);
+      expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
+      expect(writeTagsMock).toHaveBeenCalledWith(["tag2"], 1000);
+    });
+
+    it('should write to all the replicated shards if "generateAllReplicas" is true', async () => {
+      const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
+      await cache.writeTags(["tag1", "_N_T_/tag1"]);
+      expect(idFromNameMock).toHaveBeenCalledTimes(6);
+      expect(writeTagsMock).toHaveBeenCalledTimes(6);
+      expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
+      expect(writeTagsMock).toHaveBeenCalledWith(["_N_T_/tag1"], 1000);
     });

     it("should call deleteRegionalCache", async () => {
@@ -163,7 +244,11 @@ describe("DOShardedTagCache", () => {
       cache.deleteRegionalCache = vi.fn();
       await cache.writeTags(["tag1"]);
       expect(cache.deleteRegionalCache).toHaveBeenCalled();
-      expect(cache.deleteRegionalCache).toHaveBeenCalledWith("shard-1", ["tag1"]);
+      expect(cache.deleteRegionalCache).toHaveBeenCalledWith(
+        expect.objectContaining({ key: "tag-hard;shard-1;replica-1" }),
+        ["tag1"]
+      );
+      // expect(cache.deleteRegionalCache).toHaveBeenCalledWith("tag-hard;shard-1;replica-1", ["tag1"]);
     });
   });

@@ -178,7 +263,7 @@ describe("DOShardedTagCache", () => {
       globalThis.caches = {
         open: vi.fn().mockResolvedValue("cache"),
       };
-      const cache = doShardedTagCache({ numberOfShards: 4, regionalCache: true });
+      const cache = doShardedTagCache({ baseShardSize: 4, regionalCache: true });
       expect(cache.localCache).toBeUndefined();
       expect(await cache.getCacheInstance()).toBe("cache");
       expect(cache.localCache).toBe("cache");
@@ -190,7 +275,12 @@ describe("DOShardedTagCache", () => {
   describe("getFromRegionalCache", () => {
     it("should return undefined if regional cache is disabled", async () => {
       const cache = doShardedTagCache();
-      expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBeUndefined();
+      const doId = new TagCacheDOId({
+        baseShardId: "shard-1",
+        numberOfReplicas: 1,
+        shardType: "hard",
+      });
+      expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBeUndefined();
     });

     it("should call .match on the cache", async () => {
@@ -200,10 +290,82 @@ describe("DOShardedTagCache", () => {
           match: vi.fn().mockResolvedValue("response"),
         }),
       };
-      const cache = doShardedTagCache({ numberOfShards: 4, regionalCache: true });
-      expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBe("response");
+      const cache = doShardedTagCache({ baseShardSize: 4, regionalCache: true });
+      const doId = new TagCacheDOId({
+        baseShardId: "shard-1",
+        numberOfReplicas: 1,
+        shardType: "hard",
+      });
+      expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBe("response");
       // @ts-expect-error - Defined on cloudfare context
       globalThis.caches = undefined;
     });
   });
+
+  describe("getCacheKey", () => {
+    it("should return the cache key without the random part", async () => {
+      const cache = doShardedTagCache();
+      const doId1 = new TagCacheDOId({ baseShardId: "shard-0", numberOfReplicas: 1, shardType: "hard" });
+      const reqKey = await cache.getCacheKey(doId1, ["_N_T_/tag1"]);
+      expect(reqKey.url).toBe("http://local.cache/shard/tag-hard;shard-0?tags=_N_T_%2Ftag1");
+
+      const doId2 = new TagCacheDOId({
+        baseShardId: "shard-1",
+        numberOfReplicas: 1,
+        shardType: "hard",
+      });
+      const reqKey2 = await cache.getCacheKey(doId2, ["tag1"]);
+      expect(reqKey2.url).toBe("http://local.cache/shard/tag-hard;shard-1?tags=tag1");
+    });
+  });
+
+  describe("performWriteTagsWithRetry", () => {
+    it("should retry if it fails", async () => {
+      vi.useFakeTimers();
+      vi.setSystemTime(1000);
+      const cache = doShardedTagCache();
+      writeTagsMock.mockImplementationOnce(() => {
+        throw new Error("error");
+      });
+      const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
+      const doId = new TagCacheDOId({
+        baseShardId: "shard-1",
+        numberOfReplicas: 1,
+        shardType: "hard",
+      });
+      await cache.performWriteTagsWithRetry(doId, ["tag1"], Date.now());
+      expect(writeTagsMock).toHaveBeenCalledTimes(2);
+      expect(spiedFn).toHaveBeenCalledTimes(2);
+      expect(spiedFn).toHaveBeenCalledWith(doId, ["tag1"], 1000, 1);
+      expect(sendDLQMock).not.toHaveBeenCalled();
+
+      vi.useRealTimers();
+    });
+
+    it("should stop retrying after 3 times", async () => {
+      vi.useFakeTimers();
+      vi.setSystemTime(1000);
+      const cache = doShardedTagCache();
+      writeTagsMock.mockImplementationOnce(() => {
+        throw new Error("error");
+      });
+      const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
+      await cache.performWriteTagsWithRetry(
+        new TagCacheDOId({ baseShardId: "shard-1", numberOfReplicas: 1, shardType: "hard" }),
+        ["tag1"],
+        Date.now(),
+        3
+      );
+      expect(writeTagsMock).toHaveBeenCalledTimes(1);
+      expect(spiedFn).toHaveBeenCalledTimes(1);
+
+      expect(sendDLQMock).toHaveBeenCalledWith({
+        failingShardId: "tag-hard;shard-1;replica-1",
+        failingTags: ["tag1"],
+        lastModified: 1000,
+      });
+
+      vi.useRealTimers();
+    });
+  });
 });
diff --git a/packages/cloudflare/src/api/do-sharded-tag-cache.ts b/packages/cloudflare/src/api/do-sharded-tag-cache.ts
index cfb6b59a..530355c9 100644
--- a/packages/cloudflare/src/api/do-sharded-tag-cache.ts
+++ b/packages/cloudflare/src/api/do-sharded-tag-cache.ts
@@ -6,15 +6,22 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";

 import { getCloudflareContext } from "./cloudflare-context";

-interface ShardedD1TagCacheOptions {
+const SOFT_TAG_PREFIX = "_N_T_/";
+export const DEFAULT_SOFT_REPLICAS = 4;
+export const DEFAULT_HARD_REPLICAS = 2;
+export const DEFAULT_WRITE_RETRIES = 3;
+export const DEFAULT_NUM_SHARDS = 4;
+
+interface ShardedDOTagCacheOptions {
   /**
    * The number of shards that will be used.
    * 1 shards means 1 durable object instance.
+   * Soft tags (internal Next.js tags used for `revalidatePath`) and hard tags (the ones you define in your app) will be split into different shards.
    * The number of requests made to Durable Objects will scale linearly with the number of shards.
    * For example, a request involving 5 tags may access between 1 and 5 shards, with the upper limit being the lesser of the number of tags or the number of shards
    * @default 4
    */
-  numberOfShards: number;
+  baseShardSize: number;
   /**
    * Whether to enable a regional cache on a per-shard basis
    * Because of the way tags are implemented in Next.js, some shards will have more requests than others. For these cases, it is recommended to enable the regional cache.
@@ -27,46 +34,162 @@ interface ShardedD1TagCacheOptions {
    * @default 5
    */
   regionalCacheTtlSec?: number;
+
+  /**
+   * Whether to enable shard replication
+   * Shard replication will duplicate each shard into N replicas to spread the load even further
+   * All replicas of a shard contain the same value - writes are sent to all of the replicas.
+   * This allows the most frequent read operations to be sent to only one replica, spreading the load.
+   * For example with N being 2, tag `tag1` could be read from 2 different Durable Object instances
+   * On read you only need to read from one of the replicas, but on write you need to write to all replicas
+   * @default false
+   */
+  enableShardReplication?: boolean;
+
+  /**
+   * The number of replicas that will be used for shard replication
+   * Soft shard replicas are more often accessed than hard shard replicas, so it is recommended to have more soft replicas than hard replicas
+   * Soft replicas are for internal Next.js tags used for `revalidatePath` (e.g. `_N_T_/layout`, `_N_T_/page1`), hard replicas are for the tags defined in your app
+   * @default { numberOfSoftReplicas: 4, numberOfHardReplicas: 2 }
+   */
+  shardReplicationOptions?: {
+    numberOfSoftReplicas: number;
+    numberOfHardReplicas: number;
+  };
+
+  /**
+   * The number of retries to perform when writing tags
+   * @default 3
+   */
+  maxWriteRetries?: number;
 }
-class ShardedD1TagCache implements NextModeTagCache {
+
+interface TagCacheDOIdOptions {
+  baseShardId: string;
+  numberOfReplicas: number;
+  shardType: "soft" | "hard";
+  replicaId?: number;
+}
+export class TagCacheDOId {
+  shardId: string;
+  replicaId: number;
+  constructor(public options: TagCacheDOIdOptions) {
+    const { baseShardId, shardType, numberOfReplicas, replicaId } = options;
+    this.shardId = `tag-${shardType};${baseShardId}`;
+    this.replicaId = replicaId ?? this.generateRandomNumberBetween(1, numberOfReplicas);
+  }
+
+  private generateRandomNumberBetween(min: number, max: number) {
+    return Math.floor(Math.random() * (max - min + 1) + min);
+  }
+
+  get key() {
+    return `${this.shardId};replica-${this.replicaId}`;
+  }
+}
+class ShardedDOTagCache implements NextModeTagCache {
   readonly mode = "nextMode" as const;
-  readonly name = "sharded-d1-tag-cache";
+  readonly name = "sharded-do-tag-cache";
+  readonly numSoftReplicas: number;
+  readonly numHardReplicas: number;
+  readonly maxWriteRetries: number;
   localCache?: Cache;

-  constructor(private opts: ShardedD1TagCacheOptions = { numberOfShards: 4 }) {}
+  constructor(private opts: ShardedDOTagCacheOptions = { baseShardSize: DEFAULT_NUM_SHARDS }) {
+    this.numSoftReplicas = opts.shardReplicationOptions?.numberOfSoftReplicas ?? DEFAULT_SOFT_REPLICAS;
+    this.numHardReplicas = opts.shardReplicationOptions?.numberOfHardReplicas ?? DEFAULT_HARD_REPLICAS;
+    this.maxWriteRetries = opts.maxWriteRetries ?? DEFAULT_WRITE_RETRIES;
+  }

-  private getDurableObjectStub(shardId: string) {
-    const durableObject = getCloudflareContext().env.NEXT_CACHE_D1_SHARDED;
+  private getDurableObjectStub(doId: TagCacheDOId) {
+    const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_SHARDED;
     if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");
-    const id = durableObject.idFromName(shardId);
+    const id = durableObject.idFromName(doId.key);
     return durableObject.get(id);
   }

+  /**
+   * Generates a list of DO ids for the shards and replicas
+   * @param tags The tags to generate shards for
+   * @param shardType Whether to generate shards for soft or hard tags
+   * @param generateAllReplicas Whether to generate all replicas or only one
+   * @returns An array of TagCacheDOId and tag pairs
+   */
+  private generateDOIdArray({
+    tags,
+    shardType,
+    generateAllReplicas = false,
+  }: {
+    tags: string[];
+    shardType: "soft" | "hard";
+    generateAllReplicas: boolean;
+  }) {
+    let replicaIndexes: Array<number | undefined> = [1];
+    const isSoft = shardType === "soft";
+    let numReplicas = 1;
+    if (this.opts.enableShardReplication) {
+      numReplicas = isSoft ? this.numSoftReplicas : this.numHardReplicas;
+      replicaIndexes = generateAllReplicas
+        ? Array.from({ length: numReplicas }, (_, i) => i + 1)
+        : [undefined];
+    }
+    return replicaIndexes.flatMap((replicaId) => {
+      return tags
+        .filter((tag) => (isSoft ? tag.startsWith(SOFT_TAG_PREFIX) : !tag.startsWith(SOFT_TAG_PREFIX)))
+        .map((tag) => {
+          return {
+            doId: new TagCacheDOId({
+              baseShardId: generateShardId(tag, this.opts.baseShardSize, "shard"),
+              numberOfReplicas: numReplicas,
+              shardType,
+              replicaId,
+            }),
+            tag,
+          };
+        });
+    });
+  }
+
   /**
    * Same tags are guaranteed to be in the same shard
    * @param tags
-   * @returns A map of shardId to tags
+   * @returns An array of DO ids and tags
    */
-  generateShards(tags: string[]) {
-    // For each tag, we generate a message group id
-    const messageGroupIds = tags.map((tag) => ({
-      shardId: generateShardId(tag, this.opts.numberOfShards, "shard"),
-      tag,
-    }));
-    // We group the tags by shard
-    const shards = new Map();
-    for (const { shardId, tag } of messageGroupIds) {
-      const tags = shards.get(shardId) ?? [];
-      tags.push(tag);
-      shards.set(shardId, tags);
+  groupTagsByDO({ tags, generateAllReplicas = false }: { tags: string[]; generateAllReplicas?: boolean }) {
+    // Here we'll start by splitting soft tags from hard tags
+    // This will greatly increase the cache hit rate for the soft tags (which are the most likely to cause issues because of load)
+    const softTags = this.generateDOIdArray({ tags, shardType: "soft", generateAllReplicas });
+
+    const hardTags = this.generateDOIdArray({ tags, shardType: "hard", generateAllReplicas });
+
+    const tagIdCollection = [...softTags, ...hardTags];
+
+    // We then group the tags by DO id
+    const tagsByDOId = new Map<
+      string,
+      {
+        doId: TagCacheDOId;
+        tags: string[];
+      }
+    >();
+    for (const { doId, tag } of tagIdCollection) {
+      const doIdString = doId.key;
+      const tagsArray = tagsByDOId.get(doIdString)?.tags ?? [];
+      tagsArray.push(tag);
+      tagsByDOId.set(doIdString, {
+        // We override the doId here, but it should be the same for all tags
+        doId,
+        tags: tagsArray,
+      });
     }
-    return shards;
+    const result = Array.from(tagsByDOId.values());
+    return result;
   }

   private async getConfig() {
     const cfEnv = getCloudflareContext().env;
-    const db = cfEnv.NEXT_CACHE_D1_SHARDED;
+    const db = cfEnv.NEXT_CACHE_DO_SHARDED;
     if (!db) debug("No Durable object found");
     const isDisabled = !!(globalThis as unknown as { openNextConfig: OpenNextConfig }).openNextConfig
@@ -93,28 +216,25 @@ class ShardedD1TagCache implements NextModeTagCache {
     const { isDisabled } = await this.getConfig();
     if (isDisabled) return false;
     try {
-      const shards = this.generateShards(tags);
-      // We then create a new durable object for each shard
-      const shardsResult = await Promise.all(
-        Array.from(shards.entries()).map(async ([shardId, shardedTags]) => {
-          const cachedValue = await this.getFromRegionalCache(shardId, shardedTags);
+      const shardedTagGroups = this.groupTagsByDO({ tags });
+      const shardedTagRevalidationOutcomes = await Promise.all(
+        shardedTagGroups.map(async ({ doId, tags }) => {
+          const cachedValue = await this.getFromRegionalCache(doId, tags);
           if (cachedValue) {
             return (await cachedValue.text()) === "true";
           }
-          const stub = this.getDurableObjectStub(shardId);
-          const _hasBeenRevalidated = await stub.hasBeenRevalidated(shardedTags, lastModified);
+          const stub = this.getDurableObjectStub(doId);
+          const _hasBeenRevalidated = await stub.hasBeenRevalidated(tags, lastModified);
           //TODO: Do we want to cache the result if it has been revalidated ?
           // If we do so, we risk causing cache MISS even though it has been revalidated elsewhere
           // On the other hand revalidating a tag that is used in a lot of places will cause a lot of requests
           if (!_hasBeenRevalidated) {
-            getCloudflareContext().ctx.waitUntil(
-              this.putToRegionalCache(shardId, shardedTags, _hasBeenRevalidated)
-            );
+            getCloudflareContext().ctx.waitUntil(this.putToRegionalCache(doId, tags, _hasBeenRevalidated));
           }
           return _hasBeenRevalidated;
         })
       );
-      return shardsResult.some((result) => result);
+      return shardedTagRevalidationOutcomes.some((result) => result);
     } catch (e) {
       error("Error while checking revalidation", e);
       return false;
@@ -130,38 +250,58 @@ class ShardedD1TagCache implements NextModeTagCache {
   async writeTags(tags: string[]): Promise<void> {
     const { isDisabled } = await this.getConfig();
     if (isDisabled) return;
-    const shards = this.generateShards(tags);
-    // We then create a new durable object for each shard
+    const shardedTagGroups = this.groupTagsByDO({ tags, generateAllReplicas: true });
+    // We want to use the same revalidation time for all tags
+    const currentTime = Date.now();
     await Promise.all(
-      Array.from(shards.entries()).map(async ([shardId, shardedTags]) => {
-        const stub = this.getDurableObjectStub(shardId);
-        await stub.writeTags(shardedTags);
-        // Depending on the shards and the tags, deleting from the regional cache will not work for every tag
-        await this.deleteRegionalCache(shardId, shardedTags);
+      shardedTagGroups.map(async ({ doId, tags }) => {
+        await this.performWriteTagsWithRetry(doId, tags, currentTime);
       })
     );
   }

+  async performWriteTagsWithRetry(doId: TagCacheDOId, tags: string[], lastModified: number, retryNumber = 0) {
+    try {
+      const stub = this.getDurableObjectStub(doId);
+      await stub.writeTags(tags, lastModified);
+      // Depending on the shards and the tags, deleting from the regional cache will not work for every tag
+      await this.deleteRegionalCache(doId, tags);
+    } catch (e) {
+      error("Error while writing tags", e);
+      if (retryNumber >= this.maxWriteRetries) {
+        error("Error while writing tags, too many retries");
+        // Do we want to throw an error here ?
+        await getCloudflareContext().env.NEXT_CACHE_DO_SHARDED_DLQ?.send({
+          failingShardId: doId.key,
+          failingTags: tags,
+          lastModified,
+        });
+        return;
+      }
+      await this.performWriteTagsWithRetry(doId, tags, lastModified, retryNumber + 1);
+    }
+  }
+
   // Cache API
   async getCacheInstance() {
     if (!this.localCache && this.opts.regionalCache) {
-      this.localCache = await caches.open("sharded-d1-tag-cache");
+      this.localCache = await caches.open("sharded-do-tag-cache");
     }
     return this.localCache;
   }

-  async getCacheKey(shardId: string, tags: string[]) {
+  async getCacheKey(doId: TagCacheDOId, tags: string[]) {
     return new Request(
-      new URL(`shard/${shardId}?tags=${encodeURIComponent(tags.join(";"))}`, "http://local.cache")
+      new URL(`shard/${doId.shardId}?tags=${encodeURIComponent(tags.join(";"))}`, "http://local.cache")
     );
   }

-  async getFromRegionalCache(shardId: string, tags: string[]) {
+  async getFromRegionalCache(doId: TagCacheDOId, tags: string[]) {
     try {
       if (!this.opts.regionalCache) return;
       const cache = await this.getCacheInstance();
       if (!cache) return;
-      const key = await this.getCacheKey(shardId, tags);
+      const key = await this.getCacheKey(doId, tags);
       return cache.match(key);
     } catch (e) {
       error("Error while fetching from regional cache", e);
@@ -169,11 +309,11 @@ class ShardedD1TagCache implements NextModeTagCache {
     }
   }

-  async putToRegionalCache(shardId: string, tags: string[], hasBeenRevalidated: boolean) {
+  async putToRegionalCache(doId: TagCacheDOId, tags: string[], hasBeenRevalidated: boolean) {
     if (!this.opts.regionalCache) return;
     const cache = await this.getCacheInstance();
     if (!cache) return;
-    const key = await this.getCacheKey(shardId, tags);
+    const key = await this.getCacheKey(doId, tags);
     await cache.put(
       key,
       new Response(`${hasBeenRevalidated}`, {
@@ -182,13 +322,18 @@ class ShardedD1TagCache implements NextModeTagCache {
     );
   }

-  async deleteRegionalCache(shardId: string, tags: string[]) {
-    if (!this.opts.regionalCache) return;
-    const cache = await this.getCacheInstance();
-    if (!cache) return;
-    const key = await this.getCacheKey(shardId, tags);
-    await cache.delete(key);
+  async deleteRegionalCache(doId: TagCacheDOId, tags: string[]) {
+    // We never want to crash because of the cache
+    try {
+      if (!this.opts.regionalCache) return;
+      const cache = await this.getCacheInstance();
+      if (!cache) return;
+      const key = await this.getCacheKey(doId, tags);
+      await cache.delete(key);
+    } catch (e) {
+      debug("Error while deleting from regional cache", e);
+    }
   }
 }

-export default (opts?: ShardedD1TagCacheOptions) => new ShardedD1TagCache(opts);
+export default (opts?: ShardedDOTagCacheOptions) => new ShardedDOTagCache(opts);
diff --git a/packages/cloudflare/src/api/durable-objects/sharded-tag-cache.ts b/packages/cloudflare/src/api/durable-objects/sharded-tag-cache.ts
index eae900c7..43be4e27 100644
--- a/packages/cloudflare/src/api/durable-objects/sharded-tag-cache.ts
+++ b/packages/cloudflare/src/api/durable-objects/sharded-tag-cache.ts
@@ -24,12 +24,12 @@ export class DOShardedTagCache extends DurableObject {
     return result.cnt > 0;
   }

-  async writeTags(tags: string[]): Promise<void> {
+  async writeTags(tags: string[], lastModified: number): Promise<void> {
     tags.forEach((tag) => {
       this.sql.exec(
         `INSERT OR REPLACE INTO revalidations (tag, revalidatedAt) VALUES (?, ?)`,
         tag,
-        Date.now()
+        lastModified
       );
     });
   }
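Usage note (not part of the patch itself): the sketch below shows how an application could opt into the replication options and the optional dead-letter queue introduced above. The option names and the binding names come from this diff; the import specifiers and the inline comments are assumptions and may need adjusting to the package's actual entry points.

// open-next.config.ts - minimal sketch, assuming these module paths expose the factory shown in the diff
import { defineCloudflareConfig } from "@opennextjs/cloudflare";
import shardedTagCache from "@opennextjs/cloudflare/do-sharded-tag-cache"; // assumed import path

export default defineCloudflareConfig({
  tagCache: shardedTagCache({
    // 12 hard shards + 12 soft shards before replication
    baseShardSize: 12,
    regionalCache: true,
    enableShardReplication: true,
    shardReplicationOptions: {
      // reads pick one replica at random; writes fan out to every replica
      numberOfSoftReplicas: 8,
      numberOfHardReplicas: 2,
    },
    // after this many failed write attempts the tags are sent to NEXT_CACHE_DO_SHARDED_DLQ, if it is bound
    maxWriteRetries: 3,
  }),
});

With this configuration there can be up to 12 * (8 + 2) = 120 Durable Object instances in total: a write fans out to every replica of the shards it touches, while a read only ever hits one replica per shard. To receive failed writes, wrangler.jsonc would also need a queue producer binding named NEXT_CACHE_DO_SHARDED_DLQ; the queue itself and its name are left to the project, and the binding is entirely optional.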