Skip to content

Commit 521346b

Browse files
conico974Nicolas Dorseuil
andauthored
Fix locked readable stream in composable cache (opennextjs#939)
* fix(composable-cache): update pending write handling to avoid locked readable stream * changeset * add unit test for the composable cache * review --------- Co-authored-by: Nicolas Dorseuil <[email protected]>
1 parent b9635d0 commit 521346b

File tree

3 files changed

+548
-6
lines changed

3 files changed

+548
-6
lines changed

.changeset/wet-emus-brake.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/aws": patch
3+
---
4+
5+
fix locked readable stream in the composable cache

packages/open-next/src/adapters/composable-cache.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
2+
import type { CacheValue } from "types/overrides";
23
import { writeTags } from "utils/cache";
34
import { fromReadableStream, toReadableStream } from "utils/stream";
45
import { debug } from "./logger";
56

6-
const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();
7+
const pendingWritePromiseMap = new Map<
8+
string,
9+
Promise<CacheValue<"composable">>
10+
>();
711

812
export default {
913
async get(cacheKey: string) {
1014
try {
1115
// We first check if we have a pending write for this cache key
1216
// If we do, we return the pending promise instead of fetching the cache
1317
if (pendingWritePromiseMap.has(cacheKey)) {
14-
return pendingWritePromiseMap.get(cacheKey);
18+
const stored = pendingWritePromiseMap.get(cacheKey);
19+
if (stored) {
20+
return stored.then((entry) => ({
21+
...entry,
22+
value: toReadableStream(entry.value),
23+
}));
24+
}
1525
}
1626
const result = await globalThis.incrementalCache.get(
1727
cacheKey,
@@ -59,16 +69,20 @@ export default {
5969
},
6070

6171
async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
62-
pendingWritePromiseMap.set(cacheKey, pendingEntry);
63-
const entry = await pendingEntry.finally(() => {
72+
const promiseEntry = pendingEntry.then(async (entry) => ({
73+
...entry,
74+
value: await fromReadableStream(entry.value),
75+
}));
76+
pendingWritePromiseMap.set(cacheKey, promiseEntry);
77+
78+
const entry = await promiseEntry.finally(() => {
6479
pendingWritePromiseMap.delete(cacheKey);
6580
});
66-
const valueToStore = await fromReadableStream(entry.value);
6781
await globalThis.incrementalCache.set(
6882
cacheKey,
6983
{
7084
...entry,
71-
value: valueToStore,
85+
value: entry.value,
7286
},
7387
"composable",
7488
);

0 commit comments

Comments
 (0)