Skip to content

Commit 9008a9f

Browse files
committed
moved cache out of the durable queue
1 parent fdd07ed commit 9008a9f

File tree

3 files changed

+107
-55
lines changed

3 files changed

+107
-55
lines changed

examples/e2e/app-router/open-next.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare";
22
import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache";
33
import shardedTagCache from "@opennextjs/cloudflare/do-sharded-tag-cache";
44
import doQueue from "@opennextjs/cloudflare/durable-queue";
5+
import queueCache from "@opennextjs/cloudflare/queue-cache";
56

67
export default defineCloudflareConfig({
78
incrementalCache: kvIncrementalCache,
89
tagCache: shardedTagCache({ numberOfShards: 12 }),
9-
queue: doQueue({ enableRegionalCache: true, regionalCacheTtlSec: 5 }),
10+
queue: queueCache(doQueue),
1011
});

packages/cloudflare/src/api/durable-queue.ts

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,58 +3,16 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";
33

44
import { getCloudflareContext } from "./cloudflare-context";
55

6-
interface DurableQueueOptions {
7-
/**
8-
* Enables a regional cache for the queue.
9-
* When enabled, the first request to the queue is cached for `regionalCacheTtlSec` seconds.
10-
* Subsequent similar requests during this period will bypass processing and use the cached result.
11-
* **Note:** Ensure the `MAX_REVALIDATE_CONCURRENCY` environment variable is appropriately increased before enabling this feature.
12-
* In case of an error, cache revalidation may be delayed by up to `regionalCacheTtlSec` seconds.
13-
* @default false
14-
*/
15-
enableRegionalCache?: boolean;
16-
/**
17-
* The TTL for the regional cache in seconds.
18-
* @default 5
19-
*/
20-
regionalCacheTtlSec?: number;
21-
}
6+
export default {
7+
name: "durable-queue",
8+
send: async (msg: QueueMessage) => {
9+
const durableObject = getCloudflareContext().env.NEXT_CACHE_REVALIDATION_DURABLE_OBJECT;
10+
if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");
2211

23-
const DEFAULT_QUEUE_CACHE_TTL_SEC = 5;
24-
25-
function getCacheKey(msg: QueueMessage) {
26-
return new Request(
27-
new URL(`queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`, "http://local.cache")
28-
);
29-
}
30-
31-
export default ({enableRegionalCache, regionalCacheTtlSec}: DurableQueueOptions = {}) => {
32-
return {
33-
name: "durable-queue",
34-
send: async (msg: QueueMessage) => {
35-
const durableObject = getCloudflareContext().env.NEXT_CACHE_REVALIDATION_DURABLE_OBJECT;
36-
if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");
37-
38-
if(enableRegionalCache) {
39-
const cacheKey = getCacheKey(msg);
40-
const cache = await caches.open("durable-queue");
41-
const cachedResponse = await cache.match(cacheKey);
42-
if(cachedResponse) {
43-
return;
44-
}
45-
46-
// Here we cache the first request to the queue for `regionalCacheTtlSec` seconds
47-
// We want to do it as soon as possible so that subsequent requests can use the cached response
48-
// TODO: Do we really want to cache this before sending the message to the queue? It could be an option to cache it after the message is sent
49-
await cache.put(cacheKey, new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC}` } }));
50-
51-
}
52-
53-
const id = durableObject.idFromName(msg.MessageGroupId);
54-
const stub = durableObject.get(id);
55-
await stub.revalidate({
56-
...msg,
57-
});
58-
},
59-
} satisfies Queue;
60-
}
12+
const id = durableObject.idFromName(msg.MessageGroupId);
13+
const stub = durableObject.get(id);
14+
await stub.revalidate({
15+
...msg,
16+
});
17+
},
18+
} satisfies Queue;
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides";
2+
3+
interface QueueCachingOptions {
4+
/**
5+
* Enables a regional cache for the queue.
6+
* When enabled, the first request to the queue is cached for `regionalCacheTtlSec` seconds.
7+
* Subsequent similar requests during this period will bypass processing and use the cached result.
8+
* **Note:** Ensure the `MAX_REVALIDATE_CONCURRENCY` environment variable is appropriately increased before enabling this feature.
9+
* In case of an error, cache revalidation may be delayed by up to `regionalCacheTtlSec` seconds.
10+
* @default false
11+
*/
12+
enableRegionalCache?: boolean;
13+
/**
14+
* The TTL for the regional cache in seconds.
15+
* @default 5
16+
*/
17+
regionalCacheTtlSec?: number;
18+
19+
/**
20+
* Whether to wait for the queue ack before returning.
21+
* When set to false, the cache will be populated asap and the queue will be called after.
22+
* When set to true, the cache will be populated only after the queue ack is received.
23+
* @default false
24+
*/
25+
waitForQueueAck?: boolean;
26+
}
27+
28+
const DEFAULT_QUEUE_CACHE_TTL_SEC = 5;
29+
30+
class QueueCache implements Queue {
31+
readonly name;
32+
readonly enableRegionalCache: boolean;
33+
readonly regionalCacheTtlSec: number;
34+
readonly waitForQueueAck: boolean;
35+
cache: Cache | undefined;
36+
37+
constructor(
38+
private originalQueue: Queue,
39+
options: QueueCachingOptions
40+
) {
41+
this.name = `cached-${originalQueue.name}`;
42+
this.enableRegionalCache = options.enableRegionalCache ?? false;
43+
this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC;
44+
this.waitForQueueAck = options.waitForQueueAck ?? false;
45+
}
46+
47+
async send(msg: QueueMessage) {
48+
if (this.enableRegionalCache) {
49+
const isCached = await this.isInCache(msg);
50+
if (isCached) {
51+
return;
52+
}
53+
if (!this.waitForQueueAck) {
54+
await this.putToCache(msg);
55+
}
56+
}
57+
58+
await this.originalQueue.send(msg);
59+
if (this.waitForQueueAck) {
60+
await this.putToCache(msg);
61+
}
62+
}
63+
64+
private async getCache() {
65+
if (!this.cache) {
66+
this.cache = await caches.open("durable-queue");
67+
}
68+
return this.cache;
69+
}
70+
71+
private getCacheKey(msg: QueueMessage) {
72+
return new Request(
73+
new URL(`queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`, "http://local.cache")
74+
);
75+
}
76+
77+
private async putToCache(msg: QueueMessage) {
78+
const cacheKey = this.getCacheKey(msg);
79+
const cache = await this.getCache();
80+
await cache.put(
81+
cacheKey,
82+
new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${this.regionalCacheTtlSec}` } })
83+
);
84+
}
85+
86+
private async isInCache(msg: QueueMessage) {
87+
const cacheKey = this.getCacheKey(msg);
88+
const cache = await this.getCache();
89+
return await cache.match(cacheKey);
90+
}
91+
}
92+
93+
export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => new QueueCache(originalQueue, opts);

0 commit comments

Comments
 (0)