Skip to content

Commit c202302

Browse files
conico974Nicolas Dorseuil
andauthored
Fix regional cache for the DOShardedTagCache (#820)
* initial implementation * Fix unit test * lint * prettier fix * review fix --------- Co-authored-by: Nicolas Dorseuil <[email protected]>
1 parent c46eeee commit c202302

File tree

4 files changed

+123
-69
lines changed

4 files changed

+123
-69
lines changed

.changeset/great-carrots-hammer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/cloudflare": patch
3+
---
4+
5+
Fix regional cache for the DOShardedTagCache

packages/cloudflare/src/api/durable-objects/sharded-tag-cache.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,14 @@ export class DOShardedTagCache extends DurableObject<CloudflareEnv> {
5050
);
5151
});
5252
}
53+
54+
async getRevalidationTimes(tags: string[]): Promise<Record<string, number>> {
55+
const result = this.sql
56+
.exec(
57+
`SELECT tag, revalidatedAt FROM revalidations WHERE tag IN (${tags.map(() => "?").join(", ")})`,
58+
...tags
59+
)
60+
.toArray();
61+
return Object.fromEntries(result.map((row) => [row.tag, row.revalidatedAt]));
62+
}
5363
}

packages/cloudflare/src/api/overrides/tag-cache/do-sharded-tag-cache.spec.ts

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ const getMock = vi
99
.fn()
1010
.mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
1111
const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
12-
// @ts-expect-error - We define it here only for the test
1312
globalThis.continent = undefined;
1413
const sendDLQMock = vi.fn();
1514
vi.mock("../../cloudflare-context", () => ({
@@ -22,7 +21,6 @@ vi.mock("../../cloudflare-context", () => ({
2221
},
2322
ctx: { waitUntil: waitUntilMock },
2423
cf: {
25-
// @ts-expect-error - We define it here only for the test
2624
continent: globalThis.continent,
2725
},
2826
}),
@@ -148,7 +146,6 @@ describe("DOShardedTagCache", () => {
148146
});
149147

150148
it("should generate one doIds, but in the correct region", () => {
151-
// @ts-expect-error - We define it here only for the test
152149
globalThis.continent = "EU";
153150
const cache = shardedDOTagCache({
154151
baseShardSize: 4,
@@ -168,7 +165,6 @@ describe("DOShardedTagCache", () => {
168165
expect(shardedTagCollection[0]?.doId.region).toBe("weur");
169166
expect(shardedTagCollection[1]?.doId.region).toBe("weur");
170167

171-
//@ts-expect-error - We need to reset the global variable
172168
globalThis.continent = undefined;
173169
});
174170

@@ -216,7 +212,7 @@ describe("DOShardedTagCache", () => {
216212

217213
it("should return false if stub return false", async () => {
218214
const cache = shardedDOTagCache();
219-
cache.getFromRegionalCache = vi.fn();
215+
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
220216
hasBeenRevalidatedMock.mockImplementationOnce(() => false);
221217
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
222218
expect(cache.getFromRegionalCache).toHaveBeenCalled();
@@ -227,7 +223,7 @@ describe("DOShardedTagCache", () => {
227223

228224
it("should return true if stub return true", async () => {
229225
const cache = shardedDOTagCache();
230-
cache.getFromRegionalCache = vi.fn();
226+
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
231227
hasBeenRevalidatedMock.mockImplementationOnce(() => true);
232228
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
233229
expect(cache.getFromRegionalCache).toHaveBeenCalled();
@@ -238,7 +234,7 @@ describe("DOShardedTagCache", () => {
238234

239235
it("should return false if it throws", async () => {
240236
const cache = shardedDOTagCache();
241-
cache.getFromRegionalCache = vi.fn();
237+
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
242238
hasBeenRevalidatedMock.mockImplementationOnce(() => {
243239
throw new Error("error");
244240
});
@@ -251,7 +247,7 @@ describe("DOShardedTagCache", () => {
251247

252248
it("Should return from the cache if it was found there", async () => {
253249
const cache = shardedDOTagCache();
254-
cache.getFromRegionalCache = vi.fn().mockReturnValueOnce(new Response("true"));
250+
cache.getFromRegionalCache = vi.fn().mockReturnValueOnce([{ tag: "tag1", time: 1234567 }]);
255251
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
256252
expect(result).toBe(true);
257253
expect(idFromNameMock).not.toHaveBeenCalled();
@@ -260,7 +256,7 @@ describe("DOShardedTagCache", () => {
260256

261257
it("should try to put the result in the cache if it was not revalidated", async () => {
262258
const cache = shardedDOTagCache();
263-
cache.getFromRegionalCache = vi.fn();
259+
cache.getFromRegionalCache = vi.fn().mockResolvedValueOnce([]);
264260
cache.putToRegionalCache = vi.fn();
265261
hasBeenRevalidatedMock.mockImplementationOnce(() => false);
266262
const result = await cache.hasBeenRevalidated(["tag1"], 123456);
@@ -272,7 +268,7 @@ describe("DOShardedTagCache", () => {
272268

273269
it("should call all the durable object instance", async () => {
274270
const cache = shardedDOTagCache();
275-
cache.getFromRegionalCache = vi.fn();
271+
cache.getFromRegionalCache = vi.fn().mockResolvedValue([]);
276272
const result = await cache.hasBeenRevalidated(["tag1", "tag2"], 123456);
277273
expect(result).toBe(false);
278274
expect(idFromNameMock).toHaveBeenCalledTimes(2);
@@ -338,7 +334,6 @@ describe("DOShardedTagCache", () => {
338334
expect(cache.deleteRegionalCache).toHaveBeenCalledWith({
339335
doId: expect.objectContaining({ key: "tag-hard;shard-1;replica-1" }),
340336
tags: ["tag1"],
341-
type: "boolean",
342337
});
343338
// expect(cache.deleteRegionalCache).toHaveBeenCalledWith("tag-hard;shard-1;replica-1", ["tag1"]);
344339
});
@@ -372,14 +367,14 @@ describe("DOShardedTagCache", () => {
372367
numberOfReplicas: 1,
373368
shardType: "hard",
374369
});
375-
expect(await cache.getFromRegionalCache({ doId, tags: ["tag1"], type: "boolean" })).toBeUndefined();
370+
expect(await cache.getFromRegionalCache({ doId, tags: ["tag1"] })).toEqual([]);
376371
});
377372

378373
it("should call .match on the cache", async () => {
379374
// @ts-expect-error - Defined on cloudfare context
380375
globalThis.caches = {
381376
open: vi.fn().mockResolvedValue({
382-
match: vi.fn().mockResolvedValue("response"),
377+
match: vi.fn().mockResolvedValue(new Response("1234567")),
383378
}),
384379
};
385380
const cache = shardedDOTagCache({ baseShardSize: 4, regionalCache: true });
@@ -388,7 +383,9 @@ describe("DOShardedTagCache", () => {
388383
numberOfReplicas: 1,
389384
shardType: "hard",
390385
});
391-
expect(await cache.getFromRegionalCache({ doId, tags: ["tag1"], type: "boolean" })).toBe("response");
386+
const cacheResult = await cache.getFromRegionalCache({ doId, tags: ["tag1"] });
387+
expect(cacheResult.length).toBe(1);
388+
expect(cacheResult[0]).toEqual({ tag: "tag1", time: 1234567 });
392389
// @ts-expect-error - Defined on cloudfare context
393390
globalThis.caches = undefined;
394391
});
@@ -398,18 +395,16 @@ describe("DOShardedTagCache", () => {
398395
it("should return the cache key without the random part", async () => {
399396
const cache = shardedDOTagCache();
400397
const doId1 = new DOId({ baseShardId: "shard-0", numberOfReplicas: 1, shardType: "hard" });
401-
expect(cache.getCacheUrlKey({ doId: doId1, tags: ["_N_T_/tag1"], type: "boolean" })).toBe(
402-
"http://local.cache/shard/tag-hard;shard-0?type=boolean&tags=_N_T_%2Ftag1"
398+
expect(cache.getCacheUrlKey(doId1, "_N_T_/tag1")).toBe(
399+
"http://local.cache/shard/tag-hard;shard-0?tag=_N_T_%2Ftag1"
403400
);
404401

405402
const doId2 = new DOId({
406403
baseShardId: "shard-1",
407404
numberOfReplicas: 1,
408405
shardType: "hard",
409406
});
410-
expect(cache.getCacheUrlKey({ doId: doId2, tags: ["tag1"], type: "boolean" })).toBe(
411-
"http://local.cache/shard/tag-hard;shard-1?type=boolean&tags=tag1"
412-
);
407+
expect(cache.getCacheUrlKey(doId2, "tag1")).toBe("http://local.cache/shard/tag-hard;shard-1?tag=tag1");
413408
});
414409
});
415410

packages/cloudflare/src/api/overrides/tag-cache/do-sharded-tag-cache.ts

Lines changed: 94 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";
55

66
import type { OpenNextConfig } from "../../../api/config.js";
77
import { getCloudflareContext } from "../../cloudflare-context";
8+
import { DOShardedTagCache } from "../../durable-objects/sharded-tag-cache.js";
89
import { debugCache, purgeCacheByTags } from "../internal";
910

1011
export const DEFAULT_WRITE_RETRIES = 3;
@@ -120,7 +121,6 @@ export class DOId {
120121
interface CacheTagKeyOptions {
121122
doId: DOId;
122123
tags: string[];
123-
type: "boolean" | "number";
124124
}
125125
class ShardedDOTagCache implements NextModeTagCache {
126126
readonly mode = "nextMode" as const;
@@ -294,28 +294,31 @@ class ShardedDOTagCache implements NextModeTagCache {
294294
async getLastRevalidated(tags: string[]): Promise<number> {
295295
const { isDisabled } = await this.getConfig();
296296
if (isDisabled) return 0;
297+
if (tags.length === 0) return 0; // No tags to check
298+
const deduplicatedTags = Array.from(new Set(tags)); // We deduplicate the tags to avoid unnecessary requests
297299
try {
298-
const shardedTagGroups = this.groupTagsByDO({ tags });
300+
const shardedTagGroups = this.groupTagsByDO({ tags: deduplicatedTags });
299301
const shardedTagRevalidationOutcomes = await Promise.all(
300302
shardedTagGroups.map(async ({ doId, tags }) => {
301-
const cachedValue = await this.getFromRegionalCache({ doId, tags, type: "number" });
302-
if (cachedValue) {
303-
const cached = await cachedValue.text();
304-
try {
305-
return parseInt(cached, 10);
306-
} catch (e) {
307-
debug("Error while parsing cached value", e);
308-
// If we can't parse the cached value, we should just ignore it and go to the durable object
309-
}
303+
const cachedValue = await this.getFromRegionalCache({ doId, tags });
304+
// If all the value were found in the regional cache, we can just return the max value
305+
if (cachedValue.length === tags.length) {
306+
return Math.max(...cachedValue.map((item) => item.time));
310307
}
308+
// Otherwise we need to check the durable object on the ones that were not found in the cache
309+
const filteredTags = deduplicatedTags.filter(
310+
(tag) => !cachedValue.some((item) => item.tag === tag)
311+
);
312+
311313
const stub = this.getDurableObjectStub(doId);
312-
const _lastRevalidated = await stub.getLastRevalidated(tags);
313-
if (!_lastRevalidated) {
314-
getCloudflareContext().ctx.waitUntil(
315-
this.putToRegionalCache({ doId, tags, type: "number" }, _lastRevalidated)
316-
);
317-
}
318-
return _lastRevalidated;
314+
const lastRevalidated = await stub.getLastRevalidated(filteredTags);
315+
316+
const result = Math.max(...cachedValue.map((item) => item.time), lastRevalidated);
317+
318+
// We then need to populate the regional cache with the missing tags
319+
getCloudflareContext().ctx.waitUntil(this.putToRegionalCache({ doId, tags }, stub));
320+
321+
return result;
319322
})
320323
);
321324
return Math.max(...shardedTagRevalidationOutcomes);
@@ -339,20 +342,27 @@ class ShardedDOTagCache implements NextModeTagCache {
339342
const shardedTagGroups = this.groupTagsByDO({ tags });
340343
const shardedTagRevalidationOutcomes = await Promise.all(
341344
shardedTagGroups.map(async ({ doId, tags }) => {
342-
const cachedValue = await this.getFromRegionalCache({ doId, tags, type: "boolean" });
343-
if (cachedValue) {
344-
return (await cachedValue.text()) === "true";
345+
const cachedValue = await this.getFromRegionalCache({ doId, tags });
346+
347+
// If one of the cached values is newer than the lastModified, we can return true
348+
const cacheHasBeenRevalidated = cachedValue.some((cachedValue) => {
349+
return (cachedValue.time ?? 0) > (lastModified ?? Date.now());
350+
});
351+
352+
if (cacheHasBeenRevalidated) {
353+
return true;
345354
}
346355
const stub = this.getDurableObjectStub(doId);
347356
const _hasBeenRevalidated = await stub.hasBeenRevalidated(tags, lastModified);
348-
//TODO: Do we want to cache the result if it has been revalidated ?
349-
// If we do so, we risk causing cache MISS even though it has been revalidated elsewhere
350-
// On the other hand revalidating a tag that is used in a lot of places will cause a lot of requests
351-
if (!_hasBeenRevalidated) {
357+
358+
const remainingTags = tags.filter((tag) => !cachedValue.some((item) => item.tag === tag));
359+
if (remainingTags.length > 0) {
360+
// We need to put the missing tags in the regional cache
352361
getCloudflareContext().ctx.waitUntil(
353-
this.putToRegionalCache({ doId, tags, type: "boolean" }, _hasBeenRevalidated)
362+
this.putToRegionalCache({ doId, tags: remainingTags }, stub)
354363
);
355364
}
365+
356366
return _hasBeenRevalidated;
357367
})
358368
);
@@ -389,10 +399,7 @@ class ShardedDOTagCache implements NextModeTagCache {
389399
await stub.writeTags(tags, lastModified);
390400
// Depending on the shards and the tags, deleting from the regional cache will not work for every tag
391401
// We also need to delete both cache
392-
await Promise.all([
393-
this.deleteRegionalCache({ doId, tags, type: "boolean" }),
394-
this.deleteRegionalCache({ doId, tags, type: "number" }),
395-
]);
402+
await Promise.all([this.deleteRegionalCache({ doId, tags })]);
396403
} catch (e) {
397404
error("Error while writing tags", e);
398405
if (retryNumber >= this.maxWriteRetries) {
@@ -417,49 +424,86 @@ class ShardedDOTagCache implements NextModeTagCache {
417424
return this.localCache;
418425
}
419426

420-
getCacheUrlKey(opts: CacheTagKeyOptions): string {
421-
const { doId, tags, type } = opts;
422-
return `http://local.cache/shard/${doId.shardId}?type=${type}&tags=${encodeURIComponent(tags.join(";"))}`;
427+
getCacheUrlKey(doId: DOId, tag: string) {
428+
return `http://local.cache/shard/${doId.shardId}?tag=${encodeURIComponent(tag)}`;
423429
}
424430

431+
/**
432+
* Get the last revalidation time for the tags from the regional cache
433+
* If the cache is not enabled, it will return an empty array
434+
* @returns An array of objects with the tag and the last revalidation time
435+
*/
425436
async getFromRegionalCache(opts: CacheTagKeyOptions) {
426437
try {
427-
if (!this.opts.regionalCache) return;
438+
if (!this.opts.regionalCache) return [];
428439
const cache = await this.getCacheInstance();
429-
if (!cache) return;
430-
return cache.match(this.getCacheUrlKey(opts));
440+
if (!cache) return [];
441+
const result = await Promise.all(
442+
opts.tags.map(async (tag) => {
443+
const cachedResponse = await cache.match(this.getCacheUrlKey(opts.doId, tag));
444+
if (!cachedResponse) return null;
445+
const cachedText = await cachedResponse.text();
446+
try {
447+
return { tag, time: parseInt(cachedText, 10) };
448+
} catch (e) {
449+
debugCache("Error while parsing cached value", e);
450+
return null;
451+
}
452+
})
453+
);
454+
return result.filter((item) => item !== null);
431455
} catch (e) {
432456
error("Error while fetching from regional cache", e);
457+
return [];
433458
}
434459
}
435-
436-
async putToRegionalCache(optsKey: CacheTagKeyOptions, value: number | boolean) {
460+
async putToRegionalCache(optsKey: CacheTagKeyOptions, stub: DurableObjectStub<DOShardedTagCache>) {
437461
if (!this.opts.regionalCache) return;
438462
const cache = await this.getCacheInstance();
439463
if (!cache) return;
440464
const tags = optsKey.tags;
441-
await cache.put(
442-
this.getCacheUrlKey(optsKey),
443-
new Response(`${value}`, {
444-
headers: {
445-
"cache-control": `max-age=${this.opts.regionalCacheTtlSec ?? 5}`,
446-
...(tags.length > 0
447-
? {
448-
"cache-tag": tags.join(","),
449-
}
450-
: {}),
451-
},
465+
const tagsLastRevalidated = await stub.getRevalidationTimes(tags);
466+
await Promise.all(
467+
tags.map(async (tag) => {
468+
const lastRevalidated = tagsLastRevalidated[tag];
469+
if (lastRevalidated === undefined) return; // Should we store something in the cache if the tag is not found ?
470+
const cacheKey = this.getCacheUrlKey(optsKey.doId, tag);
471+
debugCache("Putting to regional cache", { cacheKey, lastRevalidated });
472+
await cache.put(
473+
cacheKey,
474+
new Response(lastRevalidated.toString(), {
475+
status: 200,
476+
headers: {
477+
"cache-control": `max-age=${this.opts.regionalCacheTtlSec ?? 5}`,
478+
...(tags.length > 0
479+
? {
480+
"cache-tag": tags.join(","),
481+
}
482+
: {}),
483+
},
484+
})
485+
);
452486
})
453487
);
454488
}
455489

490+
/**
491+
* Deletes the regional cache for the given tags
492+
* This is used to ensure that the cache is cleared when the tags are revalidated
493+
*/
456494
async deleteRegionalCache(optsKey: CacheTagKeyOptions) {
457495
// We never want to crash because of the cache
458496
try {
459497
if (!this.opts.regionalCache) return;
460498
const cache = await this.getCacheInstance();
461499
if (!cache) return;
462-
await cache.delete(this.getCacheUrlKey(optsKey));
500+
await Promise.all(
501+
optsKey.tags.map(async (tag) => {
502+
const cacheKey = this.getCacheUrlKey(optsKey.doId, tag);
503+
debugCache("Deleting from regional cache", { cacheKey });
504+
await cache.delete(cacheKey);
505+
})
506+
);
463507
} catch (e) {
464508
debugCache("Error while deleting from regional cache", e);
465509
}

0 commit comments

Comments
 (0)