Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/large-zoos-approve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

add an optional cache for the durable queue
3 changes: 2 additions & 1 deletion examples/e2e/app-router/open-next.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare";
import r2IncrementalCache from "@opennextjs/cloudflare/overrides/incremental-cache/r2-incremental-cache";
import shardedTagCache from "@opennextjs/cloudflare/overrides/tag-cache/do-sharded-tag-cache";
import doQueue from "@opennextjs/cloudflare/overrides/queue/do-queue";
import queueCache from "@opennextjs/cloudflare/overrides/queue/queue-cache";

export default defineCloudflareConfig({
incrementalCache: r2IncrementalCache,
Expand All @@ -13,6 +14,6 @@ export default defineCloudflareConfig({
numberOfHardReplicas: 2,
},
}),
queue: doQueue,
enableCacheInterception: true,
queue: queueCache(doQueue),
});
2 changes: 1 addition & 1 deletion packages/cloudflare/src/api/overrides/queue/do-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";
import { getCloudflareContext } from "../../cloudflare-context";

export default {
name: "do-queue",
name: "durable-queue",
send: async (msg: QueueMessage) => {
const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE;
if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");
Expand Down
112 changes: 112 additions & 0 deletions packages/cloudflare/src/api/overrides/queue/queue-cache.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import type { Queue } from "@opennextjs/aws/types/overrides";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";

import queueCache from "./queue-cache";

const mockedQueue = {
name: "mocked-queue",
send: vi.fn(),
} satisfies Queue;

const generateMessage = () => ({
MessageGroupId: "test",
MessageBody: {
eTag: "test",
url: "test",
host: "test",
lastModified: Date.now(),
},
MessageDeduplicationId: "test",
});

const mockedPut = vi.fn();
const mockedMatch = vi.fn().mockReturnValue(null);

describe("queue-cache", () => {
beforeEach(() => {
// @ts-ignore
globalThis.caches = {
open: vi.fn().mockReturnValue({
put: mockedPut,
match: mockedMatch,
}),
};
});

afterEach(() => {
vi.resetAllMocks();
});
test("should send the message to the original queue", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
expect(queue.name).toBe("cached-mocked-queue");
await queue.send(msg);
expect(mockedQueue.send).toHaveBeenCalledWith(msg);
});

test("should use the local cache", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
await queue.send(msg);

expect(queue.localCache.size).toBe(1);
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
expect(mockedPut).toHaveBeenCalled();

const spiedHas = vi.spyOn(queue.localCache, "has");
await queue.send(msg);
expect(spiedHas).toHaveBeenCalled();

expect(mockedQueue.send).toHaveBeenCalledTimes(1);

expect(mockedMatch).toHaveBeenCalledTimes(1);
});

test("should clear the local cache after 5s", async () => {
vi.useFakeTimers();
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
await queue.send(msg);
expect(queue.localCache.size).toBe(1);
expect(queue.localCache.has(`queue/test/test`)).toBe(true);

vi.advanceTimersByTime(5001);
const alteredMsg = generateMessage();
alteredMsg.MessageGroupId = "test2";
await queue.send(alteredMsg);
expect(queue.localCache.size).toBe(1);
console.log(queue.localCache);
expect(queue.localCache.has(`queue/test2/test`)).toBe(true);
expect(queue.localCache.has(`queue/test/test`)).toBe(false);
vi.useRealTimers();
});

test("should use the regional cache if not in local cache", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
await queue.send(msg);

expect(mockedMatch).toHaveBeenCalledTimes(1);
expect(mockedPut).toHaveBeenCalledTimes(1);
expect(queue.localCache.size).toBe(1);
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
// We need to delete the local cache to test the regional cache
queue.localCache.delete(`queue/test/test`);

const spiedHas = vi.spyOn(queue.localCache, "has");
await queue.send(msg);
expect(spiedHas).toHaveBeenCalled();
expect(mockedMatch).toHaveBeenCalledTimes(2);
});

test("should return early if the message is in the regional cache", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});

mockedMatch.mockReturnValueOnce(new Response(null, { status: 200 }));

const spiedSend = mockedQueue.send;
await queue.send(msg);
expect(spiedSend).not.toHaveBeenCalled();
});
});
122 changes: 122 additions & 0 deletions packages/cloudflare/src/api/overrides/queue/queue-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { error } from "@opennextjs/aws/adapters/logger.js";
import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides";

interface QueueCachingOptions {
/**
* The TTL for the regional cache in seconds.
* @default 5
*/
regionalCacheTtlSec?: number;

/**
* Whether to wait for the queue ack before returning.
* When set to false, the cache will be populated asap and the queue will be called after.
* When set to true, the cache will be populated only after the queue ack is received.
* @default false
*/
waitForQueueAck?: boolean;
}

const DEFAULT_QUEUE_CACHE_TTL_SEC = 5;

class QueueCache implements Queue {
readonly name;
readonly regionalCacheTtlSec: number;
readonly waitForQueueAck: boolean;
cache: Cache | undefined;
// Local mapping from key to insertedAtSec
localCache: Map<string, number> = new Map();

constructor(
private originalQueue: Queue,
options: QueueCachingOptions
) {
this.name = `cached-${originalQueue.name}`;
this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC;
this.waitForQueueAck = options.waitForQueueAck ?? false;
}

async send(msg: QueueMessage) {
try {
const isCached = await this.isInCache(msg);
if (isCached) {
return;
}
if (!this.waitForQueueAck) {
await this.putToCache(msg);
await this.originalQueue.send(msg);
} else {
await this.originalQueue.send(msg);
await this.putToCache(msg);
}
} catch (e) {
error("Error sending message to queue", e);
} finally {
this.clearLocalCache();
}
}

private async getCache() {
if (!this.cache) {
this.cache = await caches.open("durable-queue");
}
return this.cache;
}

private getCacheUrlString(msg: QueueMessage) {
return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`;
}

private getCacheKey(msg: QueueMessage) {
return "http://local.cache" + this.getCacheUrlString(msg);
}

private async putToCache(msg: QueueMessage) {
this.localCache.set(this.getCacheUrlString(msg), Date.now());
const cacheKey = this.getCacheKey(msg);
const cache = await this.getCache();
await cache.put(
cacheKey,
new Response(null, {
status: 200,
headers: {
"Cache-Control": `max-age=${this.regionalCacheTtlSec}`,
// Tag cache is set to the value of the soft tag assigned by Next.js
// This way you can invalidate this cache as well as any other regional cache
"Cache-Tag": `_N_T_/${msg.MessageBody.url}`,
},
})
);
}

private async isInCache(msg: QueueMessage) {
if (this.localCache.has(this.getCacheUrlString(msg))) {
const insertedAt = this.localCache.get(this.getCacheUrlString(msg))!;
if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) {
return true;
}
this.localCache.delete(this.getCacheUrlString(msg));
return false;
}
const cacheKey = this.getCacheKey(msg);
const cache = await this.getCache();
const cachedResponse = await cache.match(cacheKey);
if (cachedResponse) {
return true;
}
}

/**
* Remove any value older than the TTL from the local cache
*/
private clearLocalCache() {
const insertAtSecMax = Date.now() - this.regionalCacheTtlSec * 1000;
for (const [key, insertAtSec] of this.localCache.entries()) {
if (insertAtSec < insertAtSecMax) {
this.localCache.delete(key);
}
}
}
}

export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => new QueueCache(originalQueue, opts);