diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts index 00db5428..fe53a284 100644 --- a/examples/e2e/app-router/open-next.config.ts +++ b/examples/e2e/app-router/open-next.config.ts @@ -1,10 +1,10 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare"; import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache"; import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache"; -import memoryQueue from "@opennextjs/cloudflare/memory-queue"; +import doQueue from "@opennextjs/cloudflare/durable-queue"; export default defineCloudflareConfig({ incrementalCache: kvIncrementalCache, tagCache: d1TagCache, - queue: memoryQueue, + queue: doQueue, }); diff --git a/examples/e2e/app-router/wrangler.jsonc b/examples/e2e/app-router/wrangler.jsonc index 25be2dde..8440d8c5 100644 --- a/examples/e2e/app-router/wrangler.jsonc +++ b/examples/e2e/app-router/wrangler.jsonc @@ -8,6 +8,20 @@ "directory": ".open-next/assets", "binding": "ASSETS" }, + "durable_objects": { + "bindings": [ + { + "name": "NEXT_CACHE_REVALIDATION_DURABLE_OBJECT", + "class_name": "DurableObjectQueueHandler" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_classes": ["DurableObjectQueueHandler"] + } + ], "kv_namespaces": [ { "binding": "NEXT_CACHE_WORKERS_KV", diff --git a/packages/cloudflare/package.json b/packages/cloudflare/package.json index a6bc9ded..615addcf 100644 --- a/packages/cloudflare/package.json +++ b/packages/cloudflare/package.json @@ -73,7 +73,7 @@ "dependencies": { "@ast-grep/napi": "^0.36.1", "@dotenvx/dotenvx": "catalog:", - "@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@7e23eee", + "@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@773", "enquirer": "^2.4.1", "glob": "catalog:", "yaml": "^2.7.0" diff --git a/packages/cloudflare/src/api/cloudflare-context.ts b/packages/cloudflare/src/api/cloudflare-context.ts index f13a6a5b..51aa2960 100644 --- a/packages/cloudflare/src/api/cloudflare-context.ts +++ b/packages/cloudflare/src/api/cloudflare-context.ts @@ -1,12 +1,22 @@ import type { Context, RunningCodeOptions } from "node:vm"; +import type { DurableObjectQueueHandler } from "./durable-objects/queue"; + declare global { interface CloudflareEnv { + // KV used for the incremental cache NEXT_CACHE_WORKERS_KV?: KVNamespace; + // D1 db used for the tag cache NEXT_CACHE_D1?: D1Database; + // D1 table to use for the tag cache for the tag/path mapping NEXT_CACHE_D1_TAGS_TABLE?: string; + // D1 table to use for the tag cache for storing the tag and their associated revalidation times NEXT_CACHE_D1_REVALIDATIONS_TABLE?: string; + // Service binding for the worker itself to be able to call itself from within the worker NEXT_CACHE_REVALIDATION_WORKER?: Service; + // Durable Object namespace to use for the durable object queue handler + NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace; + // Asset binding ASSETS?: Fetcher; } } diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts new file mode 100644 index 00000000..e877d2f9 --- /dev/null +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -0,0 +1,312 @@ +import { describe, expect, it, vi } from "vitest"; + +import { DurableObjectQueueHandler } from "./queue"; + +vi.mock("cloudflare:workers", () => ({ + DurableObject: class { + constructor( + public ctx: DurableObjectState, + public env: CloudflareEnv + ) {} + }, +})); + +const createDurableObjectQueue = ({ + fetchDuration, + statusCode, + headers, +}: { + fetchDuration: number; + statusCode?: number; + headers?: Headers; +}) => { + const mockState = { + waitUntil: vi.fn(), + blockConcurrencyWhile: vi.fn().mockImplementation(async (fn) => fn()), + storage: { + setAlarm: vi.fn(), + getAlarm: vi.fn(), + }, + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return new DurableObjectQueueHandler(mockState as any, { + NEXT_CACHE_REVALIDATION_WORKER: { + fetch: vi.fn().mockReturnValue( + new Promise((res) => + setTimeout( + () => + res( + new Response(null, { + status: statusCode, + headers: headers ?? new Headers([["x-nextjs-cache", "REVALIDATED"]]), + }) + ), + fetchDuration + ) + ) + ), + connect: vi.fn(), + }, + }); +}; + +const createMessage = (dedupId: string, lastModified = Date.now()) => ({ + MessageBody: { host: "test.local", url: "/test", eTag: "test", lastModified }, + MessageGroupId: "test.local/test", + MessageDeduplicationId: dedupId, + previewModeId: "test", +}); + +describe("DurableObjectQueue", () => { + describe("successful revalidation", () => { + it("should process a single revalidation", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + const firstRequest = await queue.revalidate(createMessage("id")); + expect(firstRequest).toBeUndefined(); + expect(queue.ongoingRevalidations.size).toBe(1); + expect(queue.ongoingRevalidations.has("id")).toBe(true); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.ongoingRevalidations.size).toBe(0); + expect(queue.ongoingRevalidations.has("id")).toBe(false); + expect(queue.service.fetch).toHaveBeenCalledWith("https://test.local/test", { + method: "HEAD", + headers: { + "x-prerender-revalidate": "test", + "x-isr": "1", + }, + signal: expect.any(AbortSignal), + }); + }); + + it("should dedupe revalidations", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + await queue.revalidate(createMessage("id")); + await queue.revalidate(createMessage("id")); + expect(queue.ongoingRevalidations.size).toBe(1); + expect(queue.ongoingRevalidations.has("id")).toBe(true); + }); + + it("should block concurrency", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + await queue.revalidate(createMessage("id")); + await queue.revalidate(createMessage("id2")); + await queue.revalidate(createMessage("id3")); + await queue.revalidate(createMessage("id4")); + await queue.revalidate(createMessage("id5")); + // the next one should block until one of the previous ones finishes + const blockedReq = queue.revalidate(createMessage("id6")); + + expect(queue.ongoingRevalidations.size).toBe(queue.maxRevalidations); + expect(queue.ongoingRevalidations.has("id6")).toBe(false); + expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]); + + // @ts-expect-error + expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1); + + // Here we await the blocked request to ensure it's resolved + await blockedReq; + // We then need to await for the actual revalidation to finish + await Promise.all(Array.from(queue.ongoingRevalidations.values())); + expect(queue.ongoingRevalidations.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(6); + }); + }); + + describe("failed revalidation", () => { + it("should not put it in failed state for an incorrect 200", async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + statusCode: 200, + headers: new Headers([["x-nextjs-cache", "MISS"]]), + }); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(0); + }); + + it("should not put it in failed state for a failed revalidation with 404", async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + statusCode: 404, + }); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + + await queue.revalidate(createMessage("id")); + + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(2); + }); + + it("should put it in failed state if revalidation fails with 500", async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + statusCode: 500, + }); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.routeInFailedState.has("id")).toBe(true); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + + await queue.revalidate(createMessage("id")); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + }); + + it("should put it in failed state if revalidation fetch throw", async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + }); + // @ts-expect-error - This is mocked above + queue.service.fetch.mockImplementationOnce(() => Promise.reject(new Error("fetch error"))); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.routeInFailedState.has("id")).toBe(true); + expect(queue.ongoingRevalidations.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + + await queue.revalidate(createMessage("id")); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + }); + }); + + describe("addAlarm", () => { + const getStorage = (queue: DurableObjectQueueHandler): DurableObjectStorage => { + // @ts-expect-error - ctx is a protected field + return queue.ctx.storage; + }; + + it("should not add an alarm if there are no failed states", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).not.toHaveBeenCalled(); + }); + + it("should add an alarm if there are failed states", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000); + }); + + it("should not add an alarm if there is already an alarm set", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); + // @ts-expect-error + queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).not.toHaveBeenCalled(); + }); + + it("should set the alarm to the lowest nextAlarm", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); + queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 }); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500); + }); + }); + + describe("addToFailedState", () => { + it("should add a failed state", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.routeInFailedState.has("id")).toBe(true); + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1); + }); + + it("should add a failed state with the correct nextAlarm", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now()); + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1); + }); + + it("should add a failed state with the correct nextAlarm for a retry", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + await queue.addToFailedState(createMessage("id")); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now()); + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2); + }); + + it("should not add a failed state if it has been retried 6 times", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarmMs: 1000 }); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.size).toBe(0); + }); + }); + + describe("alarm", () => { + it("should execute revalidations for expired events", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { + msg: createMessage("id"), + retryCount: 0, + nextAlarmMs: Date.now() - 1000, + }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarmMs: Date.now() - 1000, + }); + await queue.alarm(); + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(2); + }); + + it("should execute revalidations for the next event to retry", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { + msg: createMessage("id"), + retryCount: 0, + nextAlarmMs: Date.now() + 1000, + }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarmMs: Date.now() + 500, + }); + await queue.alarm(); + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + expect(queue.routeInFailedState.has("id2")).toBe(false); + }); + + it("should execute revalidations for the next event to retry and expired events", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { + msg: createMessage("id"), + retryCount: 0, + nextAlarmMs: Date.now() + 1000, + }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarmMs: Date.now() - 1000, + }); + await queue.alarm(); + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts new file mode 100644 index 00000000..ca6bb961 --- /dev/null +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -0,0 +1,172 @@ +import { error } from "@opennextjs/aws/adapters/logger.js"; +import type { QueueMessage } from "@opennextjs/aws/types/overrides"; +import { + FatalError, + IgnorableError, + isOpenNextError, + RecoverableError, +} from "@opennextjs/aws/utils/error.js"; +import { DurableObject } from "cloudflare:workers"; + +const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; +const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000; + +interface ExtendedQueueMessage extends QueueMessage { + previewModeId: string; +} + +export class DurableObjectQueueHandler extends DurableObject { + // Ongoing revalidations are deduped by the deduplication id + // Since this is running in waitUntil, we expect the durable object state to persist this during the duration of the revalidation + // TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top) + ongoingRevalidations = new Map>(); + + // TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage + routeInFailedState = new Map< + string, + { msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number } + >(); + + service: NonNullable; + + // TODO: allow this to be configurable - How do we want todo that? env variable? passed down from the queue override ? + maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT; + + constructor(ctx: DurableObjectState, env: CloudflareEnv) { + super(ctx, env); + this.service = env.NEXT_CACHE_REVALIDATION_WORKER!; + // If there is no service binding, we throw an error because we can't revalidate without it + if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker"); + } + + async revalidate(msg: ExtendedQueueMessage) { + // If there is already an ongoing revalidation, we don't need to revalidate again + if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return; + + // The route is already in a failed state, it will be retried later + if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; + + if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { + const ongoingRevalidations = this.ongoingRevalidations.values(); + // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes + // We still await the promise to ensure the revalidation is completed + // This is fine because the queue itself run inside a waitUntil + await this.ctx.blockConcurrencyWhile(() => Promise.race(ongoingRevalidations)); + } + + const revalidationPromise = this.executeRevalidation(msg); + + // We store the promise to dedupe the revalidation + this.ongoingRevalidations.set(msg.MessageDeduplicationId, revalidationPromise); + + // TODO: check if the object stays up during waitUntil so that the internal state is maintained + this.ctx.waitUntil(revalidationPromise); + } + + private async executeRevalidation(msg: ExtendedQueueMessage) { + try { + const { + MessageBody: { host, url }, + previewModeId, + } = msg; + const protocol = host.includes("localhost") ? "http" : "https"; + + //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc) + const response = await this.service.fetch(`${protocol}://${host}${url}`, { + method: "HEAD", + headers: { + "x-prerender-revalidate": previewModeId, + "x-isr": "1", + }, + signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS), + }); + // Now we need to handle errors from the fetch + if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { + // Something is very wrong here, it means that either the page is not ISR/SSG (and we shouldn't be here) or the `x-prerender-revalidate` header is not correct (and it should not happen either) + throw new FatalError( + `The revalidation for ${host}${url} cannot be done. This error should never happen.` + ); + } else if (response.status === 404) { + // The page is not found, we should not revalidate it + throw new IgnorableError( + `The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself` + ); + } else if (response.status === 500) { + // A server error occurred, we should retry + + await this.addToFailedState(msg); + + throw new IgnorableError(`Something went wrong while revalidating ${host}${url}`); + } else if (response.status !== 200) { + // TODO: check if we need to handle cloudflare specific status codes/errors + // An unknown error occurred, most likely from something in user code like missing auth in the middleware + throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`); + } + } catch (e) { + // Do we want to propagate the error to the calling worker? + if (!isOpenNextError(e)) { + await this.addToFailedState(msg); + } + error(e); + } finally { + this.ongoingRevalidations.delete(msg.MessageDeduplicationId); + } + } + + override async alarm() { + const currentDateTime = Date.now(); + // We fetch the first event that needs to be retried or if the date is expired + const nextEventToRetry = Array.from(this.routeInFailedState.values()) + .filter(({ nextAlarmMs }) => nextAlarmMs > currentDateTime) + .sort(({ nextAlarmMs: a }, { nextAlarmMs: b }) => a - b)[0]; + // We also have to check if there are expired events, if the revalidation takes too long, or if the + const expiredEvents = Array.from(this.routeInFailedState.values()).filter( + ({ nextAlarmMs }) => nextAlarmMs <= currentDateTime + ); + const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents; + for (const event of allEventsToRetry) { + await this.executeRevalidation(event.msg); + this.routeInFailedState.delete(event.msg.MessageDeduplicationId); + } + } + + async addToFailedState(msg: ExtendedQueueMessage) { + const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); + + if (existingFailedState) { + if (existingFailedState.retryCount >= 6) { + // We give up after 6 retries and log the error + error( + `The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.` + ); + this.routeInFailedState.delete(msg.MessageDeduplicationId); + return; + } + const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; + this.routeInFailedState.set(msg.MessageDeduplicationId, { + ...existingFailedState, + retryCount: existingFailedState.retryCount + 1, + nextAlarmMs, + }); + } else { + this.routeInFailedState.set(msg.MessageDeduplicationId, { + msg, + retryCount: 1, + nextAlarmMs: Date.now() + 2_000, + }); + } + // We probably want to do something if routeInFailedState is becoming too big, at least log it + await this.addAlarm(); + } + + async addAlarm() { + const existingAlarm = await this.ctx.storage.getAlarm({ allowConcurrency: false }); + if (existingAlarm) return; + if (this.routeInFailedState.size === 0) return; + + const nextAlarmToSetup = Math.min( + ...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs) + ); + await this.ctx.storage.setAlarm(nextAlarmToSetup); + } +} diff --git a/packages/cloudflare/src/api/durable-queue.ts b/packages/cloudflare/src/api/durable-queue.ts new file mode 100644 index 00000000..52dd3bdc --- /dev/null +++ b/packages/cloudflare/src/api/durable-queue.ts @@ -0,0 +1,20 @@ +import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides"; +import { IgnorableError } from "@opennextjs/aws/utils/error.js"; + +import { getCloudflareContext } from "./cloudflare-context"; + +export default { + name: "durable-queue", + send: async (msg: QueueMessage) => { + const durableObject = getCloudflareContext().env.NEXT_CACHE_REVALIDATION_DURABLE_OBJECT; + if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation"); + + const id = durableObject.idFromName(msg.MessageGroupId); + const stub = durableObject.get(id); + const previewModeId = process.env.__NEXT_PREVIEW_MODE_ID!; + await stub.revalidate({ + ...msg, + previewModeId, + }); + }, +} satisfies Queue; diff --git a/packages/cloudflare/src/api/memory-queue.spec.ts b/packages/cloudflare/src/api/memory-queue.spec.ts index 52876d6f..27c5e3d1 100644 --- a/packages/cloudflare/src/api/memory-queue.spec.ts +++ b/packages/cloudflare/src/api/memory-queue.spec.ts @@ -12,6 +12,13 @@ vi.mock("./cloudflare-context", () => ({ }), })); +const generateMessageBody = ({ host, url }: { host: string; url: string }) => ({ + host, + url, + eTag: "etag", + lastModified: Date.now(), +}); + describe("MemoryQueue", () => { beforeAll(() => { vi.useFakeTimers(); @@ -22,7 +29,7 @@ describe("MemoryQueue", () => { it("should process revalidations for a path", async () => { const firstRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }); @@ -31,7 +38,7 @@ describe("MemoryQueue", () => { expect(mockServiceWorkerFetch).toHaveBeenCalledTimes(1); const secondRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }); @@ -42,7 +49,7 @@ describe("MemoryQueue", () => { it("should process revalidations for multiple paths", async () => { const firstRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }); @@ -51,7 +58,7 @@ describe("MemoryQueue", () => { expect(mockServiceWorkerFetch).toHaveBeenCalledTimes(1); const secondRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/other"), MessageDeduplicationId: "", }); @@ -63,12 +70,12 @@ describe("MemoryQueue", () => { it("should de-dupe revalidations", async () => { const requests = [ cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }), cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }), diff --git a/packages/cloudflare/src/cli/build/bundle-server.ts b/packages/cloudflare/src/cli/build/bundle-server.ts index 580bdc9e..425ee9e0 100644 --- a/packages/cloudflare/src/cli/build/bundle-server.ts +++ b/packages/cloudflare/src/cli/build/bundle-server.ts @@ -49,14 +49,23 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { patches.copyPackageCliFiles(packageDistDir, buildOpts); const { appPath, outputDir, monorepoRoot } = buildOpts; - const serverFiles = path.join( + const baseManifestPath = path.join( outputDir, "server-functions/default", getPackagePath(buildOpts), - ".next/required-server-files.json" + ".next" ); + const serverFiles = path.join(baseManifestPath, "required-server-files.json"); const nextConfig = JSON.parse(fs.readFileSync(serverFiles, "utf-8")).config; + // TODO: This is a temporary solution to get the previewModeId from the prerender-manifest.json + // We should find a better way to get this value, probably directly provided from aws + // probably in an env variable exactly as for BUILD_ID + const prerenderManifest = path.join(baseManifestPath, "prerender-manifest.json"); + const prerenderManifestContent = fs.readFileSync(prerenderManifest, "utf-8"); + const prerenderManifestJson = JSON.parse(prerenderManifestContent); + const previewModeId = prerenderManifestJson.preview.previewModeId; + console.log(`\x1b[35m⚙️ Bundling the OpenNext server...\n\x1b[0m`); await patchWebpackRuntime(buildOpts); @@ -144,6 +153,8 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { "process.env.TURBOPACK": "false", // This define should be safe to use for Next 14.2+, earlier versions (13.5 and less) will cause trouble "process.env.__NEXT_EXPERIMENTAL_REACT": `${needsExperimentalReact(nextConfig)}`, + // Used for the durable object queue handler + "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, }, platform: "node", banner: { diff --git a/packages/cloudflare/src/cli/templates/worker.ts b/packages/cloudflare/src/cli/templates/worker.ts index 57504b86..4e36ceb3 100644 --- a/packages/cloudflare/src/cli/templates/worker.ts +++ b/packages/cloudflare/src/cli/templates/worker.ts @@ -17,6 +17,9 @@ Object.defineProperty(globalThis, Symbol.for("__cloudflare-context__"), { }, }); +//@ts-expect-error: Will be resolved by wrangler build +export { DurableObjectQueueHandler } from "@opennextjs/cloudflare/durable-objects/queue"; + // Populate process.env on the first request let processEnvPopulated = false; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 379a94fd..7b31632a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -898,8 +898,8 @@ importers: specifier: 'catalog:' version: 1.31.0 '@opennextjs/aws': - specifier: https://pkg.pr.new/@opennextjs/aws@7e23eee - version: https://pkg.pr.new/@opennextjs/aws@7e23eee + specifier: https://pkg.pr.new/@opennextjs/aws@773 + version: https://pkg.pr.new/@opennextjs/aws@773 enquirer: specifier: ^2.4.1 version: 2.4.1 @@ -3809,8 +3809,8 @@ packages: resolution: {integrity: sha512-8H4FeoxeLb24N2iWO9H3Tp8ln16YG1V3c+gIzwi+5lc+PRie/5TEjNOd1x1LLc/O9s0P2i4JjEQiDk8MFBI4TA==} hasBin: true - '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@7e23eee': - resolution: {tarball: https://pkg.pr.new/@opennextjs/aws@7e23eee} + '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@773': + resolution: {tarball: https://pkg.pr.new/@opennextjs/aws@773} version: 3.5.2 hasBin: true @@ -13044,7 +13044,7 @@ snapshots: - aws-crt - supports-color - '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@7e23eee': + '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@773': dependencies: '@aws-sdk/client-cloudfront': 3.398.0 '@aws-sdk/client-dynamodb': 3.699.0