Skip to content

Commit a99837f

Browse files
committed
feat: add clone function
1 parent 897c524 commit a99837f

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed
File renamed without changes.

src/index.ts

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class Queue<T> {
131131
*/
132132
mapParallel = async (
133133
callback: (v: T) => Promise<unknown>,
134-
n: number = Queue._batchCount
134+
n: number = Queue._batchCount,
135135
) => {
136136
this._preparePipe();
137137
let proms: Promise<unknown>[] = [];
@@ -140,7 +140,7 @@ class Queue<T> {
140140
while (true) {
141141
if (proms.length === n) {
142142
const {index} = await Promise.race(
143-
proms.map(async (p, index) => ({v: await p, index}))
143+
proms.map(async (p, index) => ({v: await p, index})),
144144
);
145145
proms = proms.filter((_, i) => i !== index);
146146
}
@@ -165,7 +165,7 @@ class Queue<T> {
165165
const r = callback(v);
166166
if (r !== undefined) outQueue.push(r);
167167
};
168-
this.map(c).then(outQueue.end);
168+
void this.map(c).then(outQueue.end);
169169
return outQueue;
170170
};
171171

@@ -177,15 +177,15 @@ class Queue<T> {
177177
*/
178178
upipe = <U>(
179179
callback: (v: T) => Promise<U | undefined>,
180-
n: number = Queue._batchCount
180+
n: number = Queue._batchCount,
181181
) => {
182182
const outQueue = new Queue<U>();
183183

184184
const c = async (v: T) => {
185185
const r = await callback(v);
186186
if (r !== undefined) outQueue.push(r);
187187
};
188-
this[n === Infinity ? 'map' : 'mapParallel'](c, n).then(outQueue.end);
188+
void this[n === Infinity ? 'map' : 'mapParallel'](c, n).then(outQueue.end);
189189
return outQueue;
190190
};
191191

@@ -195,7 +195,7 @@ class Queue<T> {
195195
* @returns A tuple containing the two new queues.
196196
*/
197197
split = <U, V = U>(
198-
callback: (v: T) => [U, 0] | [V, 1]
198+
callback: (v: T) => [U, 0] | [V, 1],
199199
): [Queue<U>, Queue<V>] => {
200200
const q1 = new Queue<U>();
201201
const q2 = new Queue<V>();
@@ -207,7 +207,7 @@ class Queue<T> {
207207
else if (index === 1) q2.push(value);
208208
else throw new Error('Invalid index');
209209
};
210-
this.map(c).then(() => {
210+
void this.map(c).then(() => {
211211
q1.end();
212212
q2.end();
213213
});
@@ -223,7 +223,7 @@ class Queue<T> {
223223
const outQueue = new Queue<T[]>();
224224
let buffer: T[] = [];
225225

226-
this.map(v => {
226+
void this.map(v => {
227227
buffer.push(v);
228228

229229
if (buffer.length === n) {
@@ -243,7 +243,7 @@ class Queue<T> {
243243
*/
244244
flat = () => {
245245
const outQueue = new Queue<T extends Array<infer U> ? U : never>();
246-
this.map(v => {
246+
void this.map(v => {
247247
if (v instanceof Array) outQueue.push(...v);
248248
else throw new Error('Value is not an array');
249249
}).then(outQueue.end);
@@ -258,7 +258,7 @@ class Queue<T> {
258258
*/
259259
usplit = <U, V = U>(
260260
callback: (v: T) => Promise<[U, 0] | [V, 1] | undefined>,
261-
n: number = Queue._batchCount
261+
n: number = Queue._batchCount,
262262
): [Queue<U>, Queue<V>] => {
263263
const q1 = new Queue<U>();
264264
const q2 = new Queue<V>();
@@ -272,7 +272,7 @@ class Queue<T> {
272272
else if (index === 1) q2.push(value);
273273
else throw new Error('Invalid index');
274274
};
275-
this[n === Infinity ? 'map' : 'mapParallel'](c, n).then(() => {
275+
void this[n === Infinity ? 'map' : 'mapParallel'](c, n).then(() => {
276276
q1.end();
277277
q2.end();
278278
});
@@ -286,10 +286,27 @@ class Queue<T> {
286286
*/
287287
umerge = (q: Queue<T>) => {
288288
const outQueue = new Queue<T>();
289-
Promise.all([this, q].map(q => q.map(outQueue.push))).then(outQueue.end);
289+
void Promise.all([this, q].map(q => q.map(outQueue.push))).then(
290+
outQueue.end,
291+
);
290292
return outQueue;
291293
};
292294

295+
/**
296+
* Creates multiple clones of the queue.
297+
* @param count The number of clone queues to create (default: 1).
298+
* @returns An array of cloned queues.
299+
*/
300+
clone = (count = 1) => {
301+
if (count < 1) throw new Error('Count must be at least 1');
302+
const queues = Array.from({length: count}, () => new Queue<T>());
303+
304+
void this.map(v => queues.map(q => q.push(v))).then(() =>
305+
queues.map(q => q.end()),
306+
);
307+
return queues;
308+
};
309+
293310
/**
294311
* Collects all the values in the queue into an array.
295312
* @returns A promise that resolves to an array containing all the values in the queue.

0 commit comments

Comments
 (0)