Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 5361d60

Browse files
committed
Execute DurableObjectTransaction ops in program order, closes #344
1 parent ca3aae4 commit 5361d60

File tree

2 files changed

+22
-8
lines changed

2 files changed

+22
-8
lines changed

packages/durable-objects/src/storage.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,8 @@ const kWriteSet = Symbol("kWriteSet");
272272
export const kAlarmExists = Symbol("kAlarmExists");
273273

274274
export class DurableObjectTransaction implements DurableObjectOperator {
275+
readonly #mutex = new ReadWriteMutex();
276+
275277
readonly [kInner]: ShadowStorage;
276278
readonly [kStartTxnCount]: number;
277279
[kRolledback] = false;
@@ -325,7 +327,7 @@ export class DurableObjectTransaction implements DurableObjectOperator {
325327
}
326328
this.#check("get");
327329
return runWithInputGateClosed(
328-
() => get(this[kInner], keys as any),
330+
() => this.#mutex.runWithRead(() => get(this[kInner], keys as any)),
329331
options?.allowConcurrency
330332
);
331333
}
@@ -363,7 +365,8 @@ export class DurableObjectTransaction implements DurableObjectOperator {
363365
if (!options && typeof keyEntries !== "string") options = valueOptions;
364366
return waitUntilOnOutputGate(
365367
runWithInputGateClosed(
366-
() => this.#put(keyEntries, valueOptions),
368+
() =>
369+
this.#mutex.runWithWrite(() => this.#put(keyEntries, valueOptions)),
367370
options?.allowConcurrency
368371
),
369372
options?.allowUnconfirmed
@@ -394,7 +397,7 @@ export class DurableObjectTransaction implements DurableObjectOperator {
394397
this.#check("delete");
395398
return waitUntilOnOutputGate(
396399
runWithInputGateClosed(
397-
() => this.#delete(keys),
400+
() => this.#mutex.runWithWrite(() => this.#delete(keys)),
398401
options?.allowConcurrency
399402
),
400403
options?.allowUnconfirmed
@@ -410,7 +413,7 @@ export class DurableObjectTransaction implements DurableObjectOperator {
410413
): Promise<Map<string, Value>> {
411414
this.#check("list");
412415
return runWithInputGateClosed(
413-
() => list(this[kInner], options),
416+
() => this.#mutex.runWithRead(() => list(this[kInner], options)),
414417
options.allowConcurrency
415418
);
416419
}
@@ -421,7 +424,7 @@ export class DurableObjectTransaction implements DurableObjectOperator {
421424
this.#check("getAlarm");
422425
if (!this[kAlarmExists]) return null;
423426
return runWithInputGateClosed(
424-
() => this[kInner].getAlarm(),
427+
() => this.#mutex.runWithRead(() => this[kInner].getAlarm()),
425428
options?.allowConcurrency
426429
);
427430
}
@@ -438,7 +441,8 @@ export class DurableObjectTransaction implements DurableObjectOperator {
438441
}
439442
return waitUntilOnOutputGate(
440443
runWithInputGateClosed(
441-
() => this[kInner].setAlarm(scheduledTime),
444+
() =>
445+
this.#mutex.runWithWrite(() => this[kInner].setAlarm(scheduledTime)),
442446
options?.allowConcurrency
443447
),
444448
options?.allowUnconfirmed
@@ -449,7 +453,7 @@ export class DurableObjectTransaction implements DurableObjectOperator {
449453
this.#check("deleteAlarm");
450454
return waitUntilOnOutputGate(
451455
runWithInputGateClosed(
452-
() => this[kInner].deleteAlarm(),
456+
() => this.#mutex.runWithWrite(() => this[kInner].deleteAlarm()),
453457
options?.allowConcurrency
454458
),
455459
options?.allowUnconfirmed
@@ -825,7 +829,7 @@ export class DurableObjectStorage implements DurableObjectOperator {
825829
}
826830
return runWithGatesClosed(async () => {
827831
await this.#mutex.runWithWrite(async () => {
828-
this.#shadow.setAlarm(scheduledTime);
832+
await this.#shadow.setAlarm(scheduledTime);
829833
// "Commit" write
830834
this.#txnRecordWriteSet(new Set([ALARM_KEY]));
831835
});

packages/durable-objects/test/storage.spec.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,16 @@ test("transaction: waits for un-awaited writes before committing", async (t) =>
13041304
});
13051305
t.is(await storage.get("key"), "value");
13061306
});
1307+
test("transaction: performs operations in program order", async (t) => {
1308+
// https://github.com/cloudflare/miniflare/issues/344
1309+
const { storage } = t.context;
1310+
await storage.transaction((txn) => {
1311+
void txn.delete("key");
1312+
void txn.put("key", "value");
1313+
return Promise.resolve();
1314+
});
1315+
t.is(await storage.get("key"), "value"); // not `undefined`
1316+
});
13071317

13081318
test("hides implementation details", (t) => {
13091319
const { storage } = t.context;

0 commit comments

Comments
 (0)