Skip to content

Commit e7de1bb

Browse files
committed
review fix
1 parent 0a49f3c commit e7de1bb

File tree

3 files changed

+32
-39
lines changed

3 files changed

+32
-39
lines changed

examples/e2e/app-router/open-next.config.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { defineCloudflareConfig } from "@opennextjs/cloudflare";
22
import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache";
33
import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache";
4-
// import memoryQueue from "@opennextjs/cloudflare/memory-queue";
54
import doQueue from "@opennextjs/cloudflare/durable-queue";
65

76
export default defineCloudflareConfig({

packages/cloudflare/src/api/durable-objects/queue.spec.ts

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ import { DurableObjectQueueHandler } from "./queue";
44

55
vi.mock("cloudflare:workers", () => ({
66
DurableObject: class {
7-
ctx: DurableObjectState;
8-
env: CloudflareEnv;
9-
constructor(ctx: DurableObjectState, env: CloudflareEnv) {
10-
this.ctx = ctx;
11-
this.env = env;
12-
}
7+
constructor(
8+
public ctx: DurableObjectState,
9+
public env: CloudflareEnv
10+
) {}
1311
},
1412
}));
1513

@@ -100,7 +98,7 @@ describe("DurableObjectQueue", () => {
10098
// the next one should block until one of the previous ones finishes
10199
const blockedReq = queue.revalidate(createMessage("id6"));
102100

103-
expect(queue.ongoingRevalidations.size).toBe(5);
101+
expect(queue.ongoingRevalidations.size).toBe(queue.maxRevalidations);
104102
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
105103
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);
106104

@@ -203,14 +201,14 @@ describe("DurableObjectQueue", () => {
203201

204202
it("should add an alarm if there are failed states", async () => {
205203
const queue = createDurableObjectQueue({ fetchDuration: 10 });
206-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
204+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
207205
await queue.addAlarm();
208206
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
209207
});
210208

211209
it("should not add an alarm if there is already an alarm set", async () => {
212210
const queue = createDurableObjectQueue({ fetchDuration: 10 });
213-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
211+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
214212
// @ts-expect-error
215213
queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000);
216214
await queue.addAlarm();
@@ -219,8 +217,8 @@ describe("DurableObjectQueue", () => {
219217

220218
it("should set the alarm to the lowest nextAlarm", async () => {
221219
const queue = createDurableObjectQueue({ fetchDuration: 10 });
222-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
223-
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarm: 500 });
220+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
221+
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 });
224222
await queue.addAlarm();
225223
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
226224
});
@@ -238,21 +236,21 @@ describe("DurableObjectQueue", () => {
238236
it("should add a failed state with the correct nextAlarm", async () => {
239237
const queue = createDurableObjectQueue({ fetchDuration: 10 });
240238
await queue.addToFailedState(createMessage("id"));
241-
expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now());
239+
expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now());
242240
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1);
243241
});
244242

245243
it("should add a failed state with the correct nextAlarm for a retry", async () => {
246244
const queue = createDurableObjectQueue({ fetchDuration: 10 });
247245
await queue.addToFailedState(createMessage("id"));
248246
await queue.addToFailedState(createMessage("id"));
249-
expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now());
247+
expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now());
250248
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2);
251249
});
252250

