Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions examples/e2e/app-router/open-next.config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { defineCloudflareConfig } from "@opennextjs/cloudflare";
import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache";
import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache";
import memoryQueue from "@opennextjs/cloudflare/memory-queue";
// import memoryQueue from "@opennextjs/cloudflare/memory-queue";
import doQueue from "@opennextjs/cloudflare/durable-queue";

export default defineCloudflareConfig({
incrementalCache: kvIncrementalCache,
tagCache: d1TagCache,
queue: memoryQueue,
queue: doQueue,
});
14 changes: 14 additions & 0 deletions examples/e2e/app-router/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@
"directory": ".open-next/assets",
"binding": "ASSETS"
},
"durable_objects": {
"bindings": [
{
"name": "NEXT_CACHE_REVALIDATION_DURABLE_OBJECT",
"class_name": "DurableObjectQueueHandler"
}
]
},
"migrations": [
{
"tag": "v1",
"new_classes": ["DurableObjectQueueHandler"]
}
],
"kv_namespaces": [
{
"binding": "NEXT_CACHE_WORKERS_KV",
Expand Down
2 changes: 1 addition & 1 deletion packages/cloudflare/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
"dependencies": {
"@ast-grep/napi": "^0.36.1",
"@dotenvx/dotenvx": "catalog:",
"@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@7e23eee",
"@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@773",
"enquirer": "^2.4.1",
"glob": "catalog:",
"yaml": "^2.7.0"
Expand Down
10 changes: 10 additions & 0 deletions packages/cloudflare/src/api/cloudflare-context.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import type { Context, RunningCodeOptions } from "node:vm";

import type { DurableObjectQueueHandler } from "./durable-objects/queue";

declare global {
interface CloudflareEnv {
// KV used for the incremental cache
NEXT_CACHE_WORKERS_KV?: KVNamespace;
// D1 db used for the tag cache
NEXT_CACHE_D1?: D1Database;
// D1 table to use for the tag cache for the tag/path mapping
NEXT_CACHE_D1_TAGS_TABLE?: string;
// D1 table to use for the tag cache for storing the tag and their associated revalidation times
NEXT_CACHE_D1_REVALIDATIONS_TABLE?: string;
// Service binding for the worker itself to be able to call itself from within the worker
NEXT_CACHE_REVALIDATION_WORKER?: Service;
// Durable Object namespace to use for the durable object queue handler
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
// Asset binding
ASSETS?: Fetcher;
}
}
Expand Down
314 changes: 314 additions & 0 deletions packages/cloudflare/src/api/durable-objects/queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
import { describe, expect, it, vi } from "vitest";

import { DurableObjectQueueHandler } from "./queue";

vi.mock("cloudflare:workers", () => ({
DurableObject: class {
ctx: DurableObjectState;
env: CloudflareEnv;
constructor(ctx: DurableObjectState, env: CloudflareEnv) {
this.ctx = ctx;
this.env = env;
}
},
}));

const createDurableObjectQueue = ({
fetchDuration,
statusCode,
headers,
}: {
fetchDuration: number;
statusCode?: number;
headers?: Headers;
}) => {
const mockState = {
waitUntil: vi.fn(),
blockConcurrencyWhile: vi.fn().mockImplementation(async (fn) => fn()),
storage: {
setAlarm: vi.fn(),
getAlarm: vi.fn(),
},
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return new DurableObjectQueueHandler(mockState as any, {
NEXT_CACHE_REVALIDATION_WORKER: {
fetch: vi.fn().mockReturnValue(
new Promise<Response>((res) =>
setTimeout(
() =>
res(
new Response(null, {
status: statusCode,
headers: headers ?? new Headers([["x-nextjs-cache", "REVALIDATED"]]),
})
),
fetchDuration
)
)
),
connect: vi.fn(),
},
});
};

const createMessage = (dedupId: string, lastModified = Date.now()) => ({
MessageBody: { host: "test.local", url: "/test", eTag: "test", lastModified },
MessageGroupId: "test.local/test",
MessageDeduplicationId: dedupId,
previewModeId: "test",
});

describe("DurableObjectQueue", () => {
describe("successful revalidation", () => {
it("should process a single revalidation", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
const firstRequest = await queue.revalidate(createMessage("id"));
expect(firstRequest).toBeUndefined();
expect(queue.ongoingRevalidations.size).toBe(1);
expect(queue.ongoingRevalidations.has("id")).toBe(true);

await queue.ongoingRevalidations.get("id");

expect(queue.ongoingRevalidations.size).toBe(0);
expect(queue.ongoingRevalidations.has("id")).toBe(false);
expect(queue.service.fetch).toHaveBeenCalledWith("https://test.local/test", {
method: "HEAD",
headers: {
"x-prerender-revalidate": "test",
"x-isr": "1",
},
signal: expect.any(AbortSignal),
});
});

it("should dedupe revalidations", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.revalidate(createMessage("id"));
await queue.revalidate(createMessage("id"));
expect(queue.ongoingRevalidations.size).toBe(1);
expect(queue.ongoingRevalidations.has("id")).toBe(true);
});

it("should block concurrency", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.revalidate(createMessage("id"));
await queue.revalidate(createMessage("id2"));
await queue.revalidate(createMessage("id3"));
await queue.revalidate(createMessage("id4"));
await queue.revalidate(createMessage("id5"));
// the next one should block until one of the previous ones finishes
const blockedReq = queue.revalidate(createMessage("id6"));

expect(queue.ongoingRevalidations.size).toBe(5);
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);

// @ts-expect-error
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1);

// Here we await the blocked request to ensure it's resolved
await blockedReq;
// We then need to await for the actual revalidation to finish
await Promise.all(Array.from(queue.ongoingRevalidations.values()));
expect(queue.ongoingRevalidations.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(6);
});
});

describe("failed revalidation", () => {
it("should not put it in failed state for an incorrect 200", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
statusCode: 200,
headers: new Headers([["x-nextjs-cache", "MISS"]]),
});
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(0);
});

it("should not put it in failed state for a failed revalidation with 404", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
statusCode: 404,
});
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);

