Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/open-next/src/adapters/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type {
IncrementalCacheContext,
IncrementalCacheValue,
} from "types/cache";
import { getTagsFromValue, hasBeenRevalidated } from "utils/cache";
import { getTagsFromValue, hasBeenRevalidated, writeTags } from "utils/cache";
import { isBinaryContentType } from "../utils/binary";
import { debug, error, warn } from "./logger";

Expand Down Expand Up @@ -326,7 +326,7 @@ export default class Cache {
if (globalThis.tagCache.mode === "nextMode") {
const paths = (await globalThis.tagCache.getPathsByTags?.(_tags)) ?? [];

await globalThis.tagCache.writeTags(_tags);
await writeTags(_tags);
if (paths.length > 0) {
// TODO: we should introduce a new method in cdnInvalidationHandler to invalidate paths by tags for cdn that supports it
// It also means that we'll need to provide the tags used in every request to the wrapper or converter.
Expand Down Expand Up @@ -378,7 +378,7 @@ export default class Cache {
}

// Update all keys with the given tag with revalidatedAt set to now
await globalThis.tagCache.writeTags(toInsert);
await writeTags(toInsert);

// We can now invalidate all paths in the CDN
// This only applies to `revalidateTag`, not to `res.revalidate()`
Expand Down Expand Up @@ -442,7 +442,7 @@ export default class Cache {
const storedTags = await globalThis.tagCache.getByPath(key);
const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag));
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
await writeTags(
tagsToWrite.map((tag) => ({
path: key,
tag: tag,
Expand Down
21 changes: 15 additions & 6 deletions packages/open-next/src/adapters/composable-cache.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
import { writeTags } from "utils/cache";
import { fromReadableStream, toReadableStream } from "utils/stream";
import { debug } from "./logger";

const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();

export default {
async get(cacheKey: string) {
try {
// We first check if we have a pending write for this cache key
// If we do, we return the pending promise instead of fetching the cache
if (pendingWritePromiseMap.has(cacheKey)) {
return pendingWritePromiseMap.get(cacheKey);
}
const result = await globalThis.incrementalCache.get(
cacheKey,
"composable",
Expand Down Expand Up @@ -48,7 +56,10 @@ export default {
},

async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
const entry = await pendingEntry;
pendingWritePromiseMap.set(cacheKey, pendingEntry);
const entry = await pendingEntry.finally(() => {
pendingWritePromiseMap.delete(cacheKey);
});
const valueToStore = await fromReadableStream(entry.value);
await globalThis.incrementalCache.set(
cacheKey,
Expand All @@ -62,9 +73,7 @@ export default {
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag));
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
tagsToWrite.map((tag) => ({ tag, path: cacheKey })),
);
await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey })));
}
}
},
Expand All @@ -83,7 +92,7 @@ export default {
},
async expireTags(...tags: string[]) {
if (globalThis.tagCache.mode === "nextMode") {
return globalThis.tagCache.writeTags(tags);
return writeTags(tags);
}
const tagCache = globalThis.tagCache;
const revalidatedAt = Date.now();
Expand All @@ -104,7 +113,7 @@ export default {
for (const entry of pathsToUpdate.flat()) {
setToWrite.add(entry);
}
await globalThis.tagCache.writeTags(Array.from(setToWrite));
await writeTags(Array.from(setToWrite));
},

// This one is necessary for older versions of next
Expand Down
2 changes: 2 additions & 0 deletions packages/open-next/src/types/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ interface OpenNextRequestContext {
// Last modified time of the page (used in main functions, only available for ISR/SSG).
lastModified?: number;
waitUntil?: WaitUntil;
/** We use this to deduplicate write of the tags*/
writtenTags: Set<string>;
}

declare global {
Expand Down
10 changes: 7 additions & 3 deletions packages/open-next/src/types/overrides.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ export type NextModeTagCache = BaseTagCache & {
getPathsByTags?: (tags: string[]) => Promise<string[]>;
};

export interface OriginalTagCacheWriteInput {
tag: string;
path: string;
revalidatedAt?: number;
}

/**
* On get :
We just check for the cache key in the tag cache. If it has been revalidated we just return null, otherwise we continue
Expand All @@ -168,9 +174,7 @@ export type OriginalTagCache = BaseTagCache & {
getByTag(tag: string): Promise<string[]>;
getByPath(path: string): Promise<string[]>;
getLastModified(path: string, lastModified?: number): Promise<number>;
writeTags(
tags: { tag: string; path: string; revalidatedAt?: number }[],
): Promise<void>;
writeTags(tags: OriginalTagCacheWriteInput[]): Promise<void>;
};

export type TagCache = NextModeTagCache | OriginalTagCache;
Expand Down
43 changes: 42 additions & 1 deletion packages/open-next/src/utils/cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import type { CacheValue, WithLastModified } from "types/overrides";
import type {
CacheValue,
OriginalTagCacheWriteInput,
WithLastModified,
} from "types/overrides";
import { debug } from "../adapters/logger";

export async function hasBeenRevalidated(
key: string,
Expand Down Expand Up @@ -39,3 +44,39 @@ export function getTagsFromValue(value?: CacheValue<"cache">) {
return [];
}
}

function getTagKey(tag: string | OriginalTagCacheWriteInput): string {
if (typeof tag === "string") {
return tag;
}
return JSON.stringify({
tag: tag.tag,
path: tag.path,
});
}

export async function writeTags(
tags: (string | OriginalTagCacheWriteInput)[],
): Promise<void> {
const store = globalThis.__openNextAls.getStore();
debug("Writing tags", tags, store);
if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) {
return;
}
const tagsToWrite = tags.filter((t) => {
const tagKey = getTagKey(t);
const shouldWrite = !store.writtenTags.has(tagKey);
// We preemptively add the tag to the writtenTags set
// to avoid writing the same tag multiple times in the same request
if (shouldWrite) {
store.writtenTags.add(tagKey);
}
return shouldWrite;
});
if (tagsToWrite.length === 0) {
return;
}

// Here we know that we have the correct type
await globalThis.tagCache.writeTags(tagsToWrite as any);
}
1 change: 1 addition & 0 deletions packages/open-next/src/utils/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function runWithOpenNextRequestContext<T>(
pendingPromiseRunner: new DetachedPromiseRunner(),
isISRRevalidation,
waitUntil,
writtenTags: new Set<string>(),
},
async () => {
provideNextAfterProvider();
Expand Down
1 change: 1 addition & 0 deletions packages/tests-unit/tests/adapters/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ describe("CacheHandler", () => {
resolve: vi.fn(),
}),
},
writtenTags: new Set(),
}),
};

Expand Down
Loading