253251
it("should not add a failed state if it has been retried 6 times", async () => {
254252
const queue = createDurableObjectQueue({ fetchDuration: 10 });
255-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarm: 1000 });
253+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarmMs: 1000 });
256254
await queue.addToFailedState(createMessage("id"));
257255
expect(queue.routeInFailedState.size).toBe(0);
258256
});
@@ -264,12 +262,12 @@ describe("DurableObjectQueue", () => {
264262
queue.routeInFailedState.set("id", {
265263
msg: createMessage("id"),
266264
retryCount: 0,
267-
nextAlarm: Date.now() - 1000,
265+
nextAlarmMs: Date.now() - 1000,
268266
});
269267
queue.routeInFailedState.set("id2", {
270268
msg: createMessage("id2"),
271269
retryCount: 0,
272-
nextAlarm: Date.now() - 1000,
270+
nextAlarmMs: Date.now() - 1000,
273271
});
274272
await queue.alarm();
275273
expect(queue.routeInFailedState.size).toBe(0);
@@ -281,12 +279,12 @@ describe("DurableObjectQueue", () => {
281279
queue.routeInFailedState.set("id", {
282280
msg: createMessage("id"),
283281
retryCount: 0,
284-
nextAlarm: Date.now() + 1000,
282+
nextAlarmMs: Date.now() + 1000,
285283
});
286284
queue.routeInFailedState.set("id2", {
287285
msg: createMessage("id2"),
288286
retryCount: 0,
289-
nextAlarm: Date.now() + 500,
287+
nextAlarmMs: Date.now() + 500,
290288
});
291289
await queue.alarm();
292290
expect(queue.routeInFailedState.size).toBe(1);
@@ -299,12 +297,12 @@ describe("DurableObjectQueue", () => {
299297
queue.routeInFailedState.set("id", {
300298
msg: createMessage("id"),
301299
retryCount: 0,
302-
nextAlarm: Date.now() + 1000,
300+
nextAlarmMs: Date.now() + 1000,
303301
});
304302
queue.routeInFailedState.set("id2", {
305303
msg: createMessage("id2"),
306304
retryCount: 0,
307-
nextAlarm: Date.now() - 1000,
305+
nextAlarmMs: Date.now() - 1000,
308306
});
309307
await queue.alarm();
310308
expect(queue.routeInFailedState.size).toBe(0);

packages/cloudflare/src/api/durable-objects/queue.ts

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
2424
// TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage
2525
routeInFailedState = new Map<
2626
string,
27-
{ msg: ExtendedQueueMessage; retryCount: number; nextAlarm: number }
27+
{ msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number }
2828
>();
2929

3030
service: NonNullable<CloudflareEnv["NEXT_CACHE_REVALIDATION_WORKER"]>;
@@ -34,10 +34,9 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
3434

3535
constructor(ctx: DurableObjectState, env: CloudflareEnv) {
3636
super(ctx, env);
37-
const service = env.NEXT_CACHE_REVALIDATION_WORKER;
37+
this.service = env.NEXT_CACHE_REVALIDATION_WORKER!;
3838
// If there is no service binding, we throw an error because we can't revalidate without it
39-
if (!service) throw new IgnorableError("No service binding for cache revalidation worker");
40-
this.service = service;
39+
if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker");
4140
}
4241

4342
async revalidate(msg: ExtendedQueueMessage) {
@@ -115,18 +114,16 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
115114
}
116115

117116
override async alarm() {
117+
const currentDateTime = Date.now();
118118
// We fetch the first event that needs to be retried or if the date is expired
119119
const nextEventToRetry = Array.from(this.routeInFailedState.values())
120-
.filter((failing) => failing.nextAlarm > Date.now())
121-
.sort(({ nextAlarm: a }, { nextAlarm: b }) => a - b)[0];
120+
.filter(({ nextAlarmMs }) => nextAlarmMs > currentDateTime)
121+
.sort(({ nextAlarmMs: a }, { nextAlarmMs: b }) => a - b)[0];
122122
// We also have to check if there are expired events, if the revalidation takes too long, or if the
123123
const expiredEvents = Array.from(this.routeInFailedState.values()).filter(
124-
({ nextAlarm }) => nextAlarm <= Date.now()
124+
({ nextAlarmMs }) => nextAlarmMs <= currentDateTime
125125
);
126-
const allEventsToRetry =
127-
nextEventToRetry && nextEventToRetry.nextAlarm > Date.now()
128-
? [nextEventToRetry, ...expiredEvents]
129-
: expiredEvents;
126+
const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents;
130127
for (const event of allEventsToRetry) {
131128
await this.executeRevalidation(event.msg);
132129
this.routeInFailedState.delete(event.msg.MessageDeduplicationId);
@@ -135,7 +132,6 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
135132

136133
async addToFailedState(msg: ExtendedQueueMessage) {
137134
const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId);
138-
let nextAlarm = Date.now() + 2_000;
139135

140136
if (existingFailedState) {
141137
if (existingFailedState.retryCount >= 6) {
@@ -146,17 +142,18 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
146142
this.routeInFailedState.delete(msg.MessageDeduplicationId);
147143
return;
148144
}
149-
nextAlarm = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000;
145+
const nextAlarm = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000;
150146
this.routeInFailedState.set(msg.MessageDeduplicationId, {
151147
...existingFailedState,
152148
retryCount: existingFailedState.retryCount + 1,
153-
nextAlarm,
149+
nextAlarmMs: nextAlarm,
154150
});
155151
} else {
152+
const nextAlarm = Date.now() + 2_000;
156153
this.routeInFailedState.set(msg.MessageDeduplicationId, {
157154
msg,
158155
retryCount: 1,
159-
nextAlarm,
156+
nextAlarmMs: nextAlarm,
160157
});
161158
}
162159
// We probably want to do something if routeInFailedState is becoming too big, at least log it
@@ -168,9 +165,8 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
168165
if (existingAlarm) return;
169166
if (this.routeInFailedState.size === 0) return;
170167

171-
const nextAlarmToSetup = Array.from(this.routeInFailedState.values()).reduce(
172-
(acc, { nextAlarm }) => Math.min(acc, nextAlarm),
173-
Infinity
168+
const nextAlarmToSetup = Math.min(
169+
...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs)
174170
);
175171
await this.ctx.storage.setAlarm(nextAlarmToSetup);
176172
}

0 commit comments

Comments
 (0)