Skip to content

Commit 9fb8b4d

Browse files
committed
initial implementation
1 parent c46eeee commit 9fb8b4d

File tree

2 files changed

+123
-65
lines changed

2 files changed

+123
-65
lines changed

packages/cloudflare/src/api/durable-objects/sharded-tag-cache.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,14 @@ export class DOShardedTagCache extends DurableObject<CloudflareEnv> {
5050
);
5151
});
5252
}
53+
54+
async getRevalidationTimes(tags: string[]): Promise<Record<string, number>> {
55+
const result = this.sql
56+
.exec(
57+
`SELECT tag, revalidatedAt FROM revalidations WHERE tag IN (${tags.map(() => "?").join(", ")})`,
58+
...tags
59+
)
60+
.toArray();
61+
return Object.fromEntries(result.map((row) => [row.tag, row.revalidatedAt]));
62+
}
5363
}

packages/cloudflare/src/api/overrides/tag-cache/do-sharded-tag-cache.ts

Lines changed: 113 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";
66
import type { OpenNextConfig } from "../../../api/config.js";
77
import { getCloudflareContext } from "../../cloudflare-context";
88
import { debugCache, purgeCacheByTags } from "../internal";
9+
import { DOShardedTagCache } from "../../durable-objects/sharded-tag-cache.js";
910

