Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/great-carrots-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

Fix regional cache for the DOShardedTagCache
10 changes: 10 additions & 0 deletions packages/cloudflare/src/api/durable-objects/sharded-tag-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,14 @@ export class DOShardedTagCache extends DurableObject<CloudflareEnv> {
);
});
}

async getRevalidationTimes(tags: string[]): Promise<Record<string, number>> {
const result = this.sql
.exec(
`SELECT tag, revalidatedAt FROM revalidations WHERE tag IN (${tags.map(() => "?").join(", ")})`,
...tags
)
.toArray();
return Object.fromEntries(result.map((row) => [row.tag, row.revalidatedAt]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const getMock = vi
.fn()
.mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
// @ts-expect-error - We define it here only for the test
globalThis.continent = undefined;
const sendDLQMock = vi.fn();
vi.mock("../../cloudflare-context", () => ({
Expand All @@ -22,7 +21,6 @@ vi.mock("../../cloudflare-context", () => ({
},
ctx: { waitUntil: waitUntilMock },
cf: {
// @ts-expect-error - We define it here only for the test
continent: globalThis.continent,
},
}),
Expand Down Expand Up @@ -148,7 +146,6 @@ describe("DOShardedTagCache", () => {
});

it("should generate one doIds, but in the correct region", () => {
// @ts-expect-error - We define it here only for the test
globalThis.continent = "EU";
const cache = shardedDOTagCache({
baseShardSize: 4,
Expand All @@ -168,7 +165,6 @@ describe("DOShardedTagCache", () => {
expect(shardedTagCollection[0]?.doId.region).toBe("weur");
expect(shardedTagCollection[1]?.doId.region).toBe("weur");

//@ts-expect-error - We need to reset the global variable
globalThis.continent = undefined;
});

Expand Down Expand Up @@ -216,7 +212,7 @@ describe("DOShardedTagCache", () => {

it("should return false if stub return false", async () => {
const cache = shardedDOTagCache();
cache.getFromRegionalCache = vi.fn();
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
hasBeenRevalidatedMock.mockImplementationOnce(() => false);
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
expect(cache.getFromRegionalCache).toHaveBeenCalled();
Expand All @@ -227,7 +223,7 @@ describe("DOShardedTagCache", () => {

it("should return true if stub return true", async () => {
const cache = shardedDOTagCache();
cache.getFromRegionalCache = vi.fn();
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
hasBeenRevalidatedMock.mockImplementationOnce(() => true);
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
expect(cache.getFromRegionalCache).toHaveBeenCalled();
Expand All @@ -238,7 +234,7 @@ describe("DOShardedTagCache", () => {

it("should return false if it throws", async () => {
const cache = shardedDOTagCache();
cache.getFromRegionalCache = vi.fn();
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
hasBeenRevalidatedMock.mockImplementationOnce(() => {
throw new Error("error");
});
Expand All @@ -251,7 +247,7 @@ describe("DOShardedTagCache", () => {

it("Should return from the cache if it was found there", async () => {
const cache = shardedDOTagCache();
cache.getFromRegionalCache = vi.fn().mockReturnValueOnce(new Response("true"));
cache.getFromRegionalCache = vi.fn().mockReturnValueOnce([{ tag: "tag1", time: 1234567 }]);
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
expect(result).toBe(true);
expect(idFromNameMock).not.toHaveBeenCalled();
Expand All @@ -260,7 +256,7 @@ describe("DOShardedTagCache", () => {

it("should try to put the result in the cache if it was not revalidated", async () => {
const cache = shardedDOTagCache();
cache.getFromRegionalCache = vi.fn();
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
cache.putToRegionalCache = vi.fn();
hasBeenRevalidatedMock.mockImplementationOnce(() => false);
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
Expand All @@ -272,7 +268,7 @@ describe("DOShardedTagCache", () => {

it("should call all the durable object instance", async () => {
const cache = shardedDOTagCache();
cache.getFromRegionalCache = vi.fn();
cache.getFromRegionalCache = vi.fn().mockResolvedValue([]);
const result = await cache.hasBeenRevalidated(["tag1", "tag2"], 123456);
expect(result).toBe(false);
expect(idFromNameMock).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -338,7 +334,6 @@ describe("DOShardedTagCache", () => {
expect(cache.deleteRegionalCache).toHaveBeenCalledWith({
doId: expect.objectContaining({ key: "tag-hard;shard-1;replica-1" }),
tags: ["tag1"],
type: "boolean",
});
// expect(cache.deleteRegionalCache).toHaveBeenCalledWith("tag-hard;shard-1;replica-1", ["tag1"]);
});
Expand Down Expand Up @@ -372,14 +367,14 @@ describe("DOShardedTagCache", () => {
numberOfReplicas: 1,
shardType: "hard",
});
expect(await cache.getFromRegionalCache({ doId, tags: ["tag1"], type: "boolean" })).toBeUndefined();
expect(await cache.getFromRegionalCache({ doId, tags: ["tag1"] })).toEqual([]);
});

it("should call .match on the cache", async () => {
// @ts-expect-error - Defined on cloudfare context
globalThis.caches = {
open: vi.fn().mockResolvedValue({
match: vi.fn().mockResolvedValue("response"),
match: vi.fn().mockResolvedValue(new Response("1234567")),
}),
};
const cache = shardedDOTagCache({ baseShardSize: 4, regionalCache: true });
Expand All @@ -388,7 +383,9 @@ describe("DOShardedTagCache", () => {
numberOfReplicas: 1,
shardType: "hard",
});
expect(await cache.getFromRegionalCache({ doId, tags: ["tag1"], type: "boolean" })).toBe("response");
const cacheResult = await cache.getFromRegionalCache({ doId, tags: ["tag1"] });
expect(cacheResult.length).toBe(1);
expect(cacheResult[0]).toEqual({ tag: "tag1", time: 1234567 });
// @ts-expect-error - Defined on cloudfare context
globalThis.caches = undefined;
});
Expand All @@ -398,18 +395,16 @@ describe("DOShardedTagCache", () => {
it("should return the cache key without the random part", async () => {
const cache = shardedDOTagCache();
const doId1 = new DOId({ baseShardId: "shard-0", numberOfReplicas: 1, shardType: "hard" });
expect(cache.getCacheUrlKey({ doId: doId1, tags: ["_N_T_/tag1"], type: "boolean" })).toBe(
"http://local.cache/shard/tag-hard;shard-0?type=boolean&tags=_N_T_%2Ftag1"
expect(cache.getCacheUrlKey(doId1, "_N_T_/tag1")).toBe(
"http://local.cache/shard/tag-hard;shard-0?tag=_N_T_%2Ftag1"
);

const doId2 = new DOId({
baseShardId: "shard-1",
numberOfReplicas: 1,
shardType: "hard",
});
expect(cache.getCacheUrlKey({ doId: doId2, tags: ["tag1"], type: "boolean" })).toBe(
"http://local.cache/shard/tag-hard;shard-1?type=boolean&tags=tag1"
);
expect(cache.getCacheUrlKey(doId2, "tag1")).toBe("http://local.cache/shard/tag-hard;shard-1?tag=tag1");
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";

import type { OpenNextConfig } from "../../../api/config.js";
import { getCloudflareContext } from "../../cloudflare-context";
import { DOShardedTagCache } from "../../durable-objects/sharded-tag-cache.js";
import { debugCache, purgeCacheByTags } from "../internal";

export const DEFAULT_WRITE_RETRIES = 3;
Expand Down Expand Up @@ -120,7 +121,6 @@ export class DOId {
interface CacheTagKeyOptions {
doId: DOId;
tags: string[];
type: "boolean" | "number";
}
class ShardedDOTagCache implements NextModeTagCache {
readonly mode = "nextMode" as const;
Expand Down Expand Up @@ -294,28 +294,31 @@ class ShardedDOTagCache implements NextModeTagCache {
async getLastRevalidated(tags: string[]): Promise<number> {
const { isDisabled } = await this.getConfig();
if (isDisabled) return 0;
if (tags.length === 0) return 0; // No tags to check
const deduplicatedTags = Array.from(new Set(tags)); // We deduplicate the tags to avoid unnecessary requests
try {
const shardedTagGroups = this.groupTagsByDO({ tags });
const shardedTagGroups = this.groupTagsByDO({ tags: deduplicatedTags });
const shardedTagRevalidationOutcomes = await Promise.all(
shardedTagGroups.map(async ({ doId, tags }) => {
const cachedValue = await this.getFromRegionalCache({ doId, tags, type: "number" });
if (cachedValue) {
const cached = await cachedValue.text();
try {
return parseInt(cached, 10);
} catch (e) {
debug("Error while parsing cached value", e);
// If we can't parse the cached value, we should just ignore it and go to the durable object
}
const cachedValue = await this.getFromRegionalCache({ doId, tags });
// If all the value were found in the regional cache, we can just return the max value
if (cachedValue.length === tags.length) {
return Math.max(...cachedValue.map((item) => item.time));
}
// Otherwise we need to check the durable object on the ones that were not found in the cache
const filteredTags = deduplicatedTags.filter(
(tag) => !cachedValue.some((item) => item.tag === tag)
);

const stub = this.getDurableObjectStub(doId);
const _lastRevalidated = await stub.getLastRevalidated(tags);
if (!_lastRevalidated) {
getCloudflareContext().ctx.waitUntil(
this.putToRegionalCache({ doId, tags, type: "number" }, _lastRevalidated)
);
}
return _lastRevalidated;
const lastRevalidated = await stub.getLastRevalidated(filteredTags);

const result = Math.max(...cachedValue.map((item) => item.time), lastRevalidated);

// We then need to populate the regional cache with the missing tags
getCloudflareContext().ctx.waitUntil(this.putToRegionalCache({ doId, tags }, stub));

return result;
})
);
return Math.max(...shardedTagRevalidationOutcomes);
Expand All @@ -339,20 +342,27 @@ class ShardedDOTagCache implements NextModeTagCache {
const shardedTagGroups = this.groupTagsByDO({ tags });
const shardedTagRevalidationOutcomes = await Promise.all(
shardedTagGroups.map(async ({ doId, tags }) => {
const cachedValue = await this.getFromRegionalCache({ doId, tags, type: "boolean" });
if (cachedValue) {
return (await cachedValue.text()) === "true";
const cachedValue = await this.getFromRegionalCache({ doId, tags });

// If one of the cached values is newer than the lastModified, we can return true
const cacheHasBeenRevalidated = cachedValue.some((cachedValue) => {
return (cachedValue.time ?? 0) > (lastModified ?? Date.now());
});

if (cacheHasBeenRevalidated) {
return true;
}
const stub = this.getDurableObjectStub(doId);
const _hasBeenRevalidated = await stub.hasBeenRevalidated(tags, lastModified);
//TODO: Do we want to cache the result if it has been revalidated ?
// If we do so, we risk causing cache MISS even though it has been revalidated elsewhere
// On the other hand revalidating a tag that is used in a lot of places will cause a lot of requests
if (!_hasBeenRevalidated) {

const remainingTags = tags.filter((tag) => !cachedValue.some((item) => item.tag === tag));
if (remainingTags.length > 0) {
// We need to put the missing tags in the regional cache
getCloudflareContext().ctx.waitUntil(
this.putToRegionalCache({ doId, tags, type: "boolean" }, _hasBeenRevalidated)
this.putToRegionalCache({ doId, tags: remainingTags }, stub)
);
}

return _hasBeenRevalidated;
})
);
Expand Down Expand Up @@ -389,10 +399,7 @@ class ShardedDOTagCache implements NextModeTagCache {
await stub.writeTags(tags, lastModified);
// Depending on the shards and the tags, deleting from the regional cache will not work for every tag
// We also need to delete both cache
await Promise.all([
this.deleteRegionalCache({ doId, tags, type: "boolean" }),
this.deleteRegionalCache({ doId, tags, type: "number" }),
]);
await Promise.all([this.deleteRegionalCache({ doId, tags })]);
} catch (e) {
error("Error while writing tags", e);
if (retryNumber >= this.maxWriteRetries) {
Expand All @@ -417,49 +424,86 @@ class ShardedDOTagCache implements NextModeTagCache {
return this.localCache;
}

getCacheUrlKey(opts: CacheTagKeyOptions): string {
const { doId, tags, type } = opts;
return `http://local.cache/shard/${doId.shardId}?type=${type}&tags=${encodeURIComponent(tags.join(";"))}`;
getCacheUrlKey(doId: DOId, tag: string) {
return `http://local.cache/shard/${doId.shardId}?tag=${encodeURIComponent(tag)}`;
}

/**
* Get the last revalidation time for the tags from the regional cache
* If the cache is not enabled, it will return an empty array
* @returns An array of objects with the tag and the last revalidation time
*/
async getFromRegionalCache(opts: CacheTagKeyOptions) {
try {
if (!this.opts.regionalCache) return;
if (!this.opts.regionalCache) return [];
const cache = await this.getCacheInstance();
if (!cache) return;
return cache.match(this.getCacheUrlKey(opts));
if (!cache) return [];
const result = await Promise.all(
opts.tags.map(async (tag) => {
const cachedResponse = await cache.match(this.getCacheUrlKey(opts.doId, tag));
if (!cachedResponse) return null;
const cachedText = await cachedResponse.text();
try {
return { tag, time: parseInt(cachedText, 10) };
} catch (e) {
debugCache("Error while parsing cached value", e);
return null;
}
})
);
return result.filter((item) => item !== null);
} catch (e) {
error("Error while fetching from regional cache", e);
return [];
}
}

async putToRegionalCache(optsKey: CacheTagKeyOptions, value: number | boolean) {
async putToRegionalCache(optsKey: CacheTagKeyOptions, stub: DurableObjectStub<DOShardedTagCache>) {
if (!this.opts.regionalCache) return;
const cache = await this.getCacheInstance();
if (!cache) return;
const tags = optsKey.tags;
await cache.put(
this.getCacheUrlKey(optsKey),
new Response(`${value}`, {
headers: {
"cache-control": `max-age=${this.opts.regionalCacheTtlSec ?? 5}`,
...(tags.length > 0
? {
"cache-tag": tags.join(","),
}
: {}),
},
const tagsLastRevalidated = await stub.getRevalidationTimes(tags);
await Promise.all(
tags.map(async (tag) => {
const lastRevalidated = tagsLastRevalidated[tag];
if (lastRevalidated === undefined) return; // Should we store something in the cache if the tag is not found ?
const cacheKey = this.getCacheUrlKey(optsKey.doId, tag);
debugCache("Putting to regional cache", { cacheKey, lastRevalidated });
await cache.put(
cacheKey,
new Response(lastRevalidated.toString(), {
status: 200,
headers: {
"cache-control": `max-age=${this.opts.regionalCacheTtlSec ?? 5}`,
...(tags.length > 0
? {
"cache-tag": tags.join(","),
}
: {}),
},
})
);
})
);
}

/**
* Deletes the regional cache for the given tags
* This is used to ensure that the cache is cleared when the tags are revalidated
*/
async deleteRegionalCache(optsKey: CacheTagKeyOptions) {
// We never want to crash because of the cache
try {
if (!this.opts.regionalCache) return;
const cache = await this.getCacheInstance();
if (!cache) return;
await cache.delete(this.getCacheUrlKey(optsKey));
await Promise.all(
optsKey.tags.map(async (tag) => {
const cacheKey = this.getCacheUrlKey(optsKey.doId, tag);
debugCache("Deleting from regional cache", { cacheKey });
await cache.delete(cacheKey);
})
);
} catch (e) {
debugCache("Error while deleting from regional cache", e);
}
Expand Down