Skip to content

Commit 858b075

Browse files
committed
initial implementation of a sharded tag cache
1 parent 7a80a48 commit 858b075

File tree

7 files changed

+137
-3
lines changed

7 files changed

+137
-3
lines changed

examples/e2e/app-router/open-next.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare";
22
import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache";
33
import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache";
44
import doQueue from "@opennextjs/cloudflare/durable-queue";
5+
import shardedTagCache from "@opennextjs/cloudflare/do-sharded-tag-cache";
56

67
export default defineCloudflareConfig({
78
incrementalCache: kvIncrementalCache,
8-
tagCache: d1TagCache,
9+
tagCache: shardedTagCache,
910
queue: doQueue,
1011
});

examples/e2e/app-router/wrangler.jsonc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
{
1414
"name": "NEXT_CACHE_REVALIDATION_DURABLE_OBJECT",
1515
"class_name": "DurableObjectQueueHandler"
16+
},
17+
{
18+
"name": "NEXT_CACHE_D1_SHARDED",
19+
"class_name": "DOShardedTagCache"
1620
}
1721
]
1822
},
1923
"migrations": [
2024
{
2125
"tag": "v1",
22-
"new_sqlite_classes": ["DurableObjectQueueHandler"]
26+
"new_sqlite_classes": ["DurableObjectQueueHandler", "DOShardedTagCache"]
2327
}
2428
],
2529
"kv_namespaces": [

packages/cloudflare/src/api/cloudflare-context.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { Context, RunningCodeOptions } from "node:vm";
22

33
import type { DurableObjectQueueHandler } from "./durable-objects/queue";
4+
import { DOShardedTagCache } from "./durable-objects/sharded-tag-cache";
45

56
declare global {
67
interface CloudflareEnv {
@@ -16,6 +17,9 @@ declare global {
1617
NEXT_CACHE_REVALIDATION_WORKER?: Service;
1718
// Durable Object namespace to use for the durable object queue handler
1819
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
20+
// Durables object namespace to use for the sharded tag cache
21+
NEXT_CACHE_D1_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
22+
1923
// Asset binding
2024
ASSETS?: Fetcher;
2125

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import { debug } from "@opennextjs/aws/adapters/logger.js";
2+
import { generateMessageGroupId } from "@opennextjs/aws/core/routing/queue.js";
3+
import type { OpenNextConfig } from "@opennextjs/aws/types/open-next";
4+
import type { NextModeTagCache } from "@opennextjs/aws/types/overrides.js";
5+
import { IgnorableError } from "@opennextjs/aws/utils/error.js";
6+
7+
import { getCloudflareContext } from "./cloudflare-context";
8+
9+
class ShardedD1TagCache implements NextModeTagCache {
10+
mode = "nextMode" as const;
11+
public readonly name = "sharded-d1-tag-cache";
12+
13+
private getDurableObjectStub(shardId: string) {
14+
const durableObject = getCloudflareContext().env.NEXT_CACHE_D1_SHARDED;
15+
if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");
16+
17+
const id = durableObject.idFromName(shardId);
18+
return durableObject.get(id);
19+
}
20+
21+
private generateShards(tags: string[]) {
22+
// For each tag, we generate a message group id
23+
const messageGroupIds = tags.map((tag) => ({ shardId: generateMessageGroupId(tag), tag }));
24+
// We group the tags by shard
25+
const shards = new Map<string, string[]>();
26+
for (const { shardId, tag } of messageGroupIds) {
27+
const tags = shards.get(shardId) ?? [];
28+
tags.push(tag);
29+
shards.set(shardId, tags);
30+
}
31+
return shards;
32+
}
33+
34+
private getConfig() {
35+
const cfEnv = getCloudflareContext().env;
36+
const db = cfEnv.NEXT_CACHE_D1_SHARDED;
37+
38+
if (!db) debug("No Durable object found");
39+
40+
const isDisabled = !!(globalThis as unknown as { openNextConfig: OpenNextConfig }).openNextConfig
41+
.dangerous?.disableTagCache;
42+
43+
if (!db || isDisabled) {
44+
return { isDisabled: true as const };
45+
}
46+
47+
return {
48+
isDisabled: false as const,
49+
db,
50+
};
51+
}
52+
53+
async hasBeenRevalidated(tags: string[], lastModified?: number): Promise<boolean> {
54+
const { isDisabled } = this.getConfig();
55+
if (isDisabled) return false;
56+
const shards = this.generateShards(tags);
57+
let hasBeenRevalidated = false;
58+
console.log("Checking revalidation for tags", tags, shards);
59+
// We then create a new durable object for each shard
60+
await Promise.all(
61+
Array.from(shards.entries()).map(async ([shardId, shardedTags]) => {
62+
const stub = this.getDurableObjectStub(shardId);
63+
hasBeenRevalidated = hasBeenRevalidated || (await stub.hasBeenRevalidated(shardedTags, lastModified));
64+
})
65+
);
66+
return hasBeenRevalidated;
67+
}
68+
69+
async writeTags(tags: string[]): Promise<void> {
70+
const { isDisabled } = this.getConfig();
71+
if (isDisabled) return;
72+
const shards = this.generateShards(tags);
73+
console.log("Writing tags", tags, shards);
74+
// We then create a new durable object for each shard
75+
await Promise.all(
76+
Array.from(shards.entries()).map(async ([shardId, shardedTags]) => {
77+
const stub = this.getDurableObjectStub(shardId);
78+
await stub.writeTags(shardedTags);
79+
})
80+
);
81+
}
82+
}
83+
84+
export default new ShardedD1TagCache();
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { DurableObject } from "cloudflare:workers";
2+
3+
export class DOShardedTagCache extends DurableObject<CloudflareEnv> {
4+
sql: SqlStorage;
5+
6+
constructor(state: DurableObjectState, env: CloudflareEnv) {
7+
super(state, env);
8+
this.sql = state.storage.sql;
9+
state.blockConcurrencyWhile(async () => {
10+
this.sql.exec(`CREATE TABLE IF NOT EXISTS revalidations (tag TEXT PRIMARY KEY, revalidatedAt INTEGER)`);
11+
});
12+
}
13+
14+
async hasBeenRevalidated(tags: string[], lastModified?: number): Promise<boolean> {
15+
const result = this.sql
16+
.exec<{
17+
cnt: number;
18+
}>(
19+
`SELECT COUNT(*) as cnt FROM revalidations WHERE tag IN (${tags.map(() => "?").join(", ")}) AND revalidatedAt > ?`,
20+
...tags,
21+
lastModified ?? Date.now()
22+
)
23+
.one();
24+
return result.cnt > 0;
25+
}
26+
27+
async writeTags(tags: string[]): Promise<void> {
28+
tags.forEach((tag) => {
29+
this.sql.exec(
30+
`INSERT OR REPLACE INTO revalidations (tag, revalidatedAt) VALUES (?, ?)`,
31+
tag,
32+
Date.now()
33+
);
34+
});
35+
}
36+
}

packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ import { type BuildOptions, esbuildSync, getPackagePath } from "@opennextjs/aws/
66

77
export function compileDurableObjects(buildOpts: BuildOptions) {
88
const _require = createRequire(import.meta.url);
9-
const entryPoints = [_require.resolve("@opennextjs/cloudflare/durable-objects/queue")];
9+
const entryPoints = [
10+
_require.resolve("@opennextjs/cloudflare/durable-objects/queue"),
11+
_require.resolve("@opennextjs/cloudflare/durable-objects/sharded-tag-cache"),
12+
];
1013

1114
const { outputDir } = buildOpts;
1215

packages/cloudflare/src/cli/templates/worker.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ Object.defineProperty(globalThis, Symbol.for("__cloudflare-context__"), {
1919

2020
//@ts-expect-error: Will be resolved by wrangler build
2121
export { DurableObjectQueueHandler } from "./.build/durable-objects/queue.js";
22+
//@ts-expect-error: Will be resolved by wrangler build
23+
export { DOShardedTagCache } from "./.build/durable-objects/sharded-tag-cache.js";
2224

2325
// Populate process.env on the first request
2426
let processEnvPopulated = false;

0 commit comments

Comments
 (0)