From f602860c2036ecf5536a5d1243c1d5c4ca84efdf Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Thu, 19 Jun 2025 20:26:01 +0200 Subject: [PATCH 1/5] only call tag cache once --- packages/open-next/src/adapters/cache.ts | 12 ++++-- .../src/adapters/composable-cache.ts | 9 ++-- packages/open-next/src/core/requestHandler.ts | 5 +++ packages/open-next/src/types/global.ts | 3 ++ packages/open-next/src/types/overrides.ts | 10 +++-- packages/open-next/src/utils/cache.ts | 42 ++++++++++++++++++- packages/open-next/src/utils/promise.ts | 2 + 7 files changed, 70 insertions(+), 13 deletions(-) diff --git a/packages/open-next/src/adapters/cache.ts b/packages/open-next/src/adapters/cache.ts index 48a9d067d..35e880d29 100644 --- a/packages/open-next/src/adapters/cache.ts +++ b/packages/open-next/src/adapters/cache.ts @@ -3,7 +3,11 @@ import type { IncrementalCacheContext, IncrementalCacheValue, } from "types/cache"; -import { getTagsFromValue, hasBeenRevalidated } from "utils/cache"; +import { + addTagToWrite, + getTagsFromValue, + hasBeenRevalidated, +} from "utils/cache"; import { isBinaryContentType } from "../utils/binary"; import { debug, error, warn } from "./logger"; @@ -326,7 +330,7 @@ export default class Cache { if (globalThis.tagCache.mode === "nextMode") { const paths = (await globalThis.tagCache.getPathsByTags?.(_tags)) ?? []; - await globalThis.tagCache.writeTags(_tags); + addTagToWrite(_tags); if (paths.length > 0) { // TODO: we should introduce a new method in cdnInvalidationHandler to invalidate paths by tags for cdn that supports it // It also means that we'll need to provide the tags used in every request to the wrapper or converter. @@ -378,7 +382,7 @@ export default class Cache { } // Update all keys with the given tag with revalidatedAt set to now - await globalThis.tagCache.writeTags(toInsert); + addTagToWrite(toInsert); // We can now invalidate all paths in the CDN // This only applies to `revalidateTag`, not to `res.revalidate()` @@ -442,7 +446,7 @@ export default class Cache { const storedTags = await globalThis.tagCache.getByPath(key); const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag)); if (tagsToWrite.length > 0) { - await globalThis.tagCache.writeTags( + addTagToWrite( tagsToWrite.map((tag) => ({ path: key, tag: tag, diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index 00b3847bd..afea72a11 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -1,6 +1,7 @@ import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; +import { addTagToWrite } from "utils/cache"; export default { async get(cacheKey: string) { @@ -62,9 +63,7 @@ export default { const storedTags = await globalThis.tagCache.getByPath(cacheKey); const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag)); if (tagsToWrite.length > 0) { - await globalThis.tagCache.writeTags( - tagsToWrite.map((tag) => ({ tag, path: cacheKey })), - ); + addTagToWrite(tagsToWrite.map((tag) => ({ tag, path: cacheKey }))); } } }, @@ -83,7 +82,7 @@ export default { }, async expireTags(...tags: string[]) { if (globalThis.tagCache.mode === "nextMode") { - return globalThis.tagCache.writeTags(tags); + return addTagToWrite(tags); } const tagCache = globalThis.tagCache; const revalidatedAt = Date.now(); @@ -104,7 +103,7 @@ export default { for (const entry of pathsToUpdate.flat()) { setToWrite.add(entry); } - await globalThis.tagCache.writeTags(Array.from(setToWrite)); + addTagToWrite(Array.from(setToWrite)); }, // This one is necessary for older versions of next diff --git a/packages/open-next/src/core/requestHandler.ts b/packages/open-next/src/core/requestHandler.ts index 7296aba19..1509661df 100644 --- a/packages/open-next/src/core/requestHandler.ts +++ b/packages/open-next/src/core/requestHandler.ts @@ -27,6 +27,7 @@ import routingHandler, { MIDDLEWARE_HEADER_PREFIX_LEN, } from "./routingHandler"; import { requestHandler, setNextjsPrebundledReact } from "./util"; +import { executeTagCacheWrite } from "utils/cache"; // This is used to identify requests in the cache globalThis.__openNextAls = new AsyncLocalStorage(); @@ -210,6 +211,10 @@ export async function openNextHandler( isBase64Encoded, }; + // Last thing we do is to write the tags to the tag cache if we are in the main function + // This is done inside a pending promise runner so that it can be awaited + executeTagCacheWrite(); + return internalResult; }, ); diff --git a/packages/open-next/src/types/global.ts b/packages/open-next/src/types/global.ts index 085acae8d..9ef13c904 100644 --- a/packages/open-next/src/types/global.ts +++ b/packages/open-next/src/types/global.ts @@ -4,6 +4,7 @@ import type { OutgoingHttpHeaders } from "node:http"; import type { CDNInvalidationHandler, IncrementalCache, + OriginalTagCacheWriteInput, ProxyExternalRequest, Queue, TagCache, @@ -63,6 +64,8 @@ interface OpenNextRequestContext { // Last modified time of the page (used in main functions, only available for ISR/SSG). lastModified?: number; waitUntil?: WaitUntil; + /** We use this to deduplicate write of the tags - It's a string if the tag cache is in NextMode */ + pendingTagToWrite: Map; } declare global { diff --git a/packages/open-next/src/types/overrides.ts b/packages/open-next/src/types/overrides.ts index 4d8eb02d9..94e16c5cd 100644 --- a/packages/open-next/src/types/overrides.ts +++ b/packages/open-next/src/types/overrides.ts @@ -144,6 +144,12 @@ export type NextModeTagCache = BaseTagCache & { getPathsByTags?: (tags: string[]) => Promise; }; +export interface OriginalTagCacheWriteInput { + tag: string; + path: string; + revalidatedAt?: number; +} + /** * On get : We just check for the cache key in the tag cache. If it has been revalidated we just return null, otherwise we continue @@ -168,9 +174,7 @@ export type OriginalTagCache = BaseTagCache & { getByTag(tag: string): Promise; getByPath(path: string): Promise; getLastModified(path: string, lastModified?: number): Promise; - writeTags( - tags: { tag: string; path: string; revalidatedAt?: number }[], - ): Promise; + writeTags(tags: OriginalTagCacheWriteInput[]): Promise; }; export type TagCache = NextModeTagCache | OriginalTagCache; diff --git a/packages/open-next/src/utils/cache.ts b/packages/open-next/src/utils/cache.ts index 5072e4bef..33582b750 100644 --- a/packages/open-next/src/utils/cache.ts +++ b/packages/open-next/src/utils/cache.ts @@ -1,4 +1,8 @@ -import type { CacheValue, WithLastModified } from "types/overrides"; +import type { + CacheValue, + OriginalTagCacheWriteInput, + WithLastModified, +} from "types/overrides"; export async function hasBeenRevalidated( key: string, @@ -39,3 +43,39 @@ export function getTagsFromValue(value?: CacheValue<"cache">) { return []; } } + +export function executeTagCacheWrite() { + const store = globalThis.__openNextAls.getStore(); + if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) { + return; + } + const tagCache = globalThis.tagCache; + const tagsToWrite = Array.from(store.pendingTagToWrite.values()); + + store.pendingPromiseRunner.add(tagCache.writeTags(tagsToWrite as any)); +} + +export function addTagToWrite( + tags: (string | OriginalTagCacheWriteInput)[], +): void { + const store = globalThis.__openNextAls.getStore(); + if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) { + return; + } + for (const t of tags) { + if (typeof t === "string") { + store.pendingTagToWrite.set(t, t); + } else { + store.pendingTagToWrite.set( + // The primary key is only the path and the tag, not the revalidatedAt + JSON.stringify({ + tag: t.tag, + path: t.path, + }), + { + ...t, + }, + ); + } + } +} diff --git a/packages/open-next/src/utils/promise.ts b/packages/open-next/src/utils/promise.ts index 9fde876cb..cf1557e62 100644 --- a/packages/open-next/src/utils/promise.ts +++ b/packages/open-next/src/utils/promise.ts @@ -1,5 +1,6 @@ import type { WaitUntil } from "types/open-next"; import { debug, error } from "../adapters/logger"; +import type { OriginalTagCacheWriteInput } from "types/overrides"; /** * A `Promise.withResolvers` implementation that exposes the `resolve` and @@ -119,6 +120,7 @@ export function runWithOpenNextRequestContext( pendingPromiseRunner: new DetachedPromiseRunner(), isISRRevalidation, waitUntil, + pendingTagToWrite: new Map(), }, async () => { provideNextAfterProvider(); From 696a0e96010f5e3e5439e197a736b75831613a32 Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Sat, 21 Jun 2025 14:28:23 +0200 Subject: [PATCH 2/5] write tags right away instead of at the end of the request --- packages/open-next/src/adapters/cache.ts | 12 ++--- .../src/adapters/composable-cache.ts | 8 ++-- packages/open-next/src/core/requestHandler.ts | 5 -- packages/open-next/src/types/global.ts | 5 +- packages/open-next/src/utils/cache.ts | 48 +++++++++---------- packages/open-next/src/utils/promise.ts | 3 +- 6 files changed, 35 insertions(+), 46 deletions(-) diff --git a/packages/open-next/src/adapters/cache.ts b/packages/open-next/src/adapters/cache.ts index 35e880d29..6f0945b93 100644 --- a/packages/open-next/src/adapters/cache.ts +++ b/packages/open-next/src/adapters/cache.ts @@ -3,11 +3,7 @@ import type { IncrementalCacheContext, IncrementalCacheValue, } from "types/cache"; -import { - addTagToWrite, - getTagsFromValue, - hasBeenRevalidated, -} from "utils/cache"; +import { getTagsFromValue, hasBeenRevalidated, writeTags } from "utils/cache"; import { isBinaryContentType } from "../utils/binary"; import { debug, error, warn } from "./logger"; @@ -330,7 +326,7 @@ export default class Cache { if (globalThis.tagCache.mode === "nextMode") { const paths = (await globalThis.tagCache.getPathsByTags?.(_tags)) ?? []; - addTagToWrite(_tags); + await writeTags(_tags); if (paths.length > 0) { // TODO: we should introduce a new method in cdnInvalidationHandler to invalidate paths by tags for cdn that supports it // It also means that we'll need to provide the tags used in every request to the wrapper or converter. @@ -382,7 +378,7 @@ export default class Cache { } // Update all keys with the given tag with revalidatedAt set to now - addTagToWrite(toInsert); + await writeTags(toInsert); // We can now invalidate all paths in the CDN // This only applies to `revalidateTag`, not to `res.revalidate()` @@ -446,7 +442,7 @@ export default class Cache { const storedTags = await globalThis.tagCache.getByPath(key); const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag)); if (tagsToWrite.length > 0) { - addTagToWrite( + await writeTags( tagsToWrite.map((tag) => ({ path: key, tag: tag, diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index afea72a11..5e0e61e6e 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -1,7 +1,7 @@ import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; +import { writeTags } from "utils/cache"; import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; -import { addTagToWrite } from "utils/cache"; export default { async get(cacheKey: string) { @@ -63,7 +63,7 @@ export default { const storedTags = await globalThis.tagCache.getByPath(cacheKey); const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag)); if (tagsToWrite.length > 0) { - addTagToWrite(tagsToWrite.map((tag) => ({ tag, path: cacheKey }))); + await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey }))); } } }, @@ -82,7 +82,7 @@ export default { }, async expireTags(...tags: string[]) { if (globalThis.tagCache.mode === "nextMode") { - return addTagToWrite(tags); + return writeTags(tags); } const tagCache = globalThis.tagCache; const revalidatedAt = Date.now(); @@ -103,7 +103,7 @@ export default { for (const entry of pathsToUpdate.flat()) { setToWrite.add(entry); } - addTagToWrite(Array.from(setToWrite)); + await writeTags(Array.from(setToWrite)); }, // This one is necessary for older versions of next diff --git a/packages/open-next/src/core/requestHandler.ts b/packages/open-next/src/core/requestHandler.ts index 1509661df..7296aba19 100644 --- a/packages/open-next/src/core/requestHandler.ts +++ b/packages/open-next/src/core/requestHandler.ts @@ -27,7 +27,6 @@ import routingHandler, { MIDDLEWARE_HEADER_PREFIX_LEN, } from "./routingHandler"; import { requestHandler, setNextjsPrebundledReact } from "./util"; -import { executeTagCacheWrite } from "utils/cache"; // This is used to identify requests in the cache globalThis.__openNextAls = new AsyncLocalStorage(); @@ -211,10 +210,6 @@ export async function openNextHandler( isBase64Encoded, }; - // Last thing we do is to write the tags to the tag cache if we are in the main function - // This is done inside a pending promise runner so that it can be awaited - executeTagCacheWrite(); - return internalResult; }, ); diff --git a/packages/open-next/src/types/global.ts b/packages/open-next/src/types/global.ts index 9ef13c904..882e57908 100644 --- a/packages/open-next/src/types/global.ts +++ b/packages/open-next/src/types/global.ts @@ -4,7 +4,6 @@ import type { OutgoingHttpHeaders } from "node:http"; import type { CDNInvalidationHandler, IncrementalCache, - OriginalTagCacheWriteInput, ProxyExternalRequest, Queue, TagCache, @@ -64,8 +63,8 @@ interface OpenNextRequestContext { // Last modified time of the page (used in main functions, only available for ISR/SSG). lastModified?: number; waitUntil?: WaitUntil; - /** We use this to deduplicate write of the tags - It's a string if the tag cache is in NextMode */ - pendingTagToWrite: Map; + /** We use this to deduplicate write of the tags*/ + writtenTags: Set; } declare global { diff --git a/packages/open-next/src/utils/cache.ts b/packages/open-next/src/utils/cache.ts index 33582b750..f8c1b5e47 100644 --- a/packages/open-next/src/utils/cache.ts +++ b/packages/open-next/src/utils/cache.ts @@ -44,38 +44,38 @@ export function getTagsFromValue(value?: CacheValue<"cache">) { } } -export function executeTagCacheWrite() { - const store = globalThis.__openNextAls.getStore(); - if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) { - return; +function getTagKey(tag: string | OriginalTagCacheWriteInput): string { + if (typeof tag === "string") { + return tag; } - const tagCache = globalThis.tagCache; - const tagsToWrite = Array.from(store.pendingTagToWrite.values()); - - store.pendingPromiseRunner.add(tagCache.writeTags(tagsToWrite as any)); + return JSON.stringify({ + tag: tag.tag, + path: tag.path, + }); } -export function addTagToWrite( +export async function writeTags( tags: (string | OriginalTagCacheWriteInput)[], -): void { +): Promise { const store = globalThis.__openNextAls.getStore(); + console.log("Writing tags", tags, store); if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) { return; } - for (const t of tags) { - if (typeof t === "string") { - store.pendingTagToWrite.set(t, t); - } else { - store.pendingTagToWrite.set( - // The primary key is only the path and the tag, not the revalidatedAt - JSON.stringify({ - tag: t.tag, - path: t.path, - }), - { - ...t, - }, - ); + const tagsToWrite = tags.filter((t) => { + const tagKey = getTagKey(t); + const shouldWrite = !store.writtenTags.has(tagKey); + // We preemptively add the tag to the writtenTags set + // to avoid writing the same tag multiple times in the same request + if (shouldWrite) { + store.writtenTags.add(tagKey); } + return shouldWrite; + }); + if (tagsToWrite.length === 0) { + return; } + + // Here we know that we have the correct type + await globalThis.tagCache.writeTags(tagsToWrite as any); } diff --git a/packages/open-next/src/utils/promise.ts b/packages/open-next/src/utils/promise.ts index cf1557e62..6ae9a491a 100644 --- a/packages/open-next/src/utils/promise.ts +++ b/packages/open-next/src/utils/promise.ts @@ -1,6 +1,5 @@ import type { WaitUntil } from "types/open-next"; import { debug, error } from "../adapters/logger"; -import type { OriginalTagCacheWriteInput } from "types/overrides"; /** * A `Promise.withResolvers` implementation that exposes the `resolve` and @@ -120,7 +119,7 @@ export function runWithOpenNextRequestContext( pendingPromiseRunner: new DetachedPromiseRunner(), isISRRevalidation, waitUntil, - pendingTagToWrite: new Map(), + writtenTags: new Set(), }, async () => { provideNextAfterProvider(); From fa3384de77a7438d9d636f5b80aab4ae4aec7db3 Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Sat, 21 Jun 2025 14:42:28 +0200 Subject: [PATCH 3/5] read from pending write in composable cache --- packages/open-next/src/adapters/composable-cache.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index 5e0e61e6e..0c4fecdc2 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -3,9 +3,16 @@ import { writeTags } from "utils/cache"; import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; +const pendingWritePromiseMap = new Map>(); + export default { async get(cacheKey: string) { try { + // We first check if we have a pending write for this cache key + // If we do, we return the pending promise instead of fetching the cache + if (pendingWritePromiseMap.has(cacheKey)) { + return pendingWritePromiseMap.get(cacheKey); + } const result = await globalThis.incrementalCache.get( cacheKey, "composable", @@ -49,7 +56,10 @@ export default { }, async set(cacheKey: string, pendingEntry: Promise) { - const entry = await pendingEntry; + pendingWritePromiseMap.set(cacheKey, pendingEntry); + const entry = await pendingEntry.finally(() => { + pendingWritePromiseMap.delete(cacheKey); + }); const valueToStore = await fromReadableStream(entry.value); await globalThis.incrementalCache.set( cacheKey, From 133c791a552400476db41c16fbbea754c199c485 Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Sat, 21 Jun 2025 14:43:27 +0200 Subject: [PATCH 4/5] fix unit test --- packages/tests-unit/tests/adapters/cache.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/tests-unit/tests/adapters/cache.test.ts b/packages/tests-unit/tests/adapters/cache.test.ts index e4e3977fd..af507200c 100644 --- a/packages/tests-unit/tests/adapters/cache.test.ts +++ b/packages/tests-unit/tests/adapters/cache.test.ts @@ -56,6 +56,7 @@ describe("CacheHandler", () => { resolve: vi.fn(), }), }, + writtenTags: new Set(), }), }; From 150db0d413c2b354850862443b37b4ba4a93cfda Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Mon, 23 Jun 2025 16:50:40 +0200 Subject: [PATCH 5/5] review fix --- packages/open-next/src/utils/cache.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/open-next/src/utils/cache.ts b/packages/open-next/src/utils/cache.ts index f8c1b5e47..bfc2fd781 100644 --- a/packages/open-next/src/utils/cache.ts +++ b/packages/open-next/src/utils/cache.ts @@ -3,6 +3,7 @@ import type { OriginalTagCacheWriteInput, WithLastModified, } from "types/overrides"; +import { debug } from "../adapters/logger"; export async function hasBeenRevalidated( key: string, @@ -58,7 +59,7 @@ export async function writeTags( tags: (string | OriginalTagCacheWriteInput)[], ): Promise { const store = globalThis.__openNextAls.getStore(); - console.log("Writing tags", tags, store); + debug("Writing tags", tags, store); if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) { return; }