Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 8c96e46

Browse files
authored
Feature: Alarm Flush (#324)
* flush working + tests + jest * remove unused * update name * flushMiniflareDurableObjectAlarms returned nothing
1 parent ae2349c commit 8c96e46

File tree

4 files changed

+141
-18
lines changed

4 files changed

+141
-18
lines changed

package-lock.json

Lines changed: 16 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/durable-objects/src/plugin.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,17 +214,40 @@ export class DurableObjectsPlugin
214214
const id = new DurableObjectId(objectName, hexId);
215215
const state = await this.getObject(storage, id);
216216
// execute the alarm
217-
await new RequestContext({
218-
requestDepth: 1,
219-
pipelineDepth: 1,
220-
durableObject: true,
221-
externalSubrequestLimit: usageModelExternalSubrequestLimit(
222-
this.ctx.usageModel
223-
),
224-
}).runWith(() => state[kAlarm]());
217+
await this.#executeAlarm(state);
225218
});
226219
}
227220

221+
async flushAlarms(
222+
storageFactory: StorageFactory,
223+
ids?: DurableObjectId[]
224+
): Promise<void> {
225+
if (ids !== undefined) {
226+
for (const id of ids) {
227+
const state = await this.getObject(storageFactory, id);
228+
await this.#executeAlarm(state);
229+
this.#alarmStore.deleteAlarm(`${id[kObjectName]}:${id.toString()}`);
230+
}
231+
} else {
232+
// otherwise flush all alarms
233+
for (const [key, state] of this.#objects) {
234+
await this.#executeAlarm(await state);
235+
this.#alarmStore.deleteAlarm(key);
236+
}
237+
}
238+
}
239+
240+
async #executeAlarm(state: DurableObjectState): Promise<void> {
241+
await new RequestContext({
242+
requestDepth: 1,
243+
pipelineDepth: 1,
244+
durableObject: true,
245+
externalSubrequestLimit: usageModelExternalSubrequestLimit(
246+
this.ctx.usageModel
247+
),
248+
}).runWith(() => state[kAlarm]());
249+
}
250+
228251
beforeReload(): void {
229252
// Clear instance map, this should cause old instances to be GCed
230253
this.#objects.clear();

packages/durable-objects/test/plugin.spec.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,3 +363,87 @@ test("DurableObjectsPlugin: set alarm and run list filters out alarm", async (t)
363363
const res1 = await ns1.get(ns1.newUniqueId()).fetch("/");
364364
t.is(await res1.text(), "{}");
365365
});
366+
367+
test("DurableObjectsPlugin: flush alarms", async (t) => {
368+
class TestObject implements DurableObject {
369+
constructor(private readonly state: DurableObjectState) {}
370+
371+
fetch = async () => {
372+
await this.state.storage.setAlarm(Date.now() + 60 * 1000);
373+
return new Response("ok");
374+
};
375+
alarm = async () => {
376+
await this.state.storage.put("a", 1);
377+
};
378+
}
379+
380+
const map = new Map<string, StoredValue>();
381+
const factory = new MemoryStorageFactory({
382+
[`test://map:TEST:${testId.toString()}`]: map,
383+
});
384+
const plugin = new DurableObjectsPlugin(ctx, {
385+
durableObjects: { TEST: "TestObject" },
386+
durableObjectsPersist: "test://map",
387+
durableObjectsAlarms: true,
388+
});
389+
await plugin.setup(factory);
390+
plugin.beforeReload();
391+
plugin.reload({}, { TestObject }, new Map());
392+
393+
const ns = plugin.getNamespace(factory, "TEST");
394+
const res = await ns.get(testId).fetch("/");
395+
t.is(await res.text(), "ok");
396+
// now flush
397+
await plugin.flushAlarms(factory);
398+
// check storage "a" is 1
399+
const state = await plugin.getObject(factory, testId);
400+
t.is(await state.storage.get("a"), 1);
401+
});
402+
403+
test("DurableObjectsPlugin: flush a specific alarm", async (t) => {
404+
class TestObject implements DurableObject {
405+
constructor(private readonly state: DurableObjectState) {}
406+
407+
fetch = async () => {
408+
await this.state.storage.setAlarm(Date.now() + 60 * 1000);
409+
return new Response("ok");
410+
};
411+
alarm = async () => {
412+
await this.state.storage.put("key", 1);
413+
};
414+
}
415+
416+
const mapA = new Map<string, StoredValue>();
417+
const mapB = new Map<string, StoredValue>();
418+
const factory = new MemoryStorageFactory({
419+
[`test://map:TEST:a`]: mapA,
420+
[`test://map:TEST:b`]: mapB,
421+
});
422+
const plugin = new DurableObjectsPlugin(ctx, {
423+
durableObjects: { TEST: "TestObject" },
424+
durableObjectsPersist: "test://map",
425+
durableObjectsAlarms: true,
426+
});
427+
await plugin.setup(factory);
428+
plugin.beforeReload();
429+
plugin.reload({}, { TestObject }, new Map());
430+
431+
const ns = plugin.getNamespace(factory, "TEST");
432+
const idA = ns.idFromName("a");
433+
const idB = ns.idFromName("b");
434+
435+
const resA = await ns.get(idA).fetch("/");
436+
t.is(await resA.text(), "ok");
437+
const resB = await ns.get(idB).fetch("/");
438+
t.is(await resB.text(), "ok");
439+
// now flush
440+
await plugin.flushAlarms(factory, [idA]);
441+
// check storage "a" is 1
442+
const stateA = await plugin.getObject(factory, idA);
443+
t.is(await stateA.storage.get("key"), 1);
444+
// check that storage "b" is still empty
445+
const stateB = await plugin.getObject(factory, idB);
446+
t.is(await stateB.storage.get("key"), undefined);
447+
// dispose
448+
plugin.dispose();
449+
});

packages/jest-environment-miniflare/src/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ declare global {
2929
function getMiniflareDurableObjectStorage(
3030
id: DurableObjectId
3131
): Promise<DurableObjectStorage>;
32+
function flushMiniflareDurableObjectAlarms(
33+
ids: DurableObjectId[]
34+
): Promise<void>;
3235
}
3336

3437
// MiniflareCore will ensure CorePlugin is first and BindingsPlugin is last,
@@ -223,6 +226,13 @@ export default class MiniflareEnvironment implements JestEnvironment<Timer> {
223226
const state = await plugin.getObject(storage, id);
224227
return state.storage;
225228
};
229+
global.flushMiniflareDurableObjectAlarms = async (
230+
ids?: DurableObjectId[]
231+
): Promise<void> => {
232+
const plugin = (await mf.getPlugins()).DurableObjectsPlugin;
233+
const storage = mf.getPluginStorage("DurableObjectsPlugin");
234+
return plugin.flushAlarms(storage, ids);
235+
};
226236
}
227237

228238
async teardown(): Promise<void> {

0 commit comments

Comments
 (0)