Skip to content

Commit 1ff4a62

Browse files
authored
chore: Add implementation for async task queue. (#841)
This PR adds an async task queue. This queue is roughly based on the flutter implementation, but with support for non-shedable tasks like iOS. https://github.com/launchdarkly/flutter-client-sdk/blob/01841d65d8eb9dbf7cdc1c29b76f2ac8a699fb05/packages/common/lib/src/async/async_single_queue.dart https://github.com/launchdarkly/ios-client-sdk/blob/082430ff008d76d503327c52b27e3b2af99c3ef2/LaunchDarkly/LaunchDarkly/ServiceObjects/SheddingQueue.swift This is a `chore`, because the type is unused and doesn't need a release. A subsequent PR will use it for functionality. This queue is going to be used for an implementation of the client-side identify spec. https://github.com/launchdarkly/sdk-specs/tree/main/specs/CSI-client-side-identify (Spec is private.) Available for review by anyone, I added Matthew for reference to the iOS SDK in case he was interested.
1 parent 4973bf7 commit 1ff4a62

File tree

2 files changed

+384
-0
lines changed

2 files changed

+384
-0
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import { AsyncTaskQueue } from '../../src/async/AsyncTaskQueue';
2+
3+
it.each([true, false])(
4+
'executes the initial task it is given: sheddable: %s',
5+
async (sheddable) => {
6+
const queue = new AsyncTaskQueue<string>();
7+
const task = jest.fn().mockResolvedValue('test');
8+
const result = await queue.execute(task, sheddable);
9+
expect(queue.pendingCount()).toBe(0);
10+
expect(result).toEqual({
11+
status: 'complete',
12+
result: 'test',
13+
});
14+
expect(task).toHaveBeenCalled();
15+
},
16+
);
17+
18+
it.each([true, false])(
19+
'executes the next task in the queue when the previous task completes: sheddable: %s',
20+
async (sheddable) => {
21+
const queue = new AsyncTaskQueue<string>();
22+
const task1 = jest.fn().mockResolvedValue('test1');
23+
const task2 = jest.fn().mockResolvedValue('test2');
24+
const promise1 = queue.execute(task1, sheddable);
25+
const promise2 = queue.execute(task2, sheddable);
26+
// We have not awaited, so there has not been an opportunity to execute any tasks.
27+
expect(queue.pendingCount()).toBe(1);
28+
29+
const [result1, result2] = await Promise.all([promise1, promise2]);
30+
expect(result1).toEqual({
31+
status: 'complete',
32+
result: 'test1',
33+
});
34+
expect(result2).toEqual({
35+
status: 'complete',
36+
result: 'test2',
37+
});
38+
expect(task1).toHaveBeenCalled();
39+
expect(task2).toHaveBeenCalled();
40+
},
41+
);
42+
43+
it('can shed pending sheddable tasks', async () => {
44+
const queue = new AsyncTaskQueue<string>();
45+
const task1 = jest.fn().mockResolvedValue('test1');
46+
const task2 = jest.fn().mockResolvedValue('test2');
47+
const task3 = jest.fn().mockResolvedValue('test3');
48+
const promise1 = queue.execute(task1, true);
49+
const promise2 = queue.execute(task2, true);
50+
const promise3 = queue.execute(task3, true);
51+
52+
const [result1, result2, result3] = await Promise.all([promise1, promise2, promise3]);
53+
expect(result1).toEqual({
54+
status: 'complete',
55+
result: 'test1',
56+
});
57+
expect(result2).toEqual({
58+
status: 'shed',
59+
});
60+
expect(result3).toEqual({
61+
status: 'complete',
62+
result: 'test3',
63+
});
64+
expect(task1).toHaveBeenCalled();
65+
expect(task2).not.toHaveBeenCalled();
66+
expect(task3).toHaveBeenCalled();
67+
});
68+
69+
it('does not shed pending non-sheddable tasks', async () => {
70+
const queue = new AsyncTaskQueue<string>();
71+
const task1 = jest.fn().mockResolvedValue('test1');
72+
const task2 = jest.fn().mockResolvedValue('test2');
73+
const task3 = jest.fn().mockResolvedValue('test3');
74+
const promise1 = queue.execute(task1, false);
75+
const promise2 = queue.execute(task2, false);
76+
const promise3 = queue.execute(task3, false);
77+
78+
const [result1, result2, result3] = await Promise.all([promise1, promise2, promise3]);
79+
expect(result1).toEqual({
80+
status: 'complete',
81+
result: 'test1',
82+
});
83+
expect(result2).toEqual({
84+
status: 'complete',
85+
result: 'test2',
86+
});
87+
expect(result3).toEqual({
88+
status: 'complete',
89+
result: 'test3',
90+
});
91+
expect(task1).toHaveBeenCalled();
92+
expect(task2).toHaveBeenCalled();
93+
expect(task3).toHaveBeenCalled();
94+
});
95+
96+
it('can handle errors from tasks', async () => {
97+
const queue = new AsyncTaskQueue<string>();
98+
const task1 = jest.fn().mockRejectedValue(new Error('test'));
99+
const task2 = jest.fn().mockResolvedValue('test2');
100+
const promise1 = queue.execute(task1, true);
101+
const promise2 = queue.execute(task2, true);
102+
const [result1, result2] = await Promise.all([promise1, promise2]);
103+
expect(result1).toEqual({
104+
status: 'error',
105+
error: new Error('test'),
106+
});
107+
expect(result2).toEqual({
108+
status: 'complete',
109+
result: 'test2',
110+
});
111+
expect(task1).toHaveBeenCalled();
112+
expect(task2).toHaveBeenCalled();
113+
});
114+
115+
it('handles mix of sheddable and non-sheddable tasks correctly', async () => {
116+
const queue = new AsyncTaskQueue<string>();
117+
const task1 = jest.fn().mockResolvedValue('test1');
118+
const task2 = jest.fn().mockResolvedValue('test2');
119+
const task3 = jest.fn().mockResolvedValue('test3');
120+
const task4 = jest.fn().mockResolvedValue('test4');
121+
122+
// Add tasks in order: sheddable, non-sheddable, sheddable, non-sheddable
123+
const promise1 = queue.execute(task1, true);
124+
const promise2 = queue.execute(task2, false);
125+
const promise3 = queue.execute(task3, true);
126+
const promise4 = queue.execute(task4, false);
127+
128+
const [result1, result2, result3, result4] = await Promise.all([
129+
promise1,
130+
promise2,
131+
promise3,
132+
promise4,
133+
]);
134+
135+
// First task should complete
136+
expect(result1).toEqual({
137+
status: 'complete',
138+
result: 'test1',
139+
});
140+
141+
// Second task should complete (not sheddable)
142+
expect(result2).toEqual({
143+
status: 'complete',
144+
result: 'test2',
145+
});
146+
147+
// Third task should be shed
148+
expect(result3).toEqual({
149+
status: 'shed',
150+
});
151+
152+
// Fourth task should complete
153+
expect(result4).toEqual({
154+
status: 'complete',
155+
result: 'test4',
156+
});
157+
158+
expect(task1).toHaveBeenCalled();
159+
expect(task2).toHaveBeenCalled();
160+
expect(task3).not.toHaveBeenCalled();
161+
expect(task4).toHaveBeenCalled();
162+
});
163+
164+
it('executes tasks in order regardless of time to complete', async () => {
165+
const queue = new AsyncTaskQueue<string>();
166+
const timedPromise = (ms: number) =>
167+
new Promise((resolve) => {
168+
setTimeout(resolve, ms);
169+
});
170+
const callOrder: string[] = [];
171+
const task1 = jest.fn().mockImplementation(() => {
172+
callOrder.push('task1Start');
173+
return timedPromise(10).then(() => {
174+
callOrder.push('task1End');
175+
return 'test1';
176+
});
177+
});
178+
const task2 = jest.fn().mockImplementation(() => {
179+
callOrder.push('task2Start');
180+
return timedPromise(5).then(() => {
181+
callOrder.push('task2End');
182+
return 'test2';
183+
});
184+
});
185+
const task3 = jest.fn().mockImplementation(() => {
186+
callOrder.push('task3Start');
187+
return timedPromise(20).then(() => {
188+
callOrder.push('task3End');
189+
return 'test3';
190+
});
191+
});
192+
const promise1 = queue.execute(task1, false);
193+
const promise2 = queue.execute(task2, false);
194+
const promise3 = queue.execute(task3, false);
195+
196+
await Promise.all([promise1, promise2, promise3]);
197+
expect(callOrder).toEqual([
198+
'task1Start',
199+
'task1End',
200+
'task2Start',
201+
'task2End',
202+
'task3Start',
203+
'task3End',
204+
]);
205+
});
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import { LDLogger } from '@launchdarkly/js-sdk-common';
2+
3+
/**
4+
* Represents a task that has been shed from the queue.
5+
* This task will never be executed.
6+
*/
7+
export interface ShedTask {
8+
status: 'shed';
9+
}
10+
11+
/**
12+
* Represents a task that has been ran to completion.
13+
*/
14+
export interface CompletedTask<TTaskResult> {
15+
status: 'complete';
16+
result: TTaskResult;
17+
}
18+
19+
/**
20+
* Represents a task that has errored.
21+
*/
22+
export interface ErroredTask {
23+
status: 'error';
24+
error: Error;
25+
}
26+
27+
/**
28+
* Represents the result of a task.
29+
*/
30+
export type TaskResult<TTaskResult> = CompletedTask<TTaskResult> | ErroredTask | ShedTask;
31+
32+
/**
33+
* Represents a pending task. This encapsulates the async function that needs to be executed as well as a promise that represents its state.
34+
* The promise is not directly the promise associated with the async function, because we will not execute the async function until some point in the future, if at all.
35+
* */
36+
interface PendingTask<TTaskResult> {
37+
sheddable: boolean;
38+
execute: () => void;
39+
shed: () => void;
40+
promise: Promise<TaskResult<TTaskResult>>;
41+
}
42+
43+
const duplicateExecutionError = new Error(
44+
'Task has already been executed or shed. This is likely an implementation error. The task will not be executed again.',
45+
);
46+
47+
/**
48+
* Creates a pending task.
49+
* @param task The async function to execute.
50+
* @param sheddable Whether the task can be shed from the queue.
51+
* @returns A pending task.
52+
*/
53+
function makePending<TTaskResult>(
54+
task: () => Promise<TTaskResult>,
55+
_logger?: LDLogger,
56+
sheddable: boolean = false,
57+
): PendingTask<TTaskResult> {
58+
let res: (value: TaskResult<TTaskResult>) => void;
59+
60+
const promise = new Promise<TaskResult<TTaskResult>>((resolve) => {
61+
res = resolve;
62+
});
63+
64+
let executedOrShed = false;
65+
return {
66+
execute: () => {
67+
if (executedOrShed) {
68+
// This should never happen. If it does, then it represents an implementation error in the SDK.
69+
_logger?.error(duplicateExecutionError);
70+
}
71+
executedOrShed = true;
72+
task()
73+
.then((result) => res({ status: 'complete', result }))
74+
.catch((error) => res({ status: 'error', error }));
75+
},
76+
shed: () => {
77+
if (executedOrShed) {
78+
// This should never happen. If it does, then it represents an implementation error in the SDK.
79+
_logger?.error(duplicateExecutionError);
80+
}
81+
executedOrShed = true;
82+
res({ status: 'shed' });
83+
},
84+
promise,
85+
sheddable,
86+
};
87+
}
88+
89+
/**
90+
* An asynchronous task queue with the ability to replace pending tasks.
91+
*
92+
* This is useful when you have asynchronous operations which much execute in order, and for cases where intermediate
93+
* operations can be discarded.
94+
*
95+
* For instance, the SDK can only have one active context at a time, if you request identification of many contexts,
96+
* then the ultimate state will be based on the last request. The intermediate identifies can be discarded.
97+
*
98+
* This class will always begin execution of the first item added to the queue, at that point the item itself is not
99+
* queued, but active. If another request is made while that item is still active, then it is added to the queue.
100+
* A third request would then replace the second request if the second request had not yet become active, and it was
101+
* sheddable.
102+
*
103+
* Once a task is active the queue will complete it. It doesn't cancel tasks that it has started, but it can shed tasks
104+
* that have not started.
105+
*
106+
* TTaskResult Is the return type of the task to be executed. Tasks accept no parameters. So if you need parameters
107+
* you should use a lambda to capture them.
108+
*
109+
* Exceptions from tasks are always handled and the execute method will never reject a promise.
110+
*
111+
* Queue management should be done synchronously. There should not be asynchronous operations between checking the queue
112+
* and acting on the results of said check.
113+
*/
114+
export class AsyncTaskQueue<TTaskResult> {
115+
private _activeTask?: Promise<TaskResult<TTaskResult>>;
116+
private _queue: PendingTask<TTaskResult>[] = [];
117+
118+
constructor(private readonly _logger?: LDLogger) {}
119+
120+
/**
121+
* Execute a task using the queue.
122+
*
123+
* @param task The async function to execute.
124+
* @param sheddable Whether the task can be shed from the queue.
125+
* @returns A promise that resolves to the result of the task.
126+
*/
127+
execute(
128+
task: () => Promise<TTaskResult>,
129+
sheddable: boolean = false,
130+
): Promise<TaskResult<TTaskResult>> {
131+
const pending = makePending(task, this._logger, sheddable);
132+
133+
if (!this._activeTask) {
134+
this._activeTask = pending.promise.finally(() => {
135+
this._activeTask = undefined;
136+
this._checkPending();
137+
});
138+
pending.execute();
139+
} else {
140+
// If the last pending task is sheddable, we need to shed it before adding the new task.
141+
if (this._queue[this._queue.length - 1]?.sheddable) {
142+
this._queue.pop()?.shed();
143+
}
144+
this._queue.push(pending);
145+
}
146+
147+
return pending.promise;
148+
}
149+
150+
private _checkPending() {
151+
// There is an existing active task, so we don't need to do anything.
152+
if (this._activeTask) {
153+
return;
154+
}
155+
156+
// There are pending tasks, so we need to execute the next one.
157+
if (this._queue.length > 0) {
158+
const nextTask = this._queue.shift()!;
159+
160+
this._activeTask = nextTask.promise.finally(() => {
161+
this._activeTask = undefined;
162+
this._checkPending();
163+
});
164+
165+
nextTask.execute();
166+
}
167+
}
168+
169+
/**
170+
* Returns the number of pending tasks in the queue.
171+
* Intended for testing purposes only.
172+
*
173+
* @internal
174+
* @returns The number of pending tasks in the queue.
175+
*/
176+
public pendingCount(): number {
177+
return this._queue.length;
178+
}
179+
}

0 commit comments

Comments
 (0)