Skip to content

Commit f5303c1

Browse files
committed
local queue cache
1 parent 9008a9f commit f5303c1

File tree

1 file changed

+34
-20
lines changed

1 file changed

+34
-20
lines changed

packages/cloudflare/src/api/queue-cache.ts

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,7 @@
1+
import { error } from "@opennextjs/aws/adapters/logger.js";
12
import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides";
23

34
interface QueueCachingOptions {
4-
/**
5-
* Enables a regional cache for the queue.
6-
* When enabled, the first request to the queue is cached for `regionalCacheTtlSec` seconds.
7-
* Subsequent similar requests during this period will bypass processing and use the cached result.
8-
* **Note:** Ensure the `MAX_REVALIDATE_CONCURRENCY` environment variable is appropriately increased before enabling this feature.
9-
* In case of an error, cache revalidation may be delayed by up to `regionalCacheTtlSec` seconds.
10-
* @default false
11-
*/
12-
enableRegionalCache?: boolean;
135
/**
146
* The TTL for the regional cache in seconds.
157
* @default 5
@@ -29,35 +21,36 @@ const DEFAULT_QUEUE_CACHE_TTL_SEC = 5;
2921

3022
class QueueCache implements Queue {
3123
readonly name;
32-
readonly enableRegionalCache: boolean;
3324
readonly regionalCacheTtlSec: number;
3425
readonly waitForQueueAck: boolean;
3526
cache: Cache | undefined;
27+
localCache: Map<string, number> = new Map();
3628

3729
constructor(
3830
private originalQueue: Queue,
3931
options: QueueCachingOptions
4032
) {
4133
this.name = `cached-${originalQueue.name}`;
42-
this.enableRegionalCache = options.enableRegionalCache ?? false;
4334
this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC;
4435
this.waitForQueueAck = options.waitForQueueAck ?? false;
4536
}
4637

4738
async send(msg: QueueMessage) {
48-
if (this.enableRegionalCache) {
39+
try {
4940
const isCached = await this.isInCache(msg);
5041
if (isCached) {
5142
return;
5243
}
5344
if (!this.waitForQueueAck) {
5445
await this.putToCache(msg);
5546
}
56-
}
5747

58-
await this.originalQueue.send(msg);
59-
if (this.waitForQueueAck) {
60-
await this.putToCache(msg);
48+
await this.originalQueue.send(msg);
49+
if (this.waitForQueueAck) {
50+
await this.putToCache(msg);
51+
}
52+
} catch (e) {
53+
error("Error sending message to queue", e);
6154
}
6255
}
6356

@@ -68,13 +61,16 @@ class QueueCache implements Queue {
6861
return this.cache;
6962
}
7063

64+
private getCacheUrlString(msg: QueueMessage) {
65+
return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`;
66+
}
67+
7168
private getCacheKey(msg: QueueMessage) {
72-
return new Request(
73-
new URL(`queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`, "http://local.cache")
74-
);
69+
return new Request(new URL(this.getCacheUrlString(msg), "http://local.cache"));
7570
}
7671

7772
private async putToCache(msg: QueueMessage) {
73+
this.localCache.set(this.getCacheUrlString(msg), Date.now());
7874
const cacheKey = this.getCacheKey(msg);
7975
const cache = await this.getCache();
8076
await cache.put(
@@ -84,9 +80,27 @@ class QueueCache implements Queue {
8480
}
8581

8682
private async isInCache(msg: QueueMessage) {
83+
if (this.localCache.has(this.getCacheUrlString(msg))) {
84+
return true;
85+
}
8786
const cacheKey = this.getCacheKey(msg);
8887
const cache = await this.getCache();
89-
return await cache.match(cacheKey);
88+
const cachedResponse = await cache.match(cacheKey);
89+
if (cachedResponse) {
90+
return true;
91+
}
92+
}
93+
94+
/**
95+
* Remove any value older than the TTL from the local cache
96+
*/
97+
private clearLocalCache() {
98+
const now = Date.now();
99+
for (const [key, value] of this.localCache.entries()) {
100+
if (now - value > this.regionalCacheTtlSec * 1000) {
101+
this.localCache.delete(key);
102+
}
103+
}
90104
}
91105
}
92106

0 commit comments

Comments
 (0)