Skip to content

Commit 8b2f211

Browse files
committed
feat: made poll and Status.waitFor cancellable
1 parent de86b0e commit 8b2f211

File tree

4 files changed

+107
-25
lines changed

4 files changed

+107
-25
lines changed

src/status/Status.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ import type {
66
StatusDead,
77
} from './types';
88
import type { FileSystem, FileHandle } from '../types';
9+
import type { PromiseCancellable } from '@matrixai/async-cancellable';
10+
import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts';
911
import Logger from '@matrixai/logger';
1012
import lock from 'fd-lock';
1113
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
14+
import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
1215
import * as statusUtils from './utils';
1316
import * as statusErrors from './errors';
1417
import * as statusEvents from './events';
@@ -299,9 +302,14 @@ class Status {
299302
}
300303
}
301304

305+
public waitFor(
306+
status: StatusInfo['status'],
307+
ctx?: Partial<ContextTimedInput>,
308+
): PromiseCancellable<StatusInfo>;
309+
@timedCancellable(true)
302310
public async waitFor(
303311
status: StatusInfo['status'],
304-
timeout?: number,
312+
@context ctx: ContextTimed,
305313
): Promise<StatusInfo> {
306314
let statusInfo;
307315
try {
@@ -323,7 +331,7 @@ class Status {
323331
return false;
324332
},
325333
50,
326-
timeout,
334+
ctx,
327335
);
328336
} catch (e) {
329337
if (e instanceof errors.ErrorUtilsPollTimeout) {

src/utils/utils.ts

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import type {
55
PromiseDeconstructed,
66
Callback,
77
} from '../types';
8+
import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts';
89
import os from 'os';
910
import process from 'process';
1011
import path from 'path';
1112
import nodesEvents from 'events';
1213
import lexi from 'lexicographic-integer';
1314
import { PromiseCancellable } from '@matrixai/async-cancellable';
15+
import { timedCancellable } from '@matrixai/contexts/dist/functions';
1416
import * as utilsErrors from './errors';
1517

1618
const AsyncFunction = (async () => {}).constructor;
@@ -93,6 +95,22 @@ async function sleep(ms: number): Promise<void> {
9395
return await new Promise<void>((r) => setTimeout(r, ms));
9496
}
9597

98+
function sleepCancellable(ms: number): PromiseCancellable<void> {
99+
return new PromiseCancellable<void>((resolve, reject, signal) => {
100+
if (signal.aborted) return reject(signal.reason);
101+
const handleTimeout = () => {
102+
signal.removeEventListener('abort', handleAbort);
103+
resolve();
104+
};
105+
const handleAbort = () => {
106+
clearTimeout(timer);
107+
reject(signal.reason);
108+
};
109+
signal.addEventListener('abort', handleAbort, { once: true });
110+
const timer = setTimeout(handleTimeout, ms);
111+
});
112+
}
113+
96114
/**
97115
* Checks if value is an object.
98116
* Arrays are also considered objects.
@@ -149,23 +167,31 @@ function getUnixtime(date: Date = new Date()) {
149167
return Math.round(date.getTime() / 1000);
150168
}
151169

170+
const sleepCancelReasonSymbol = Symbol('sleepCancelReasonSymbol');
171+
152172
/**
153173
* Poll execution and use condition to accept or reject the results
154174
*/
155-
async function poll<T, E = any>(
175+
async function poll_<T, E = any>(
176+
ctx: ContextTimed,
156177
f: () => T | PromiseLike<T>,
157178
condition: {
158179
(e: E, result?: undefined): boolean;
159180
(e: null, result: T): boolean;
160181
},
161-
interval = 1000,
162-
timeout?: number,
182+
interval: number,
163183
): Promise<T> {
164-
const timer = timeout != null ? timerStart(timeout) : undefined;
184+
let result: T;
185+
const { p: abortP, resolveP: resolveAbortP } = promise();
186+
const handleAbortP = () => resolveAbortP();
187+
if (ctx.signal.aborted) {
188+
resolveAbortP();
189+
} else {
190+
ctx.signal.addEventListener('abort', handleAbortP, { once: true });
191+
}
165192
try {
166-
let result: T;
167193
while (true) {
168-
if (timer?.timedOut) {
194+
if (ctx.signal.aborted) {
169195
throw new utilsErrors.ErrorUtilsPollTimeout();
170196
}
171197
try {
@@ -178,13 +204,46 @@ async function poll<T, E = any>(
178204
throw e;
179205
}
180206
}
181-
await sleep(interval);
207+
const sleepP = sleepCancellable(interval);
208+
await Promise.race([sleepP, abortP])
209+
.finally(async () => {
210+
// Clean up
211+
sleepP.cancel(sleepCancelReasonSymbol);
212+
await sleepP;
213+
})
214+
.catch((e) => {
215+
if (e !== sleepCancelReasonSymbol) throw e;
216+
});
182217
}
183218
} finally {
184-
if (timer != null) timerStop(timer);
219+
resolveAbortP();
220+
await abortP;
221+
ctx.signal.removeEventListener('abort', handleAbortP);
185222
}
186223
}
187224

225+
const pollCancellable = timedCancellable(
226+
poll_,
227+
true,
228+
undefined,
229+
utilsErrors.ErrorUtilsPollTimeout,
230+
);
231+
232+
/**
233+
* Poll execution and use condition to accept or reject the results
234+
*/
235+
function poll<T, E = any>(
236+
f: () => T | PromiseLike<T>,
237+
condition: {
238+
(e: E, result?: undefined): boolean;
239+
(e: null, result: T): boolean;
240+
},
241+
interval = 1000,
242+
ctx?: Partial<ContextTimedInput>,
243+
): PromiseCancellable<T> {
244+
return pollCancellable(ctx, f, condition, interval);
245+
}
246+
188247
/**
189248
* Convert callback-style to promise-style
190249
* If this is applied to overloaded function
@@ -492,6 +551,7 @@ export {
492551
dirEmpty,
493552
pathIncludes,
494553
sleep,
554+
sleepCancellable,
495555
isObject,
496556
isEmptyObject,
497557
filterEmptyObject,

tests/status/Status.test.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ describe('Status', () => {
294294
// In this case, it is possible that upon reacting to `LIVE` status
295295
// When it reads the status, it has already changed to `STOPPING`
296296
// Which means the `statusWaitFor` never resolves
297-
const statusWaitFor = status.waitFor('LIVE', 1000);
297+
const statusWaitFor = status.waitFor('LIVE', { timer: 1000 });
298298
const p1 = status.finishStart({
299299
clientHost: '',
300300
clientPort: 0,
@@ -320,4 +320,19 @@ describe('Status', () => {
320320
).toBe(true);
321321
await status.stop({});
322322
});
323+
test('wait for is cancellable', async () => {
324+
const status = new Status({
325+
statusPath: path.join(dataDir, config.paths.statusBase),
326+
statusLockPath: path.join(dataDir, config.paths.statusLockBase),
327+
fs: fs,
328+
logger: logger,
329+
});
330+
await status.start({ pid: 0 });
331+
// Is in `STARTED` state, so `DEAD` state will never be reached
332+
const statusWaitForP = status.waitFor('DEAD');
333+
statusWaitForP.cancel('reason');
334+
await expect(statusWaitForP).rejects.toThrow(
335+
statusErrors.ErrorStatusTimeout,
336+
);
337+
});
323338
});

tests/tasks/TaskManager.test.ts

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -261,15 +261,15 @@ describe(TaskManager.name, () => {
261261
}
262262

263263
let completed = false;
264-
const waitForcompletionProm = (async () => {
264+
const waitForCompletionProm = (async () => {
265265
await Promise.all(pendingTasks);
266266
completed = true;
267267
})();
268268

269269
// Check for active tasks while tasks are still running
270270
while (!completed) {
271271
expect(taskManager.activeCount).toBeLessThanOrEqual(activeLimit);
272-
await Promise.race([utils.sleep(100), waitForcompletionProm]);
272+
await Promise.race([utils.sleep(100), waitForCompletionProm]);
273273
}
274274

275275
await taskManager.stop();
@@ -555,7 +555,7 @@ describe(TaskManager.name, () => {
555555
const handler = jest.fn();
556556
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
557557
handler.mockImplementation(async (ctx: ContextTimed) => {
558-
const abortP = new Promise((resolve, reject) =>
558+
const abortP = new Promise((_, reject) =>
559559
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
560560
);
561561
await Promise.race([pauseP, abortP]);
@@ -589,7 +589,7 @@ describe(TaskManager.name, () => {
589589
return status![0] === 'active' && status![1] === 'active';
590590
},
591591
10,
592-
1000,
592+
{ timer: 1000 },
593593
);
594594
// Cancellation should reject promise
595595
const taskPromise = task1.promise();
@@ -657,7 +657,7 @@ describe(TaskManager.name, () => {
657657
const handler = jest.fn();
658658
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
659659
handler.mockImplementation(async (ctx: ContextTimed) => {
660-
const abortP = new Promise((resolve, reject) =>
660+
const abortP = new Promise((_, reject) =>
661661
ctx.signal.addEventListener('abort', () =>
662662
reject(
663663
new tasksErrors.ErrorTaskRetry(undefined, {
@@ -696,7 +696,7 @@ describe(TaskManager.name, () => {
696696
return status![0] === 'active' && status![1] === 'active';
697697
},
698698
10,
699-
1000,
699+
{ timer: 1000 },
700700
);
701701
await taskManager.stop();
702702
await taskManager.start({ lazy: true });
@@ -710,7 +710,7 @@ describe(TaskManager.name, () => {
710710
const handlerId1 = 'handler1' as TaskHandlerId;
711711
const handler1 = jest.fn();
712712
handler1.mockImplementation(async (ctx: ContextTimed) => {
713-
const abortP = new Promise((resolve, reject) =>
713+
const abortP = new Promise((_, reject) =>
714714
ctx.signal.addEventListener('abort', () =>
715715
reject(
716716
new tasksErrors.ErrorTaskRetry(undefined, {
@@ -724,7 +724,7 @@ describe(TaskManager.name, () => {
724724
const handlerId2 = 'handler2' as TaskHandlerId;
725725
const handler2 = jest.fn();
726726
handler2.mockImplementation(async (ctx: ContextTimed) => {
727-
const abortP = new Promise((resolve, reject) =>
727+
const abortP = new Promise((_, reject) =>
728728
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
729729
);
730730
await Promise.race([pauseP, abortP]);
@@ -758,7 +758,7 @@ describe(TaskManager.name, () => {
758758
return status![0] === 'active' && status![1] === 'active';
759759
},
760760
10,
761-
1000,
761+
{ timer: 1000 },
762762
);
763763
await taskManager.stop();
764764
// Tasks were run
@@ -973,7 +973,7 @@ describe(TaskManager.name, () => {
973973
return status === 'queued';
974974
},
975975
10,
976-
1000,
976+
{ timer: 1000 },
977977
);
978978
await expect(
979979
taskManager.updateTask(task1.id, {
@@ -1124,14 +1124,13 @@ describe(TaskManager.name, () => {
11241124
const tasks = await Promise.all(
11251125
taskIds.map((id) => taskManager.getTask(id)),
11261126
);
1127-
const statuses = tasks.map((task) => task!.status);
1128-
return statuses;
1127+
return tasks.map((task) => task!.status);
11291128
},
11301129
(_: any, statuses?: Array<TaskStatus>) => {
11311130
return statuses!.every((status) => status === 'queued');
11321131
},
11331132
10,
1134-
1000,
1133+
{ timer: 1000 },
11351134
);
11361135
// @ts-ignore: Then queueing, which makes the tasks active
11371136
await taskManager.startQueueing();
@@ -1144,7 +1143,7 @@ describe(TaskManager.name, () => {
11441143
const handler = jest.fn();
11451144
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
11461145
handler.mockImplementation(async (ctx: ContextTimed) => {
1147-
const abortP = new Promise((resolve, reject) =>
1146+
const abortP = new Promise((_, reject) =>
11481147
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
11491148
);
11501149
await Promise.race([pauseP, abortP]);

0 commit comments

Comments
 (0)