Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/e2e/app-router/open-next.config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { defineCloudflareConfig } from "@opennextjs/cloudflare";
import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache";
import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache";
import memoryQueue from "@opennextjs/cloudflare/memory-queue";
import doQueue from "@opennextjs/cloudflare/durable-queue";

export default defineCloudflareConfig({
incrementalCache: kvIncrementalCache,
tagCache: d1TagCache,
queue: memoryQueue,
queue: doQueue,
});
14 changes: 14 additions & 0 deletions examples/e2e/app-router/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@
"directory": ".open-next/assets",
"binding": "ASSETS"
},
"durable_objects": {
"bindings": [
{
"name": "NEXT_CACHE_REVALIDATION_DURABLE_OBJECT",
"class_name": "DurableObjectQueueHandler"
}
]
},
"migrations": [
{
"tag": "v1",
"new_classes": ["DurableObjectQueueHandler"]
}
],
"kv_namespaces": [
{
"binding": "NEXT_CACHE_WORKERS_KV",
Expand Down
2 changes: 1 addition & 1 deletion packages/cloudflare/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
"dependencies": {
"@ast-grep/napi": "^0.36.1",
"@dotenvx/dotenvx": "catalog:",
"@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@7e23eee",
"@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@773",
"enquirer": "^2.4.1",
"glob": "catalog:",
"yaml": "^2.7.0"
Expand Down
10 changes: 10 additions & 0 deletions packages/cloudflare/src/api/cloudflare-context.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import type { Context, RunningCodeOptions } from "node:vm";

import type { DurableObjectQueueHandler } from "./durable-objects/queue";

declare global {
interface CloudflareEnv {
// KV used for the incremental cache
NEXT_CACHE_WORKERS_KV?: KVNamespace;
// D1 db used for the tag cache
NEXT_CACHE_D1?: D1Database;
// D1 table to use for the tag cache for the tag/path mapping
NEXT_CACHE_D1_TAGS_TABLE?: string;
// D1 table to use for the tag cache for storing the tag and their associated revalidation times
NEXT_CACHE_D1_REVALIDATIONS_TABLE?: string;
// Service binding for the worker itself to be able to call itself from within the worker
NEXT_CACHE_REVALIDATION_WORKER?: Service;
// Durable Object namespace to use for the durable object queue handler
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
// Asset binding
ASSETS?: Fetcher;
}
}
Expand Down
312 changes: 312 additions & 0 deletions packages/cloudflare/src/api/durable-objects/queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
import { describe, expect, it, vi } from "vitest";

import { DurableObjectQueueHandler } from "./queue";

vi.mock("cloudflare:workers", () => ({
DurableObject: class {
constructor(
public ctx: DurableObjectState,
public env: CloudflareEnv
) {}
},
}));

const createDurableObjectQueue = ({
fetchDuration,
statusCode,
headers,
}: {
fetchDuration: number;
statusCode?: number;
headers?: Headers;
}) => {
const mockState = {
waitUntil: vi.fn(),
blockConcurrencyWhile: vi.fn().mockImplementation(async (fn) => fn()),
storage: {
setAlarm: vi.fn(),
getAlarm: vi.fn(),
},
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return new DurableObjectQueueHandler(mockState as any, {
NEXT_CACHE_REVALIDATION_WORKER: {
fetch: vi.fn().mockReturnValue(
new Promise<Response>((res) =>
setTimeout(
() =>
res(
new Response(null, {
status: statusCode,
headers: headers ?? new Headers([["x-nextjs-cache", "REVALIDATED"]]),
})
),
fetchDuration
)
)
),
connect: vi.fn(),
},
});
};

const createMessage = (dedupId: string, lastModified = Date.now()) => ({
MessageBody: { host: "test.local", url: "/test", eTag: "test", lastModified },
MessageGroupId: "test.local/test",
MessageDeduplicationId: dedupId,
previewModeId: "test",
});

describe("DurableObjectQueue", () => {
describe("successful revalidation", () => {
it("should process a single revalidation", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
const firstRequest = await queue.revalidate(createMessage("id"));
expect(firstRequest).toBeUndefined();
expect(queue.ongoingRevalidations.size).toBe(1);
expect(queue.ongoingRevalidations.has("id")).toBe(true);

await queue.ongoingRevalidations.get("id");

expect(queue.ongoingRevalidations.size).toBe(0);
expect(queue.ongoingRevalidations.has("id")).toBe(false);
expect(queue.service.fetch).toHaveBeenCalledWith("https://test.local/test", {
method: "HEAD",
headers: {
"x-prerender-revalidate": "test",
"x-isr": "1",
},
signal: expect.any(AbortSignal),
});
});

it("should dedupe revalidations", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.revalidate(createMessage("id"));
await queue.revalidate(createMessage("id"));
expect(queue.ongoingRevalidations.size).toBe(1);
expect(queue.ongoingRevalidations.has("id")).toBe(true);
});

it("should block concurrency", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.revalidate(createMessage("id"));
await queue.revalidate(createMessage("id2"));
await queue.revalidate(createMessage("id3"));
await queue.revalidate(createMessage("id4"));
await queue.revalidate(createMessage("id5"));
// the next one should block until one of the previous ones finishes
const blockedReq = queue.revalidate(createMessage("id6"));

expect(queue.ongoingRevalidations.size).toBe(queue.maxRevalidations);
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);

// @ts-expect-error
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1);

// Here we await the blocked request to ensure it's resolved
await blockedReq;
// We then need to await for the actual revalidation to finish
await Promise.all(Array.from(queue.ongoingRevalidations.values()));
expect(queue.ongoingRevalidations.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(6);
});
});

describe("failed revalidation", () => {
it("should not put it in failed state for an incorrect 200", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
statusCode: 200,
headers: new Headers([["x-nextjs-cache", "MISS"]]),
});
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(0);
});

it("should not put it in failed state for a failed revalidation with 404", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
statusCode: 404,
});
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);