await queue.revalidate(createMessage("id"));

expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
});

it("should put it in failed state if revalidation fails with 500", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
statusCode: 500,
});
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.routeInFailedState.has("id")).toBe(true);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);

await queue.revalidate(createMessage("id"));

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
});

it("should put it in failed state if revalidation fetch throw", async () => {
const queue = createDurableObjectQueue({
fetchDuration: 10,
});
// @ts-expect-error - This is mocked above
queue.service.fetch.mockImplementationOnce(() => Promise.reject(new Error("fetch error")));
await queue.revalidate(createMessage("id"));

await queue.ongoingRevalidations.get("id");

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.routeInFailedState.has("id")).toBe(true);
expect(queue.ongoingRevalidations.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);

await queue.revalidate(createMessage("id"));

expect(queue.routeInFailedState.size).toBe(1);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
});
});

describe("addAlarm", () => {
const getStorage = (queue: DurableObjectQueueHandler): DurableObjectStorage => {
// @ts-expect-error - ctx is a protected field
return queue.ctx.storage;
};

it("should not add an alarm if there are no failed states", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addAlarm();
expect(getStorage(queue).setAlarm).not.toHaveBeenCalled();
});

it("should add an alarm if there are failed states", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
await queue.addAlarm();
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
});

it("should not add an alarm if there is already an alarm set", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
// @ts-expect-error
queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000);
await queue.addAlarm();
expect(getStorage(queue).setAlarm).not.toHaveBeenCalled();
});

it("should set the alarm to the lowest nextAlarm", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarm: 500 });
await queue.addAlarm();
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
});
});

describe("addToFailedState", () => {
it("should add a failed state", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.size).toBe(1);
expect(queue.routeInFailedState.has("id")).toBe(true);
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1);
});

it("should add a failed state with the correct nextAlarm", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now());
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1);
});

it("should add a failed state with the correct nextAlarm for a retry", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
await queue.addToFailedState(createMessage("id"));
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now());
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2);
});

it("should not add a failed state if it has been retried 6 times", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarm: 1000 });
await queue.addToFailedState(createMessage("id"));
expect(queue.routeInFailedState.size).toBe(0);
});
});

describe("alarm", () => {
it("should execute revalidations for expired events", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", {
msg: createMessage("id"),
retryCount: 0,
nextAlarm: Date.now() - 1000,
});
queue.routeInFailedState.set("id2", {
msg: createMessage("id2"),
retryCount: 0,
nextAlarm: Date.now() - 1000,
});
await queue.alarm();
expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
});

it("should execute revalidations for the next event to retry", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", {
msg: createMessage("id"),
retryCount: 0,
nextAlarm: Date.now() + 1000,
});
queue.routeInFailedState.set("id2", {
msg: createMessage("id2"),
retryCount: 0,
nextAlarm: Date.now() + 500,
});
await queue.alarm();
expect(queue.routeInFailedState.size).toBe(1);
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
expect(queue.routeInFailedState.has("id2")).toBe(false);
});

it("should execute revalidations for the next event to retry and expired events", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", {
msg: createMessage("id"),
retryCount: 0,
nextAlarm: Date.now() + 1000,
});
queue.routeInFailedState.set("id2", {
msg: createMessage("id2"),
retryCount: 0,
nextAlarm: Date.now() - 1000,
});
await queue.alarm();
expect(queue.routeInFailedState.size).toBe(0);
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
});
});
});
Loading