diff --git a/packages/open-next/src/adapters/cache.ts b/packages/open-next/src/adapters/cache.ts index 48a9d067d..6f0945b93 100644 --- a/packages/open-next/src/adapters/cache.ts +++ b/packages/open-next/src/adapters/cache.ts @@ -3,7 +3,7 @@ import type { IncrementalCacheContext, IncrementalCacheValue, } from "types/cache"; -import { getTagsFromValue, hasBeenRevalidated } from "utils/cache"; +import { getTagsFromValue, hasBeenRevalidated, writeTags } from "utils/cache"; import { isBinaryContentType } from "../utils/binary"; import { debug, error, warn } from "./logger"; @@ -326,7 +326,7 @@ export default class Cache { if (globalThis.tagCache.mode === "nextMode") { const paths = (await globalThis.tagCache.getPathsByTags?.(_tags)) ?? []; - await globalThis.tagCache.writeTags(_tags); + await writeTags(_tags); if (paths.length > 0) { // TODO: we should introduce a new method in cdnInvalidationHandler to invalidate paths by tags for cdn that supports it // It also means that we'll need to provide the tags used in every request to the wrapper or converter. @@ -378,7 +378,7 @@ export default class Cache { } // Update all keys with the given tag with revalidatedAt set to now - await globalThis.tagCache.writeTags(toInsert); + await writeTags(toInsert); // We can now invalidate all paths in the CDN // This only applies to `revalidateTag`, not to `res.revalidate()` @@ -442,7 +442,7 @@ export default class Cache { const storedTags = await globalThis.tagCache.getByPath(key); const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag)); if (tagsToWrite.length > 0) { - await globalThis.tagCache.writeTags( + await writeTags( tagsToWrite.map((tag) => ({ path: key, tag: tag, diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index 00b3847bd..0c4fecdc2 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -1,10 +1,18 @@ import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; +import { writeTags } from "utils/cache"; import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; +const pendingWritePromiseMap = new Map>(); + export default { async get(cacheKey: string) { try { + // We first check if we have a pending write for this cache key + // If we do, we return the pending promise instead of fetching the cache + if (pendingWritePromiseMap.has(cacheKey)) { + return pendingWritePromiseMap.get(cacheKey); + } const result = await globalThis.incrementalCache.get( cacheKey, "composable", @@ -48,7 +56,10 @@ export default { }, async set(cacheKey: string, pendingEntry: Promise) { - const entry = await pendingEntry; + pendingWritePromiseMap.set(cacheKey, pendingEntry); + const entry = await pendingEntry.finally(() => { + pendingWritePromiseMap.delete(cacheKey); + }); const valueToStore = await fromReadableStream(entry.value); await globalThis.incrementalCache.set( cacheKey, @@ -62,9 +73,7 @@ export default { const storedTags = await globalThis.tagCache.getByPath(cacheKey); const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag)); if (tagsToWrite.length > 0) { - await globalThis.tagCache.writeTags( - tagsToWrite.map((tag) => ({ tag, path: cacheKey })), - ); + await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey }))); } } }, @@ -83,7 +92,7 @@ export default { }, async expireTags(...tags: string[]) { if (globalThis.tagCache.mode === "nextMode") { - return globalThis.tagCache.writeTags(tags); + return writeTags(tags); } const tagCache = globalThis.tagCache; const revalidatedAt = Date.now(); @@ -104,7 +113,7 @@ export default { for (const entry of pathsToUpdate.flat()) { setToWrite.add(entry); } - await globalThis.tagCache.writeTags(Array.from(setToWrite)); + await writeTags(Array.from(setToWrite)); }, // This one is necessary for older versions of next diff --git a/packages/open-next/src/types/global.ts b/packages/open-next/src/types/global.ts index 085acae8d..882e57908 100644 --- a/packages/open-next/src/types/global.ts +++ b/packages/open-next/src/types/global.ts @@ -63,6 +63,8 @@ interface OpenNextRequestContext { // Last modified time of the page (used in main functions, only available for ISR/SSG). lastModified?: number; waitUntil?: WaitUntil; + /** We use this to deduplicate write of the tags*/ + writtenTags: Set; } declare global { diff --git a/packages/open-next/src/types/overrides.ts b/packages/open-next/src/types/overrides.ts index 4d8eb02d9..94e16c5cd 100644 --- a/packages/open-next/src/types/overrides.ts +++ b/packages/open-next/src/types/overrides.ts @@ -144,6 +144,12 @@ export type NextModeTagCache = BaseTagCache & { getPathsByTags?: (tags: string[]) => Promise; }; +export interface OriginalTagCacheWriteInput { + tag: string; + path: string; + revalidatedAt?: number; +} + /** * On get : We just check for the cache key in the tag cache. If it has been revalidated we just return null, otherwise we continue @@ -168,9 +174,7 @@ export type OriginalTagCache = BaseTagCache & { getByTag(tag: string): Promise; getByPath(path: string): Promise; getLastModified(path: string, lastModified?: number): Promise; - writeTags( - tags: { tag: string; path: string; revalidatedAt?: number }[], - ): Promise; + writeTags(tags: OriginalTagCacheWriteInput[]): Promise; }; export type TagCache = NextModeTagCache | OriginalTagCache; diff --git a/packages/open-next/src/utils/cache.ts b/packages/open-next/src/utils/cache.ts index 5072e4bef..bfc2fd781 100644 --- a/packages/open-next/src/utils/cache.ts +++ b/packages/open-next/src/utils/cache.ts @@ -1,4 +1,9 @@ -import type { CacheValue, WithLastModified } from "types/overrides"; +import type { + CacheValue, + OriginalTagCacheWriteInput, + WithLastModified, +} from "types/overrides"; +import { debug } from "../adapters/logger"; export async function hasBeenRevalidated( key: string, @@ -39,3 +44,39 @@ export function getTagsFromValue(value?: CacheValue<"cache">) { return []; } } + +function getTagKey(tag: string | OriginalTagCacheWriteInput): string { + if (typeof tag === "string") { + return tag; + } + return JSON.stringify({ + tag: tag.tag, + path: tag.path, + }); +} + +export async function writeTags( + tags: (string | OriginalTagCacheWriteInput)[], +): Promise { + const store = globalThis.__openNextAls.getStore(); + debug("Writing tags", tags, store); + if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) { + return; + } + const tagsToWrite = tags.filter((t) => { + const tagKey = getTagKey(t); + const shouldWrite = !store.writtenTags.has(tagKey); + // We preemptively add the tag to the writtenTags set + // to avoid writing the same tag multiple times in the same request + if (shouldWrite) { + store.writtenTags.add(tagKey); + } + return shouldWrite; + }); + if (tagsToWrite.length === 0) { + return; + } + + // Here we know that we have the correct type + await globalThis.tagCache.writeTags(tagsToWrite as any); +} diff --git a/packages/open-next/src/utils/promise.ts b/packages/open-next/src/utils/promise.ts index 9fde876cb..6ae9a491a 100644 --- a/packages/open-next/src/utils/promise.ts +++ b/packages/open-next/src/utils/promise.ts @@ -119,6 +119,7 @@ export function runWithOpenNextRequestContext( pendingPromiseRunner: new DetachedPromiseRunner(), isISRRevalidation, waitUntil, + writtenTags: new Set(), }, async () => { provideNextAfterProvider(); diff --git a/packages/tests-unit/tests/adapters/cache.test.ts b/packages/tests-unit/tests/adapters/cache.test.ts index e4e3977fd..af507200c 100644 --- a/packages/tests-unit/tests/adapters/cache.test.ts +++ b/packages/tests-unit/tests/adapters/cache.test.ts @@ -56,6 +56,7 @@ describe("CacheHandler", () => { resolve: vi.fn(), }), }, + writtenTags: new Set(), }), };