1011
export const DEFAULT_WRITE_RETRIES = 3;
1112
export const DEFAULT_NUM_SHARDS = 4;
@@ -120,7 +121,6 @@ export class DOId {
120121
interface CacheTagKeyOptions {
121122
doId: DOId;
122123
tags: string[];
123-
type: "boolean" | "number";
124124
}
125125
class ShardedDOTagCache implements NextModeTagCache {
126126
readonly mode = "nextMode" as const;
@@ -197,23 +197,23 @@ class ShardedDOTagCache implements NextModeTagCache {
197197
// If we have regional replication enabled, we need to further duplicate the shards in all the regions
198198
const regionalReplicasInAllRegions = generateAllReplicas
199199
? regionalReplicas.flatMap(({ doId, tag }) => {
200-
return AVAILABLE_REGIONS.map((region) => {
201-
return {
202-
doId: new DOId({
203-
baseShardId: doId.options.baseShardId,
204-
numberOfReplicas: numReplicas,
205-
shardType,
206-
replicaId: doId.replicaId,
207-
region,
208-
}),
209-
tag,
210-
};
211-
});
212-
})
213-
: regionalReplicas.map(({ doId, tag }) => {
214-
doId.region = this.getClosestRegion();
215-
return { doId, tag };
200+
return AVAILABLE_REGIONS.map((region) => {
201+
return {
202+
doId: new DOId({
203+
baseShardId: doId.options.baseShardId,
204+
numberOfReplicas: numReplicas,
205+
shardType,
206+
replicaId: doId.replicaId,
207+
region,
208+
}),
209+
tag,
210+
};
216211
});
212+
})
213+
: regionalReplicas.map(({ doId, tag }) => {
214+
doId.region = this.getClosestRegion();
215+
return { doId, tag };
216+
});
217217
return regionalReplicasInAllRegions;
218218
}
219219

@@ -286,36 +286,42 @@ class ShardedDOTagCache implements NextModeTagCache {
286286
return !db || isDisabled
287287
? { isDisabled: true as const }
288288
: {
289-
isDisabled: false as const,
290-
db,
291-
};
289+
isDisabled: false as const,
290+
db,
291+
};
292292
}
293293

294294
async getLastRevalidated(tags: string[]): Promise<number> {
295295
const { isDisabled } = await this.getConfig();
296296
if (isDisabled) return 0;
297+
if (tags.length === 0) return 0; // No tags to check
298+
const deduplicatedTags = Array.from(new Set(tags)); // We deduplicate the tags to avoid unnecessary requests
297299
try {
298-
const shardedTagGroups = this.groupTagsByDO({ tags });
300+
const shardedTagGroups = this.groupTagsByDO({ tags: deduplicatedTags });
299301
const shardedTagRevalidationOutcomes = await Promise.all(
300302
shardedTagGroups.map(async ({ doId, tags }) => {
301-
const cachedValue = await this.getFromRegionalCache({ doId, tags, type: "number" });
302-
if (cachedValue) {
303-
const cached = await cachedValue.text();
304-
try {
305-
return parseInt(cached, 10);
306-
} catch (e) {
307-
debug("Error while parsing cached value", e);
308-
// If we can't parse the cached value, we should just ignore it and go to the durable object
309-
}
303+
const cachedValue = await this.getFromRegionalCache({ doId, tags });
304+
// If all the value were found in the regional cache, we can just return the max value
305+
if (cachedValue.length === tags.length) {
306+
return Math.max(...cachedValue.map((item) => item.time));
310307
}
308+
// Otherwise we need to check the durable object on the ones that were not found in the cache
309+
const filteredTags = tags.filter((tag) => !cachedValue.some((item) => item.tag === tag));
310+
311311
const stub = this.getDurableObjectStub(doId);
312-
const _lastRevalidated = await stub.getLastRevalidated(tags);
313-
if (!_lastRevalidated) {
314-
getCloudflareContext().ctx.waitUntil(
315-
this.putToRegionalCache({ doId, tags, type: "number" }, _lastRevalidated)
316-
);
317-
}
318-
return _lastRevalidated;
312+
const lastRevalidated = await stub.getLastRevalidated(filteredTags);
313+
314+
const result = Math.max(
315+
...cachedValue.map((item) => item.time),
316+
lastRevalidated
317+
);
318+
319+
// We then need to populate the regional cache with the missing tags
320+
getCloudflareContext().ctx.waitUntil(
321+
this.putToRegionalCache({ doId, tags }, stub)
322+
);
323+
324+
return result;
319325
})
320326
);
321327
return Math.max(...shardedTagRevalidationOutcomes);
@@ -339,20 +345,27 @@ class ShardedDOTagCache implements NextModeTagCache {
339345
const shardedTagGroups = this.groupTagsByDO({ tags });
340346
const shardedTagRevalidationOutcomes = await Promise.all(
341347
shardedTagGroups.map(async ({ doId, tags }) => {
342-
const cachedValue = await this.getFromRegionalCache({ doId, tags, type: "boolean" });
343-
if (cachedValue) {
344-
return (await cachedValue.text()) === "true";
348+
const cachedValue = await this.getFromRegionalCache({ doId, tags });
349+
350+
// If one of the cached values is newer than the lastModified, we can return true
351+
const cacheHasBeenRevalidated = cachedValue.some((cachedValue) => {
352+
return (cachedValue.time ?? 0) > (lastModified ?? Date.now());
353+
});
354+
355+
if (cacheHasBeenRevalidated) {
356+
return true;
345357
}
346358
const stub = this.getDurableObjectStub(doId);
347359
const _hasBeenRevalidated = await stub.hasBeenRevalidated(tags, lastModified);
348-
//TODO: Do we want to cache the result if it has been revalidated ?
349-
// If we do so, we risk causing cache MISS even though it has been revalidated elsewhere
350-
// On the other hand revalidating a tag that is used in a lot of places will cause a lot of requests
351-
if (!_hasBeenRevalidated) {
360+
361+
const remainingTags = tags.filter((tag) => !cachedValue.some((item) => item.tag === tag));
362+
if (remainingTags.length > 0) {
363+
// We need to put the missing tags in the regional cache
352364
getCloudflareContext().ctx.waitUntil(
353-
this.putToRegionalCache({ doId, tags, type: "boolean" }, _hasBeenRevalidated)
365+
this.putToRegionalCache({ doId, tags: remainingTags }, stub)
354366
);
355367
}
368+
356369
return _hasBeenRevalidated;
357370
})
358371
);
@@ -390,8 +403,7 @@ class ShardedDOTagCache implements NextModeTagCache {
390403
// Depending on the shards and the tags, deleting from the regional cache will not work for every tag
391404
// We also need to delete both cache
392405
await Promise.all([
393-
this.deleteRegionalCache({ doId, tags, type: "boolean" }),
394-
this.deleteRegionalCache({ doId, tags, type: "number" }),
406+
this.deleteRegionalCache({ doId, tags }),
395407
]);
396408
} catch (e) {
397409
error("Error while writing tags", e);
@@ -417,49 +429,85 @@ class ShardedDOTagCache implements NextModeTagCache {
417429
return this.localCache;
418430
}
419431

420-
getCacheUrlKey(opts: CacheTagKeyOptions): string {
421-
const { doId, tags, type } = opts;
422-
return `http://local.cache/shard/${doId.shardId}?type=${type}&tags=${encodeURIComponent(tags.join(";"))}`;
432+
private getCacheUrlKey(doId: DOId, tag: string) {
433+
return `http://local.cache/shard/${doId.shardId}?tag=${encodeURIComponent(tag)}`;
423434
}
424435

436+
437+
438+
439+
/**
440+
* Get the last revalidation time for the tags from the regional cache
441+
* If the cache is not enabled, it will return an empty array
442+
* @returns An array of objects with the tag and the last revalidation time
443+
*/
425444
async getFromRegionalCache(opts: CacheTagKeyOptions) {
426445
try {
427-
if (!this.opts.regionalCache) return;
446+
if (!this.opts.regionalCache) return [];
428447
const cache = await this.getCacheInstance();
429-
if (!cache) return;
430-
return cache.match(this.getCacheUrlKey(opts));
448+
if (!cache) return [];
449+
const result = await Promise.all(
450+
opts.tags.map(async (tag) => {
451+
const cachedResponse = await cache.match(this.getCacheUrlKey(opts.doId, tag));
452+
if (!cachedResponse) return null;
453+
const cachedText = await cachedResponse.text();
454+
try {
455+
return { tag, time: parseInt(cachedText, 10) };
456+
} catch (e) {
457+
debugCache("Error while parsing cached value", e);
458+
return null;
459+
}
460+
})
461+
);
462+
return result.filter((item) => item !== null);
431463
} catch (e) {
432464
error("Error while fetching from regional cache", e);
465+
return [];
433466
}
434467
}
435-
436-
async putToRegionalCache(optsKey: CacheTagKeyOptions, value: number | boolean) {
468+
async putToRegionalCache(optsKey: CacheTagKeyOptions, stub: DurableObjectStub<DOShardedTagCache>) {
437469
if (!this.opts.regionalCache) return;
438470
const cache = await this.getCacheInstance();
439471
if (!cache) return;
440472
const tags = optsKey.tags;
441-
await cache.put(
442-
this.getCacheUrlKey(optsKey),
443-
new Response(`${value}`, {
444-
headers: {
445-
"cache-control": `max-age=${this.opts.regionalCacheTtlSec ?? 5}`,
446-
...(tags.length > 0
447-
? {
473+
const tagsLastRevalidated = await stub.getRevalidationTimes(tags);
474+
await Promise.all(
475+
tags.map(async (tag) => {
476+
const lastRevalidated = tagsLastRevalidated[tag];
477+
if (lastRevalidated === undefined) return; // Should we store something in the cache if the tag is not found ?
478+
const cacheKey = this.getCacheUrlKey(optsKey.doId, tag);
479+
debugCache("Putting to regional cache", { cacheKey, lastRevalidated });
480+
await cache.put(cacheKey, new Response(lastRevalidated.toString(), {
481+
status: 200, headers: {
482+
"cache-control": `max-age=${this.opts.regionalCacheTtlSec ?? 5}`,
483+
...(tags.length > 0
484+
? {
448485
"cache-tag": tags.join(","),
449486
}
450-
: {}),
451-
},
487+
: {})
488+
}
489+
}));
452490
})
453491
);
454492
}
455493

494+
/**
495+
* Deletes the regional cache for the given tags
496+
* This is used to ensure that the cache is cleared when the tags are revalidated
497+
*/
456498
async deleteRegionalCache(optsKey: CacheTagKeyOptions) {
457499
// We never want to crash because of the cache
458500
try {
459501
if (!this.opts.regionalCache) return;
460502
const cache = await this.getCacheInstance();
461503
if (!cache) return;
462-
await cache.delete(this.getCacheUrlKey(optsKey));
504+
await Promise.all(
505+
optsKey.tags.map(async (tag) => {
506+
const cacheKey = this.getCacheUrlKey(optsKey.doId, tag);
507+
debugCache("Deleting from regional cache", { cacheKey });
508+
await cache.delete(cacheKey);
509+
})
510+
);
463511
} catch (e) {
464512
debugCache("Error while deleting from regional cache", e);
465513
}

0 commit comments

Comments
 (0)