await queue.revalidate(createMessage("id"));

expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
});

it("should put it in failed state if revalidation fails with 500", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
statusCode: 500,
});
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.routeInFailedState.has("id")).toBe(true);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);

await queue.revalidate(createMessage("id"));

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
});

it("should put it in failed state if revalidation fetch throw", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
});
// @ts-expect-error - This is mocked above
queue.service.fetch.mockImplementationOnce(() => Promise.reject(new Error("fetch error")));
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.routeInFailedState.has("id")).toBe(true);
expect(queue.ongoingRevalidations.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);

await queue.revalidate(createMessage("id"));

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
});
});

describe("addAlarm", () => {
const getStorage = (queue: DurableObjectQueueHandler): DurableObjectStorage => {
// @ts-expect-error - ctx is a protected field
return queue.ctx.storage;
};

it("should not add an alarm if there are no failed states", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addAlarm();
expect(getStorage(queue).setAlarm).not.toHaveBeenCalled();
});

it("should add an alarm if there are failed states", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
await queue.addAlarm();
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
});

it("should not add an alarm if there is already an alarm set", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
// @ts-expect-error
queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000);
await queue.addAlarm();
expect(getStorage(queue).setAlarm).not.toHaveBeenCalled();
});

it("should set the alarm to the lowest nextAlarm", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 });
await queue.addAlarm();
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
});
});

describe("addToFailedState", () => {
it("should add a failed state", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.size).toBe(1);
expect(queue.routeInFailedState.has("id")).toBe(true);
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1);
});

it("should add a failed state with the correct nextAlarm", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now());
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1);
});

it("should add a failed state with the correct nextAlarm for a retry", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addToFailedState(createMessage("id"));
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now());
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2);
});

it("should not add a failed state if it has been retried 6 times", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarmMs: 1000 });
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.size).toBe(0);
});
});

describe("alarm", () => {
it("should execute revalidations for expired events", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", {
msg: createMessage("id"),
retryCount: 0,
nextAlarmMs: Date.now() - 1000,
});
queue.routeInFailedState.set("id2", {
msg: createMessage("id2"),
retryCount: 0,
nextAlarmMs: Date.now() - 1000,
});
await queue.alarm();
expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
});

it("should execute revalidations for the next event to retry", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", {
msg: createMessage("id"),
retryCount: 0,
nextAlarmMs: Date.now() + 1000,
});
queue.routeInFailedState.set("id2", {
msg: createMessage("id2"),
retryCount: 0,
nextAlarmMs: Date.now() + 500,
});
await queue.alarm();
expect(queue.routeInFailedState.size).toBe(1);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
expect(queue.routeInFailedState.has("id2")).toBe(false);
});

it("should execute revalidations for the next event to retry and expired events", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", {
msg: createMessage("id"),
retryCount: 0,
nextAlarmMs: Date.now() + 1000,
});
queue.routeInFailedState.set("id2", {
msg: createMessage("id2"),
retryCount: 0,
nextAlarmMs: Date.now() - 1000,
});
await queue.alarm();
expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
});
});
});
Loading