Skip to content

Commit 554afe3

Browse files
committed
feat(minor): add abort signal support for locks
1 parent 481cfa0 commit 554afe3

File tree

2 files changed

+249
-8
lines changed

2 files changed

+249
-8
lines changed

src/withLock.ts

Lines changed: 99 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,48 @@ const locks = new Map<any, Map<string, Promise<any>>>();
33
/**
44
* Only allow one instance of the callback to run at a time for a given `scope` and `key`.
55
*/
6-
export async function withLock<ReturnType>(scope: any, key: string, callback: () => Promise<ReturnType>): Promise<ReturnType> {
6+
export async function withLock<ReturnType>(scope: any, key: string, callback: () => Promise<ReturnType>): Promise<ReturnType>;
7+
export async function withLock<ReturnType>(
8+
scope: any, key: string, acquireLockSignal: AbortSignal | undefined, callback: () => Promise<ReturnType>
9+
): Promise<ReturnType>;
10+
export async function withLock<ReturnType>(
11+
scope: any,
12+
key: string,
13+
acquireLockSignalOrCallback: AbortSignal | undefined | (() => Promise<ReturnType>),
14+
callback?: () => Promise<ReturnType>
15+
): Promise<ReturnType> {
16+
let acquireLockSignal: AbortSignal | undefined = undefined;
17+
18+
if (acquireLockSignalOrCallback instanceof AbortSignal)
19+
acquireLockSignal = acquireLockSignalOrCallback;
20+
else if (acquireLockSignalOrCallback != null)
21+
callback = acquireLockSignalOrCallback;
22+
23+
if (callback == null)
24+
throw new Error("callback is required");
25+
726
while (locks.get(scope)?.has(key)) {
27+
if (acquireLockSignal?.aborted)
28+
throw acquireLockSignal.reason;
29+
830
try {
9-
await locks.get(scope)?.get(key);
31+
if (acquireLockSignal != null) {
32+
const acquireLockPromise = createAbortSignalAbortPromise(acquireLockSignal);
33+
34+
await Promise.race([
35+
acquireLockPromise.promise,
36+
locks.get(scope)?.get(key)
37+
]);
38+
39+
acquireLockPromise.dispose();
40+
} else
41+
await locks.get(scope)?.get(key);
1042
} catch (err) {
1143
// we only need to wait here for the promise to resolve, we don't care about the result
1244
}
45+
46+
if (acquireLockSignal?.aborted)
47+
throw acquireLockSignal.reason;
1348
}
1449

1550
const promise = callback();
@@ -39,17 +74,20 @@ export function isLockActive(scope: any, key: string): boolean {
3974
/**
4075
* Acquire a lock for a given `scope` and `key`.
4176
*/
42-
export async function acquireLock<S = any, K extends string = string>(scope: S, key: K): Promise<Lock<S, K>> {
77+
export async function acquireLock<S = any, K extends string = string>(
78+
scope: S, key: K, acquireLockSignal?: AbortSignal
79+
): Promise<Lock<S, K>> {
4380
let releaseLock: (param: null) => void;
4481

45-
await new Promise((accept) => {
46-
void withLock(scope, key, async () => {
82+
await new Promise((accept, reject) => {
83+
void withLock(scope, key, acquireLockSignal, async () => {
4784
accept(null);
4885

4986
await new Promise((accept) => {
5087
releaseLock = accept;
5188
});
52-
});
89+
})
90+
.catch(reject);
5391
});
5492

5593
return {
@@ -67,20 +105,39 @@ export async function acquireLock<S = any, K extends string = string>(scope: S,
67105
/**
68106
* Wait for a lock to be released for a given `scope` and `key`.
69107
*/
70-
export async function waitForLockRelease(scope: any, key: string): Promise<void> {
108+
export async function waitForLockRelease(scope: any, key: string, signal?: AbortSignal): Promise<void> {
71109
// eslint-disable-next-line no-constant-condition
72110
while (true) {
111+
if (signal?.aborted)
112+
throw signal.reason;
113+
73114
try {
74-
await locks.get(scope)?.get(key);
115+
if (signal != null) {
116+
const signalPromise = createAbortSignalAbortPromise(signal);
117+
118+
await Promise.race([
119+
signalPromise.promise,
120+
locks.get(scope)?.get(key)
121+
]);
122+
123+
signalPromise.dispose();
124+
} else
125+
await locks.get(scope)?.get(key);
75126
} catch (err) {
76127
// we only need to wait here for the promise to resolve, we don't care about the result
77128
}
78129

130+
if (signal?.aborted)
131+
throw signal.reason;
132+
79133
if (locks.get(scope)?.has(key))
80134
continue;
81135

82136
await Promise.resolve(); // wait for a microtask to run, so other pending locks can be registered
83137

138+
if (signal?.aborted)
139+
throw signal.reason;
140+
84141
if (locks.get(scope)?.has(key))
85142
continue;
86143

@@ -94,3 +151,37 @@ export type Lock<S = any, K extends string = string> = {
94151
dispose(): void,
95152
[Symbol.dispose](): void
96153
};
154+
155+
function createControlledPromise<T = any>() {
156+
let resolve: (value: T | Promise<T>) => void;
157+
let reject: (reason?: any) => void;
158+
159+
const promise = new Promise<T>((accept, fail) => {
160+
resolve = accept;
161+
reject = fail;
162+
});
163+
164+
return {
165+
promise,
166+
resolve: resolve!,
167+
reject: reject!
168+
};
169+
}
170+
171+
function createAbortSignalAbortPromise(signal: AbortSignal) {
172+
const acquireLockPromise = createControlledPromise<void>();
173+
174+
const onAbort = () => {
175+
acquireLockPromise.resolve();
176+
signal.removeEventListener("abort", onAbort);
177+
};
178+
signal.addEventListener("abort", onAbort);
179+
180+
return {
181+
promise: acquireLockPromise.promise,
182+
dispose() {
183+
signal.removeEventListener("abort", onAbort);
184+
}
185+
};
186+
}
187+

test/withLock.test.ts

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,90 @@ describe("withLock", () => {
5050
expect(isLockActive(scope1, key1)).toBe(false);
5151
});
5252

53+
test("lock works with acquireLockSignal", async () => {
54+
const scope1 = {};
55+
const key1 = "key";
56+
57+
const waitForEnoughMicrotasks = async () => {
58+
for (let i = 0; i < 10; i++)
59+
await Promise.resolve();
60+
};
61+
62+
let proceedLock1: ((value: "something") => void) | null = null;
63+
let lock1Done = false;
64+
let lock1Error: any = undefined;
65+
const lock1Controller = new AbortController();
66+
const lockPromise1 = withLock(scope1, key1, lock1Controller.signal, async () => {
67+
const res = await new Promise((accept) => {
68+
proceedLock1 = accept;
69+
});
70+
lock1Done = true;
71+
return res;
72+
})
73+
.catch((error) => {
74+
lock1Error = error;
75+
});
76+
77+
let proceedLock2: ((value: "something2") => void) | null = null;
78+
let lock2Done = false;
79+
let lock2Error: any = undefined;
80+
const lock2Controller = new AbortController();
81+
withLock(scope1, key1, lock2Controller.signal, async () => {
82+
const res = await new Promise((accept) => {
83+
proceedLock2 = accept;
84+
});
85+
lock2Done = true;
86+
return res;
87+
})
88+
.catch((error) => {
89+
lock2Error = error;
90+
});
91+
92+
expect(proceedLock1).not.toBeNull();
93+
expect(proceedLock2).toBeNull();
94+
expect(lock1Error).toBeUndefined();
95+
expect(lock2Error).toBeUndefined();
96+
expect(lock1Done).toBe(false);
97+
expect(lock2Done).toBe(false);
98+
expect(isLockActive(scope1, key1)).toBe(true);
99+
100+
lock1Controller.abort(new TestError());
101+
102+
expect(proceedLock1).not.toBeNull();
103+
expect(proceedLock2).toBeNull();
104+
expect(lock1Error).toBeUndefined();
105+
expect(lock2Error).toBeUndefined();
106+
expect(lock1Done).toBe(false);
107+
expect(lock2Done).toBe(false);
108+
expect(isLockActive(scope1, key1)).toBe(true);
109+
110+
lock2Controller.abort(new TestError());
111+
112+
expect(proceedLock1).not.toBeNull();
113+
expect(proceedLock2).toBeNull();
114+
expect(lock1Error).toBeUndefined();
115+
expect(lock2Error).toBeUndefined();
116+
expect(lock1Done).toBe(false);
117+
expect(lock2Done).toBe(false);
118+
expect(isLockActive(scope1, key1)).toBe(true);
119+
120+
await waitForEnoughMicrotasks();
121+
122+
expect(proceedLock1).not.toBeNull();
123+
expect(proceedLock2).toBeNull();
124+
expect(lock1Error).toBeUndefined();
125+
expect(lock2Error).to.be.instanceof(TestError);
126+
expect(lock1Done).toBe(false);
127+
expect(lock2Done).toBe(false);
128+
expect(isLockActive(scope1, key1)).toBe(true);
129+
130+
proceedLock1!("something");
131+
await expect(lockPromise1).resolves.toBe("something");
132+
expect(isLockActive(scope1, key1)).toBe(false);
133+
expect(proceedLock2).toBeNull();
134+
expect(lock2Done).toBe(false);
135+
});
136+
53137
test("acquireLock", async () => {
54138
const scope1 = {};
55139
const key1 = "key";
@@ -82,6 +166,32 @@ describe("withLock", () => {
82166
expect(isLockActive(scope1, key1)).toBe(false);
83167
});
84168

169+
test("acquireLock with acquireLockSignal", async () => {
170+
const scope1 = {};
171+
const key1 = "key";
172+
173+
const lock1Controller = new AbortController();
174+
const lock1Promise = acquireLock(scope1, key1, lock1Controller.signal);
175+
expect(isLockActive(scope1, key1)).toBe(true);
176+
177+
lock1Controller.abort(new TestError());
178+
expect(isLockActive(scope1, key1)).toBe(true);
179+
180+
const lock1 = await lock1Promise;
181+
expect(isLockActive(scope1, key1)).toBe(true);
182+
183+
const lock2Controller = new AbortController();
184+
const lock2Promise = acquireLock(scope1, key1, lock2Controller.signal);
185+
186+
lock2Controller.abort(new TestError());
187+
await expect(lock2Promise).rejects.toBeInstanceOf(TestError);
188+
189+
lock1.dispose();
190+
await new Promise((accept) => setTimeout(accept, 0));
191+
192+
expect(isLockActive(scope1, key1)).toBe(false);
193+
});
194+
85195
test("call order", async () => {
86196
const scope1 = {};
87197
const key1 = "key";
@@ -155,4 +265,44 @@ describe("withLock", () => {
155265
await waitForEnoughMicrotasks();
156266
expect(lockReleased).toBe(true);
157267
});
268+
269+
test("waitForLockRelease with signal", async () => {
270+
const scope1 = {};
271+
const key1 = "key";
272+
273+
const lock1 = await acquireLock(scope1, key1);
274+
275+
const lockWithError2 = withLock(scope1, key1, async () => {
276+
throw new Error("some error");
277+
});
278+
279+
const lockReleasedSignal = new AbortController();
280+
281+
let lockReleased = false;
282+
const lockReleasePromise = (async () => {
283+
await waitForLockRelease(scope1, key1, lockReleasedSignal.signal);
284+
lockReleased = true;
285+
})();
286+
287+
expect(lockReleased).toBe(false);
288+
lockReleasedSignal.abort(new TestError());
289+
290+
expect(lockReleased).toBe(false);
291+
await expect(lockReleasePromise).rejects.toBeInstanceOf(TestError);
292+
expect(lockReleased).toBe(false);
293+
294+
lock1.dispose();
295+
296+
try {
297+
await lockWithError2;
298+
expect.unreachable("lockWithError2 should throw");
299+
} catch (err) {
300+
// do nothing
301+
}
302+
});
158303
});
304+
305+
306+
class TestError extends Error {
307+
308+
}

0 commit comments

Comments
 (0)