Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/open-next/src/adapters/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type {
IncrementalCacheContext,
IncrementalCacheValue,
} from "types/cache";
import { getTagsFromValue, hasBeenRevalidated } from "utils/cache";
import { getTagsFromValue, hasBeenRevalidated, writeTags } from "utils/cache";
import { isBinaryContentType } from "../utils/binary";
import { debug, error, warn } from "./logger";

Expand Down Expand Up @@ -326,7 +326,7 @@ export default class Cache {
if (globalThis.tagCache.mode === "nextMode") {
const paths = (await globalThis.tagCache.getPathsByTags?.(_tags)) ?? [];

await globalThis.tagCache.writeTags(_tags);
await writeTags(_tags);
if (paths.length > 0) {
// TODO: we should introduce a new method in cdnInvalidationHandler to invalidate paths by tags for cdn that supports it
// It also means that we'll need to provide the tags used in every request to the wrapper or converter.
Expand Down Expand Up @@ -378,7 +378,7 @@ export default class Cache {
}

// Update all keys with the given tag with revalidatedAt set to now
await globalThis.tagCache.writeTags(toInsert);
await writeTags(toInsert);

// We can now invalidate all paths in the CDN
// This only applies to `revalidateTag`, not to `res.revalidate()`
Expand Down Expand Up @@ -442,7 +442,7 @@ export default class Cache {
const storedTags = await globalThis.tagCache.getByPath(key);
const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag));
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
await writeTags(
tagsToWrite.map((tag) => ({
path: key,
tag: tag,
Expand Down
21 changes: 15 additions & 6 deletions packages/open-next/src/adapters/composable-cache.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
import { writeTags } from "utils/cache";
import { fromReadableStream, toReadableStream } from "utils/stream";
import { debug } from "./logger";

const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();

export default {
async get(cacheKey: string) {
try {
// We first check if we have a pending write for this cache key
// If we do, we return the pending promise instead of fetching the cache
if (pendingWritePromiseMap.has(cacheKey)) {
return pendingWritePromiseMap.get(cacheKey);
}
const result = await globalThis.incrementalCache.get(
cacheKey,
"composable",
Expand Down Expand Up @@ -48,7 +56,10 @@ export default {
},

async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
const entry = await pendingEntry;
pendingWritePromiseMap.set(cacheKey, pendingEntry);
const entry = await pendingEntry.finally(() => {
pendingWritePromiseMap.delete(cacheKey);
});
const valueToStore = await fromReadableStream(entry.value);
await globalThis.incrementalCache.set(
cacheKey,
Expand All @@ -62,9 +73,7 @@ export default {
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag));
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
tagsToWrite.map((tag) => ({ tag, path: cacheKey })),
);
await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey })));
}
}
},
Expand All @@ -83,7 +92,7 @@ export default {
},
async expireTags(...tags: string[]) {
if (globalThis.tagCache.mode === "nextMode") {
return globalThis.tagCache.writeTags(tags);
return writeTags(tags);
}
const tagCache = globalThis.tagCache;
const revalidatedAt = Date.now();
Expand All @@ -104,7 +113,7 @@ export default {
for (const entry of pathsToUpdate.flat()) {
setToWrite.add(entry);
}
await globalThis.tagCache.writeTags(Array.from(setToWrite));
await writeTags(Array.from(setToWrite));
},

// This one is necessary for older versions of next
Expand Down
2 changes: 2 additions & 0 deletions packages/open-next/src/types/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ interface OpenNextRequestContext {
// Last modified time of the page (used in main functions, only available for ISR/SSG).
lastModified?: number;
waitUntil?: WaitUntil;
/** We use this to deduplicate write of the tags*/
writtenTags: Set<string>;
}

declare global {
Expand Down
10 changes: 7 additions & 3 deletions packages/open-next/src/types/overrides.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ export type NextModeTagCache = BaseTagCache & {
getPathsByTags?: (tags: string[]) => Promise<string[]>;
};

export interface OriginalTagCacheWriteInput {
tag: string;
path: string;
revalidatedAt?: number;
}

/**
* On get :
We just check for the cache key in the tag cache. If it has been revalidated we just return null, otherwise we continue
Expand All @@ -168,9 +174,7 @@ export type OriginalTagCache = BaseTagCache & {
getByTag(tag: string): Promise<string[]>;
getByPath(path: string): Promise<string[]>;
getLastModified(path: string, lastModified?: number): Promise<number>;
writeTags(
tags: { tag: string; path: string; revalidatedAt?: number }[],
): Promise<void>;
writeTags(tags: OriginalTagCacheWriteInput[]): Promise<void>;
};

export type TagCache = NextModeTagCache | OriginalTagCache;
Expand Down
42 changes: 41 additions & 1 deletion packages/open-next/src/utils/cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import type { CacheValue, WithLastModified } from "types/overrides";
import type {
CacheValue,
OriginalTagCacheWriteInput,
WithLastModified,
} from "types/overrides";

export async function hasBeenRevalidated(
key: string,
Expand Down Expand Up @@ -39,3 +43,39 @@ export function getTagsFromValue(value?: CacheValue<"cache">) {
return [];
}
}

function getTagKey(tag: string | OriginalTagCacheWriteInput): string {
if (typeof tag === "string") {
return tag;
}
return JSON.stringify({
tag: tag.tag,
path: tag.path,
});
}

export async function writeTags(
tags: (string | OriginalTagCacheWriteInput)[],
): Promise<void> {
const store = globalThis.__openNextAls.getStore();
console.log("Writing tags", tags, store);
if (!store || globalThis.openNextConfig.dangerous?.disableTagCache) {
return;
}
const tagsToWrite = tags.filter((t) => {
const tagKey = getTagKey(t);
const shouldWrite = !store.writtenTags.has(tagKey);
// We preemptively add the tag to the writtenTags set
// to avoid writing the same tag multiple times in the same request
if (shouldWrite) {
store.writtenTags.add(tagKey);
}
return shouldWrite;
});
if (tagsToWrite.length === 0) {
return;
}

// Here we know that we have the correct type
await globalThis.tagCache.writeTags(tagsToWrite as any);
}
1 change: 1 addition & 0 deletions packages/open-next/src/utils/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function runWithOpenNextRequestContext<T>(
pendingPromiseRunner: new DetachedPromiseRunner(),
isISRRevalidation,
waitUntil,
writtenTags: new Set<string>(),
},
async () => {
provideNextAfterProvider();
Expand Down
1 change: 1 addition & 0 deletions packages/tests-unit/tests/adapters/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ describe("CacheHandler", () => {
resolve: vi.fn(),
}),
},
writtenTags: new Set(),
}),
};

Expand Down
Loading