Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wet-emus-brake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

fix locked readable stream in the composable cache
24 changes: 18 additions & 6 deletions packages/open-next/src/adapters/composable-cache.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
import type { CacheValue } from "types/overrides";
import { writeTags } from "utils/cache";
import { fromReadableStream, toReadableStream } from "utils/stream";
import { debug } from "./logger";

const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();
const pendingWritePromiseMap = new Map<
string,
Promise<CacheValue<"composable">>
>();

export default {
async get(cacheKey: string) {
try {
// We first check if we have a pending write for this cache key
// If we do, we return the pending promise instead of fetching the cache
if (pendingWritePromiseMap.has(cacheKey)) {
return pendingWritePromiseMap.get(cacheKey);
const stored = pendingWritePromiseMap.get(cacheKey)!;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a scenario where set doesn't get called and this returns undefined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of any, but I can add a check just in case. And it will stop typescript complaining

return stored.then((entry) => ({
...entry,
value: toReadableStream(entry.value),
}));
}
const result = await globalThis.incrementalCache.get(
cacheKey,
Expand Down Expand Up @@ -56,16 +64,20 @@ export default {
},

async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
pendingWritePromiseMap.set(cacheKey, pendingEntry);
const entry = await pendingEntry.finally(() => {
const promiseEntry = pendingEntry.then(async (entry) => ({
...entry,
value: await fromReadableStream(entry.value),
}));
pendingWritePromiseMap.set(cacheKey, promiseEntry);

const entry = await promiseEntry.finally(() => {
pendingWritePromiseMap.delete(cacheKey);
});
const valueToStore = await fromReadableStream(entry.value);
await globalThis.incrementalCache.set(
cacheKey,
{
...entry,
value: valueToStore,
value: entry.value,
},
"composable",
);
Expand Down
Loading