From c76b2f61e538be003167fc1203e924655742c5ec Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Fri, 1 Aug 2025 10:03:05 +0200 Subject: [PATCH 1/4] fix(composable-cache): update pending write handling to avoid locked readable stream --- .../src/adapters/composable-cache.ts | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index 0c4fecdc2..86ff43a01 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -1,9 +1,13 @@ import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; +import type { CacheValue } from "types/overrides"; import { writeTags } from "utils/cache"; import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; -const pendingWritePromiseMap = new Map>(); +const pendingWritePromiseMap = new Map< + string, + Promise> +>(); export default { async get(cacheKey: string) { @@ -11,7 +15,11 @@ export default { // We first check if we have a pending write for this cache key // If we do, we return the pending promise instead of fetching the cache if (pendingWritePromiseMap.has(cacheKey)) { - return pendingWritePromiseMap.get(cacheKey); + const stored = pendingWritePromiseMap.get(cacheKey)!; + return stored.then((entry) => ({ + ...entry, + value: toReadableStream(entry.value), + })); } const result = await globalThis.incrementalCache.get( cacheKey, @@ -56,16 +64,20 @@ export default { }, async set(cacheKey: string, pendingEntry: Promise) { - pendingWritePromiseMap.set(cacheKey, pendingEntry); - const entry = await pendingEntry.finally(() => { + const promiseEntry = pendingEntry.then(async (entry) => ({ + ...entry, + value: await fromReadableStream(entry.value), + })); + pendingWritePromiseMap.set(cacheKey, promiseEntry); + + const entry = await promiseEntry.finally(() => { pendingWritePromiseMap.delete(cacheKey); }); - const valueToStore = await fromReadableStream(entry.value); await globalThis.incrementalCache.set( cacheKey, { ...entry, - value: valueToStore, + value: entry.value, }, "composable", ); From da7fc22e3ae29f437369beefc6a15eef412a518b Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Fri, 1 Aug 2025 10:03:54 +0200 Subject: [PATCH 2/4] changeset --- .changeset/wet-emus-brake.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/wet-emus-brake.md diff --git a/.changeset/wet-emus-brake.md b/.changeset/wet-emus-brake.md new file mode 100644 index 000000000..d0a60e921 --- /dev/null +++ b/.changeset/wet-emus-brake.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/aws": patch +--- + +fix locked readable stream in the composable cache From 95eef7f0f1ba160d71fcc21405419bce33854230 Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Sun, 3 Aug 2025 16:23:25 +0200 Subject: [PATCH 3/4] add unit test for the composable cache --- .../tests/adapters/composable-cache.test.ts | 523 ++++++++++++++++++ 1 file changed, 523 insertions(+) create mode 100644 packages/tests-unit/tests/adapters/composable-cache.test.ts diff --git a/packages/tests-unit/tests/adapters/composable-cache.test.ts b/packages/tests-unit/tests/adapters/composable-cache.test.ts new file mode 100644 index 000000000..d31755bf8 --- /dev/null +++ b/packages/tests-unit/tests/adapters/composable-cache.test.ts @@ -0,0 +1,523 @@ +import ComposableCache from "@opennextjs/aws/adapters/composable-cache"; +import { + fromReadableStream, + toReadableStream, +} from "@opennextjs/aws/utils/stream"; +import { vi } from "vitest"; + +describe("Composable cache handler", () => { + vi.useFakeTimers().setSystemTime("2024-01-02T00:00:00Z"); + + const incrementalCache = { + name: "mock", + get: vi.fn().mockResolvedValue({ + value: { + type: "route", + body: "{}", + tags: ["tag1", "tag2"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + value: "test-value", + }, + lastModified: Date.now(), + }), + set: vi.fn(), + delete: vi.fn(), + }; + globalThis.incrementalCache = incrementalCache; + + const tagCache = { + name: "mock", + mode: "original" as string | undefined, + hasBeenRevalidated: vi.fn(), + getByTag: vi.fn().mockResolvedValue(["path1", "path2"]), + getByPath: vi.fn().mockResolvedValue(["tag1"]), + getLastModified: vi + .fn() + .mockResolvedValue(new Date("2024-01-02T00:00:00Z").getTime()), + getLastRevalidated: vi.fn().mockResolvedValue(0), + writeTags: vi.fn(), + }; + globalThis.tagCache = tagCache; + + const invalidateCdnHandler = { + name: "mock", + invalidatePaths: vi.fn(), + }; + globalThis.cdnInvalidationHandler = invalidateCdnHandler; + let writtenTags = new Set(); + + globalThis.__openNextAls = { + getStore: () => ({ + pendingPromiseRunner: { + withResolvers: vi.fn().mockReturnValue({ + resolve: vi.fn(), + }), + }, + writtenTags, + }), + }; + + beforeEach(() => { + vi.clearAllMocks(); + + globalThis.openNextConfig = { + dangerous: { + disableIncrementalCache: false, + disableTagCache: false, + }, + }; + }); + + describe("get", () => { + it("should return cached entry when available and not revalidated", async () => { + const result = await ComposableCache.get("test-key"); + + expect(incrementalCache.get).toHaveBeenCalledWith( + "test-key", + "composable", + ); + expect(result).toBeDefined(); + expect(result?.tags).toEqual(["tag1", "tag2"]); + expect(result?.value).toBeInstanceOf(ReadableStream); + }); + + it("should return undefined when cache entry does not exist", async () => { + incrementalCache.get.mockResolvedValueOnce(null); + + const result = await ComposableCache.get("non-existent-key"); + + expect(result).toBeUndefined(); + }); + + it("should return undefined when cache entry has no value", async () => { + incrementalCache.get.mockResolvedValueOnce({ + value: null, + lastModified: Date.now(), + }); + + const result = await ComposableCache.get("test-key"); + + expect(result).toBeUndefined(); + }); + + it("should check tag revalidation in nextMode", async () => { + tagCache.mode = "nextMode"; + tagCache.hasBeenRevalidated.mockResolvedValueOnce(false); + + const result = await ComposableCache.get("test-key"); + + expect(tagCache.hasBeenRevalidated).toHaveBeenCalledWith( + ["tag1", "tag2"], + expect.any(Number), + ); + expect(result).toBeDefined(); + }); + + it("should return undefined when tags have been revalidated in nextMode", async () => { + tagCache.mode = "nextMode"; + tagCache.hasBeenRevalidated.mockResolvedValueOnce(true); + + const result = await ComposableCache.get("test-key"); + + expect(result).toBeUndefined(); + }); + + it("should skip tag check when tags array is empty in nextMode", async () => { + tagCache.mode = "nextMode"; + incrementalCache.get.mockResolvedValueOnce({ + value: { + type: "route", + body: "{}", + tags: [], + value: "test-value", + }, + lastModified: Date.now(), + }); + + const result = await ComposableCache.get("test-key"); + + expect(tagCache.hasBeenRevalidated).not.toHaveBeenCalled(); + expect(result).toBeDefined(); + }); + + it("should check last modified in original mode", async () => { + tagCache.mode = "original"; + tagCache.getLastModified.mockResolvedValueOnce(Date.now()); + + const result = await ComposableCache.get("test-key"); + + expect(tagCache.getLastModified).toHaveBeenCalledWith( + "test-key", + expect.any(Number), + ); + expect(result).toBeDefined(); + }); + + it("should return undefined when entry has been revalidated in original mode", async () => { + tagCache.mode = "original"; + tagCache.getLastModified.mockResolvedValueOnce(-1); + + const result = await ComposableCache.get("test-key"); + + expect(result).toBeUndefined(); + }); + + it("should handle undefined tag cache mode", async () => { + tagCache.mode = undefined; + tagCache.getLastModified.mockResolvedValueOnce(Date.now()); + + const result = await ComposableCache.get("test-key"); + + expect(tagCache.getLastModified).toHaveBeenCalled(); + expect(result).toBeDefined(); + }); + + it("should return undefined on cache read error", async () => { + incrementalCache.get.mockRejectedValueOnce(new Error("Cache error")); + + const result = await ComposableCache.get("test-key"); + + expect(result).toBeUndefined(); + }); + + it("should return pending write promise if available", async () => { + const pendingEntry = Promise.resolve({ + value: toReadableStream("pending-value"), + tags: ["tag1"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }); + + // Start a set operation to create a pending write + const setPromise = ComposableCache.set("pending-key", pendingEntry); + + // Try to get the same key while set is in progress + const result = await ComposableCache.get("pending-key"); + + expect(result).toBeDefined(); + expect(result?.value).toBeInstanceOf(ReadableStream); + + // Wait for set to complete + await setPromise; + }); + }); + + describe("set", () => { + beforeEach(() => { + writtenTags = new Set(); + }); + + it("should set cache entry and handle tags in original mode", async () => { + tagCache.mode = "original"; + const entry = { + value: toReadableStream("test-value"), + tags: ["tag1", "tag2"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + await ComposableCache.set("test-key", Promise.resolve(entry)); + + expect(incrementalCache.set).toHaveBeenCalledWith( + "test-key", + expect.objectContaining({ + tags: ["tag1", "tag2"], + value: "test-value", + }), + "composable", + ); + expect(tagCache.getByPath).toHaveBeenCalledWith("test-key"); + }); + + it("should write new tags not already stored", async () => { + tagCache.mode = "original"; + tagCache.getByPath.mockResolvedValueOnce(["tag1"]); + + const entry = { + value: toReadableStream("test-value"), + tags: ["tag1", "tag2", "tag3"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + await ComposableCache.set("test-key", Promise.resolve(entry)); + + expect(tagCache.writeTags).toHaveBeenCalledWith([ + { tag: "tag2", path: "test-key" }, + { tag: "tag3", path: "test-key" }, + ]); + }); + + it("should not write tags if all are already stored", async () => { + tagCache.mode = "original"; + tagCache.getByPath.mockResolvedValueOnce(["tag1", "tag2"]); + + const entry = { + value: toReadableStream("test-value"), + tags: ["tag1", "tag2"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + await ComposableCache.set("test-key", Promise.resolve(entry)); + + expect(tagCache.writeTags).not.toHaveBeenCalled(); + }); + + it("should skip tag handling in nextMode", async () => { + tagCache.mode = "nextMode"; + + const entry = { + value: toReadableStream("test-value"), + tags: ["tag1", "tag2"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + await ComposableCache.set("test-key", Promise.resolve(entry)); + + expect(tagCache.getByPath).not.toHaveBeenCalled(); + expect(tagCache.writeTags).not.toHaveBeenCalled(); + }); + + it("should convert ReadableStream to string", async () => { + const entry = { + value: toReadableStream("test-content"), + tags: ["tag1"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + await ComposableCache.set("test-key", Promise.resolve(entry)); + + expect(incrementalCache.set).toHaveBeenCalledWith( + "test-key", + expect.objectContaining({ + value: "test-content", + }), + "composable", + ); + }); + }); + + describe("refreshTags", () => { + it("should do nothing", async () => { + await ComposableCache.refreshTags(); + + // Should not call any methods + expect(incrementalCache.get).not.toHaveBeenCalled(); + expect(incrementalCache.set).not.toHaveBeenCalled(); + expect(tagCache.writeTags).not.toHaveBeenCalled(); + }); + }); + + describe("getExpiration", () => { + it("should return last revalidated time in nextMode", async () => { + tagCache.mode = "nextMode"; + tagCache.getLastRevalidated.mockResolvedValueOnce(123456); + + const result = await ComposableCache.getExpiration("tag1", "tag2"); + + expect(tagCache.getLastRevalidated).toHaveBeenCalledWith([ + "tag1", + "tag2", + ]); + expect(result).toBe(123456); + }); + + it("should return 0 in original mode", async () => { + tagCache.mode = "original"; + + const result = await ComposableCache.getExpiration("tag1", "tag2"); + + expect(result).toBe(0); + }); + + it("should return 0 when mode is undefined", async () => { + tagCache.mode = undefined; + + const result = await ComposableCache.getExpiration("tag1", "tag2"); + + expect(result).toBe(0); + }); + }); + + describe("expireTags", () => { + beforeEach(() => { + writtenTags = new Set(); + }); + it("should write tags directly in nextMode", async () => { + tagCache.mode = "nextMode"; + + await ComposableCache.expireTags("tag1", "tag2"); + + expect(tagCache.writeTags).toHaveBeenCalledWith(["tag1", "tag2"]); + }); + + it("should find paths and write tag mappings in original mode", async () => { + tagCache.mode = "original"; + tagCache.getByTag.mockImplementation(async (tag) => { + if (tag === "tag1") return ["path1", "path2"]; + if (tag === "tag2") return ["path2", "path3"]; + return []; + }); + + await ComposableCache.expireTags("tag1", "tag2"); + + expect(tagCache.getByTag).toHaveBeenCalledWith("tag1"); + expect(tagCache.getByTag).toHaveBeenCalledWith("tag2"); + expect(tagCache.writeTags).toHaveBeenCalledWith( + expect.arrayContaining([ + { path: "path1", tag: "tag1", revalidatedAt: expect.any(Number) }, + { path: "path2", tag: "tag1", revalidatedAt: expect.any(Number) }, + { path: "path2", tag: "tag2", revalidatedAt: expect.any(Number) }, + { path: "path3", tag: "tag2", revalidatedAt: expect.any(Number) }, + ]), + ); + }); + + it("should deduplicate paths in original mode", async () => { + tagCache.mode = "original"; + tagCache.getByTag.mockImplementation(async (tag) => { + if (tag === "tag1") return ["path1", "path2"]; + if (tag === "tag2") return ["path1", "path2"]; + return []; + }); + + await ComposableCache.expireTags("tag1", "tag2"); + + const writtenTags = tagCache.writeTags.mock.calls[0][0]; + expect(writtenTags).toHaveLength(4); // 2 paths × 2 tags = 4 unique combinations + expect(writtenTags).toEqual( + expect.arrayContaining([ + { path: "path1", tag: "tag1", revalidatedAt: expect.any(Number) }, + { path: "path2", tag: "tag1", revalidatedAt: expect.any(Number) }, + { path: "path1", tag: "tag2", revalidatedAt: expect.any(Number) }, + { path: "path2", tag: "tag2", revalidatedAt: expect.any(Number) }, + ]), + ); + }); + + it("should handle empty paths in original mode", async () => { + tagCache.mode = "original"; + tagCache.getByTag.mockResolvedValue([]); + + await ComposableCache.expireTags("tag1"); + + expect(tagCache.writeTags).not.toHaveBeenCalled(); + }); + }); + + describe("receiveExpiredTags", () => { + it("should do nothing", async () => { + await ComposableCache.receiveExpiredTags("tag1", "tag2"); + + // Should not call any methods + expect(incrementalCache.get).not.toHaveBeenCalled(); + expect(incrementalCache.set).not.toHaveBeenCalled(); + expect(tagCache.writeTags).not.toHaveBeenCalled(); + }); + }); + + describe("integration tests", () => { + it("should handle complete cache lifecycle", async () => { + // Set a cache entry + const entry = { + value: toReadableStream("integration-test"), + tags: ["integration-tag"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + await ComposableCache.set("integration-key", Promise.resolve(entry)); + + // Verify it was stored + expect(incrementalCache.set).toHaveBeenCalledWith( + "integration-key", + expect.objectContaining({ + value: "integration-test", + tags: ["integration-tag"], + }), + "composable", + ); + + // Mock the get response + incrementalCache.get.mockResolvedValueOnce({ + value: { + ...entry, + value: "integration-test", + }, + lastModified: Date.now(), + }); + + // Get the cache entry + const result = await ComposableCache.get("integration-key"); + + expect(result).toBeDefined(); + expect(result?.tags).toEqual(["integration-tag"]); + + // Convert the stream back to verify content + const content = await fromReadableStream(result!.value); + expect(content).toBe("integration-test"); + }); + + it("should handle concurrent get/set operations", async () => { + const entry1 = { + value: toReadableStream("concurrent-1"), + tags: ["tag1"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + const entry2 = { + value: toReadableStream("concurrent-2"), + tags: ["tag2"], + stale: 0, + timestamp: Date.now(), + expire: Date.now() + 1000, + revalidate: 3600, + }; + + // Start multiple operations concurrently + const promises = [ + ComposableCache.set("key1", Promise.resolve(entry1)), + ComposableCache.set("key2", Promise.resolve(entry2)), + ComposableCache.get("key1"), + ComposableCache.get("key2"), + ]; + + const results = await Promise.all(promises); + + expect(incrementalCache.set).toHaveBeenCalledTimes(2); + expect(incrementalCache.get).not.toHaveBeenCalled(); + + expect(results[2]).toBeDefined(); + expect(results[3]).toBeDefined(); + + const content1 = await fromReadableStream(results[2]!.value); + expect(content1).toBe("concurrent-1"); + + const content2 = await fromReadableStream(results[3]!.value); + expect(content2).toBe("concurrent-2"); + }); + }); +}); From 6a34a21b8edd7f21619a89b2924b1722469e2613 Mon Sep 17 00:00:00 2001 From: Nicolas Dorseuil Date: Sun, 3 Aug 2025 19:40:22 +0200 Subject: [PATCH 4/4] review --- packages/open-next/src/adapters/composable-cache.ts | 12 +++++++----- .../tests/adapters/composable-cache.test.ts | 6 +++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index 86ff43a01..03dec9efd 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -15,11 +15,13 @@ export default { // We first check if we have a pending write for this cache key // If we do, we return the pending promise instead of fetching the cache if (pendingWritePromiseMap.has(cacheKey)) { - const stored = pendingWritePromiseMap.get(cacheKey)!; - return stored.then((entry) => ({ - ...entry, - value: toReadableStream(entry.value), - })); + const stored = pendingWritePromiseMap.get(cacheKey); + if (stored) { + return stored.then((entry) => ({ + ...entry, + value: toReadableStream(entry.value), + })); + } } const result = await globalThis.incrementalCache.get( cacheKey, diff --git a/packages/tests-unit/tests/adapters/composable-cache.test.ts b/packages/tests-unit/tests/adapters/composable-cache.test.ts index d31755bf8..7d110ce0c 100644 --- a/packages/tests-unit/tests/adapters/composable-cache.test.ts +++ b/packages/tests-unit/tests/adapters/composable-cache.test.ts @@ -47,7 +47,7 @@ describe("Composable cache handler", () => { invalidatePaths: vi.fn(), }; globalThis.cdnInvalidationHandler = invalidateCdnHandler; - let writtenTags = new Set(); + const writtenTags = new Set(); globalThis.__openNextAls = { getStore: () => ({ @@ -209,7 +209,7 @@ describe("Composable cache handler", () => { describe("set", () => { beforeEach(() => { - writtenTags = new Set(); + writtenTags.clear(); }); it("should set cache entry and handle tags in original mode", async () => { @@ -359,7 +359,7 @@ describe("Composable cache handler", () => { describe("expireTags", () => { beforeEach(() => { - writtenTags = new Set(); + writtenTags.clear(); }); it("should write tags directly in nextMode", async () => { tagCache.mode = "nextMode";