Skip to content

Commit c894cb2

Browse files
committed
add new tag cache mode
1 parent 867defe commit c894cb2

File tree

14 files changed

+279
-43
lines changed

14 files changed

+279
-43
lines changed

packages/open-next/src/adapters/cache.ts

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { isBinaryContentType } from "../utils/binary";
22
import { debug, error, warn } from "./logger";
3+
import { getTagFromValue, hasBeenRevalidated } from "utils/cache";
34

45
interface CachedFetchValue {
56
kind: "FETCH";
@@ -134,14 +135,15 @@ export default class Cache {
134135

135136
if (cachedEntry?.value === undefined) return null;
136137

137-
const _lastModified = await globalThis.tagCache.getLastModified(
138+
const _tags = [...(tags ?? []), ...(softTags ?? [])];
139+
const _lastModified = cachedEntry.lastModified ?? Date.now();
140+
const _hasBeenRevalidated = await hasBeenRevalidated(
138141
key,
142+
_tags,
139143
cachedEntry?.lastModified,
140144
);
141-
if (_lastModified === -1) {
142-
// If some tags are stale we need to force revalidation
143-
return null;
144-
}
145+
146+
if (_hasBeenRevalidated) return null;
145147

146148
// For cases where we don't have tags, we need to ensure that the soft tags are not being revalidated
147149
// We only need to check for the path as it should already contain all the tags
@@ -154,11 +156,12 @@ export default class Cache {
154156
!tag.endsWith("page"),
155157
);
156158
if (path) {
157-
const pathLastModified = await globalThis.tagCache.getLastModified(
159+
const hasPathBeenUpdated = await hasBeenRevalidated(
158160
path.replace("_N_T_/", ""),
161+
[],
159162
cachedEntry.lastModified,
160163
);
161-
if (pathLastModified === -1) {
164+
if (hasPathBeenUpdated) {
162165
// In case the path has been revalidated, we don't want to use the fetch cache
163166
return null;
164167
}
@@ -184,16 +187,17 @@ export default class Cache {
184187
return null;
185188
}
186189

187-
const meta = cachedEntry.value.meta;
188-
const _lastModified = await globalThis.tagCache.getLastModified(
190+
const cacheData = cachedEntry?.value;
191+
const meta = cacheData?.meta;
192+
const tags = getTagFromValue(cacheData);
193+
const _lastModified = cachedEntry.lastModified ?? Date.now();
194+
const _hasBeenRevalidated = await hasBeenRevalidated(
189195
key,
190-
cachedEntry?.lastModified,
196+
tags,
197+
_lastModified,
191198
);
192-
if (_lastModified === -1) {
193-
// If some tags are stale we need to force revalidation
194-
return null;
195-
}
196-
const cacheData = cachedEntry?.value;
199+
if (cacheData === undefined || _hasBeenRevalidated) return null;
200+
197201
const requestId = globalThis.__openNextAls.getStore()?.requestId ?? "";
198202
globalThis.lastModified[requestId] = _lastModified;
199203
if (cacheData?.type === "route") {
@@ -370,23 +374,7 @@ export default class Cache {
370374
? (data.headers?.["x-next-cache-tags"]?.split(",") ?? [])
371375
: [];
372376
debug("derivedTags", derivedTags);
373-
// Get all tags stored in dynamodb for the given key
374-
// If any of the derived tags are not stored in dynamodb for the given key, write them
375-
const storedTags = await globalThis.tagCache.getByPath(key);
376-
const tagsToWrite = derivedTags.filter(
377-
(tag) => !storedTags.includes(tag),
378-
);
379-
if (tagsToWrite.length > 0) {
380-
await globalThis.tagCache.writeTags(
381-
tagsToWrite.map((tag) => ({
382-
path: key,
383-
tag: tag,
384-
// In case the tags are not there we just need to create them
385-
// but we don't want them to return from `getLastModified` as they are not stale
386-
revalidatedAt: 1,
387-
})),
388-
);
389-
}
377+
await this.updateTagsOnSet(key, derivedTags);
390378
debug("Finished setting cache");
391379
} catch (e) {
392380
error("Failed to set cache", e);
@@ -403,6 +391,9 @@ export default class Cache {
403391
}
404392
try {
405393
const _tags = Array.isArray(tags) ? tags : [tags];
394+
if (globalThis.tagCache.mode === "nextMode") {
395+
return globalThis.tagCache.writeTags(_tags);
396+
}
406397
for (const tag of _tags) {
407398
debug("revalidateTag", tag);
408399
// Find all keys with the given tag
@@ -466,4 +457,28 @@ export default class Cache {
466457
error("Failed to revalidate tag", e);
467458
}
468459
}
460+
461+
private async updateTagsOnSet(key: string, derivedTags: string[]) {
462+
if (
463+
globalThis.openNextConfig.dangerous?.disableTagCache ||
464+
globalThis.tagCache.mode === "nextMode"
465+
) {
466+
return;
467+
}
468+
// Get all tags stored in dynamodb for the given key
469+
// If any of the derived tags are not stored in dynamodb for the given key, write them
470+
const storedTags = await globalThis.tagCache.getByPath(key);
471+
const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag));
472+
if (tagsToWrite.length > 0) {
473+
await globalThis.tagCache.writeTags(
474+
tagsToWrite.map((tag) => ({
475+
path: key,
476+
tag: tag,
477+
// In case the tags are not there we just need to create them
478+
// but we don't want them to return from `getLastModified` as they are not stale
479+
revalidatedAt: 1,
480+
})),
481+
);
482+
}
483+
}
469484
}

packages/open-next/src/adapters/dynamo-provider.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ async function defaultHandler(
4747
async function insert(
4848
requestType: InitializationFunctionEvent["requestType"],
4949
): Promise<InitializationFunctionEvent> {
50+
// If it is in nextMode, we don't need to do anything
51+
if (tagCache.mode === "nextMode") {
52+
return {
53+
type: "initializationFunction",
54+
requestType,
55+
resourceId: PHYSICAL_RESOURCE_ID,
56+
};
57+
}
5058
const file = readFileSync("dynamodb-cache.json", "utf8");
5159

5260
const data: DataType[] = JSON.parse(file);

packages/open-next/src/core/routing/cacheInterceptor.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { emptyReadableStream, toReadableStream } from "utils/stream";
88
import { debug } from "../../adapters/logger";
99
import { localizePath } from "./i18n";
1010
import { generateMessageGroupId } from "./util";
11+
import { getTagFromValue, hasBeenRevalidated } from "utils/cache";
1112

1213
const CACHE_ONE_YEAR = 60 * 60 * 24 * 365;
1314
const CACHE_ONE_MONTH = 60 * 60 * 24 * 30;
@@ -161,15 +162,15 @@ export async function cacheInterceptor(
161162
if (!cachedData?.value) {
162163
return event;
163164
}
164-
165-
if (cachedData?.value?.type === "app") {
166-
// We need to check the tag cache now
167-
const _lastModified = await globalThis.tagCache.getLastModified(
165+
// We need to check the tag cache now
166+
if (cachedData.value?.type === "app") {
167+
const tags = getTagFromValue(cachedData.value);
168+
const _hasBeenRevalidated = await hasBeenRevalidated(
168169
localizedPath,
170+
tags,
169171
cachedData.lastModified,
170172
);
171-
if (_lastModified === -1) {
172-
// If some tags are stale we need to force revalidation
173+
if (_hasBeenRevalidated) {
173174
return event;
174175
}
175176
}

packages/open-next/src/overrides/incrementalCache/multi-tier-ddb-s3.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ const maxCacheSize = process.env.OPEN_NEXT_LOCAL_CACHE_SIZE
1414
: 1000;
1515

1616
const localCache = new LRUCache<{
17-
value: CacheValue<false>;
17+
value: CacheValue<any>;
1818
lastModified: number;
1919
}>(maxCacheSize);
2020

packages/open-next/src/overrides/tagCache/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export const MAX_DYNAMO_BATCH_WRITE_ITEM_COUNT = 25;
2+
export const MAX_DYNAMO_BATCH_GET_ITEM_COUNT = 100;
23

34
/**
45
* Sending to dynamo X commands at a time, using about X * 25 write units per batch to not overwhelm DDB
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import type { NextModeTagCache } from "types/overrides";
2+
3+
import { AwsClient } from "aws4fetch";
4+
import { RecoverableError } from "utils/error";
5+
import { customFetchClient } from "utils/fetch";
6+
7+
import { debug, error } from "../../adapters/logger";
8+
import { chunk, parseNumberFromEnv } from "../../adapters/util";
9+
import {
10+
MAX_DYNAMO_BATCH_WRITE_ITEM_COUNT,
11+
getDynamoBatchWriteCommandConcurrency,
12+
} from "./constants";
13+
import path from "node:path";
14+
15+
let awsClient: AwsClient | null = null;
16+
17+
const getAwsClient = () => {
18+
const { CACHE_BUCKET_REGION } = process.env;
19+
if (awsClient) {
20+
return awsClient;
21+
}
22+
awsClient = new AwsClient({
23+
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
24+
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
25+
sessionToken: process.env.AWS_SESSION_TOKEN,
26+
region: CACHE_BUCKET_REGION,
27+
retries: parseNumberFromEnv(process.env.AWS_SDK_S3_MAX_ATTEMPTS),
28+
});
29+
return awsClient;
30+
};
31+
const awsFetch = (
32+
body: RequestInit["body"],
33+
type: "query" | "batchWrite" = "query",
34+
) => {
35+
const { CACHE_BUCKET_REGION } = process.env;
36+
const client = getAwsClient();
37+
return customFetchClient(client)(
38+
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
39+
{
40+
method: "POST",
41+
headers: {
42+
"Content-Type": "application/x-amz-json-1.0",
43+
"X-Amz-Target": `DynamoDB_20120810.${
44+
type === "query" ? "BatchGetItem" : "BatchWriteItem"
45+
}`,
46+
},
47+
body,
48+
},
49+
);
50+
};
51+
52+
function buildDynamoKey(key: string) {
53+
const { NEXT_BUILD_ID } = process.env;
54+
// FIXME: We should probably use something else than path.join here
55+
// this could transform some fetch cache key into a valid path
56+
return path.posix.join(NEXT_BUILD_ID ?? "", "_tag", key);
57+
}
58+
59+
// We use the same key for both path and tag
60+
// That's mostly for compatibility reason so that it's easier to use this with existing infra
61+
// FIXME: Allow a simpler object without an unnecessary path key
62+
function buildDynamoObject(tag: string, revalidatedAt?: number) {
63+
return {
64+
path: { S: buildDynamoKey(tag) },
65+
tag: { S: buildDynamoKey(tag) },
66+
revalidatedAt: { N: `${revalidatedAt ?? Date.now()}` },
67+
};
68+
}
69+
70+
export default {
71+
name: "ddb-nextMode",
72+
mode: "nextMode",
73+
hasBeenRevalidated: async (tags: string[], lastModified?: number) => {
74+
if (globalThis.openNextConfig.dangerous?.disableTagCache) {
75+
return false;
76+
}
77+
const { CACHE_DYNAMO_TABLE } = process.env;
78+
// It's unlikely that we will have more than 100 items to query
79+
// If that's the case, you should not use this tagCache implementation
80+
const response = await awsFetch(
81+
JSON.stringify({
82+
RequestItems: {
83+
[CACHE_DYNAMO_TABLE ?? ""]: {
84+
Keys: tags.map((tag) => ({
85+
path: { S: buildDynamoKey(tag) },
86+
tag: { S: buildDynamoKey(tag) },
87+
})),
88+
},
89+
},
90+
}),
91+
"query",
92+
);
93+
if (response.status !== 200) {
94+
throw new RecoverableError(
95+
`Failed to query dynamo item: ${response.status}`,
96+
);
97+
}
98+
// Now we need to check for every item if lastModified is greater than the revalidatedAt
99+
const { Responses } = await response.json();
100+
if (!Responses) {
101+
return false;
102+
}
103+
const revalidatedTags = Responses[CACHE_DYNAMO_TABLE ?? ""].filter(
104+
(item: any) =>
105+
Number.parseInt(item.revalidatedAt.N) > (lastModified ?? 0),
106+
);
107+
debug("retrieved tags", revalidatedTags);
108+
return revalidatedTags.length > 0;
109+
},
110+
writeTags: async (tags: string[]) => {
111+
try {
112+
const { CACHE_DYNAMO_TABLE } = process.env;
113+
if (globalThis.openNextConfig.dangerous?.disableTagCache) {
114+
return;
115+
}
116+
const dataChunks = chunk(tags, MAX_DYNAMO_BATCH_WRITE_ITEM_COUNT).map(
117+
(Items) => ({
118+
RequestItems: {
119+
[CACHE_DYNAMO_TABLE ?? ""]: Items.map((tag) => ({
120+
PutRequest: {
121+
Item: {
122+
...buildDynamoObject(tag),
123+
},
124+
},
125+
})),
126+
},
127+
}),
128+
);
129+
const toInsert = chunk(
130+
dataChunks,
131+
getDynamoBatchWriteCommandConcurrency(),
132+
);
133+
for (const paramsChunk of toInsert) {
134+
await Promise.all(
135+
paramsChunk.map(async (params) => {
136+
const response = await awsFetch(
137+
JSON.stringify(params),
138+
"batchWrite",
139+
);
140+
if (response.status !== 200) {
141+
throw new RecoverableError(
142+
`Failed to batch write dynamo item: ${response.status}`,
143+
);
144+
}
145+
return response;
146+
}),
147+
);
148+
}
149+
} catch (e) {
150+
error("Failed to batch write dynamo item", e);
151+
}
152+
},
153+
} satisfies NextModeTagCache;

packages/open-next/src/overrides/tagCache/dummy.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { TagCache } from "types/overrides";
33
// We don't want to throw error on this one because we might use it when we don't need tag cache
44
const dummyTagCache: TagCache = {
55
name: "dummy",
6+
mode: "original",
67
getByPath: async () => {
78
return [];
89
},

packages/open-next/src/overrides/tagCache/dynamodb-lite.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ function buildDynamoObject(path: string, tags: string, revalidatedAt?: number) {
6666
}
6767

6868
const tagCache: TagCache = {
69+
mode: "original",
6970
async getByPath(path) {
7071
try {
7172
if (globalThis.openNextConfig.dangerous?.disableTagCache) {

packages/open-next/src/overrides/tagCache/dynamodb.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ function buildDynamoObject(path: string, tags: string, revalidatedAt?: number) {
4242
}
4343

4444
const tagCache: TagCache = {
45+
mode: "original",
4546
async getByPath(path) {
4647
try {
4748
if (globalThis.openNextConfig.dangerous?.disableTagCache) {

packages/open-next/src/overrides/tagCache/fs-dev.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ let tags = JSON.parse(tagContent) as {
1414

1515
const tagCache: TagCache = {
1616
name: "fs-dev",
17+
mode: "original",
1718
getByPath: async (path: string) => {
1819
return tags
1920
.filter((tagPathMapping) => tagPathMapping.path.S === path)

0 commit comments

Comments
 (0)