Skip to content

Commit 6b08836

Browse files
committed
Have stream helpers accept retry options
1 parent df2aa4f commit 6b08836

11 files changed

+488
-369
lines changed

registry/lib/arrays.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
declare const Arrays: {
2-
arrayChunk<T>(arr: T[] | readonly T[], size?: number): T[][]
2+
arrayChunk<T>(arr: T[] | readonly T[], size?: number | undefined): T[][]
33
arrayUnique<T>(arr: T[] | readonly T[]): T[]
44
joinAnd(arr: string[] | readonly string[]): string
55
joinOr(arr: string[] | readonly string[]): string

registry/lib/promises.d.ts

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,85 @@
1+
declare type OnRetry = (attempt: number, error: unknown, delay: number) => void
2+
declare type pIterationContext = {
3+
signal: AbortSignal
4+
}
5+
declare type pIterationOptions = {
6+
concurrency?: number | undefined
7+
retries?: pRetryOptions | undefined
8+
signal?: AbortSignal | undefined
9+
}
110
declare type pRetryOptions =
211
| number
312
| {
13+
args?: any[] | undefined
414
backoffFactor?: number | undefined
515
baseDelayMs?: number | undefined
616
jitter?: boolean | undefined
717
maxDelayMs?: number | undefined
8-
onRetry?: (
9-
attempt: number,
10-
error: unknown,
11-
delay: number
12-
) => void | undefined
18+
onRetry?: OnRetry | undefined
1319
onRetryCancelOnFalse?: boolean | undefined
1420
onRetryRethrow?: boolean | undefined
1521
retries?: number | undefined
1622
signal?: AbortSignal | undefined
1723
}
18-
declare type pOptions = {
19-
retries?: pRetryOptions | undefined
20-
signal?: AbortSignal | undefined
24+
declare type pNormalizedIterationOptions = {
25+
concurrency: number
26+
retries: pRetryOptions
27+
signal: AbortSignal
28+
}
29+
declare type pNormalizedRetryOptions = {
30+
args: any[]
31+
backoffFactor: number
32+
baseDelayMs: number
33+
concurrency: number
34+
jitter: boolean
35+
maxDelayMs: number
36+
onRetry?: OnRetry | undefined
37+
onRetryCancelOnFalse: boolean
38+
onRetryRethrow: boolean
39+
retries: number
40+
signal: AbortSignal
2141
}
2242
declare const Promises: {
43+
normalizeIterationOptions(
44+
options?: pIterationOptions | undefined
45+
): pNormalizedIterationOptions
46+
normalizeRetryOptions(
47+
options?: pRetryOptions | undefined
48+
): pNormalizedRetryOptions
2349
pEach<T>(
2450
array: T[],
25-
concurrency: number,
26-
callbackFn: (value: T, options: pOptions) => Promise<any>,
27-
options?: pOptions | undefined
51+
callbackFn: (value: T, context: pIterationContext) => Promise<any>,
52+
options?: pIterationOptions | undefined
2853
): Promise<void>
2954
pEachChunk<T>(
3055
chunks: T[][],
31-
callbackFn: (value: T, options: pOptions) => Promise<any>,
32-
options?: pOptions | undefined
56+
callbackFn: (value: T, context: pIterationContext) => Promise<any>,
57+
options?: pRetryOptions | undefined
3358
): Promise<void>
3459
pFilter<T>(
3560
array: T[],
36-
concurrency: number,
37-
callbackFn: (value: T, options: pOptions) => Promise<boolean>,
38-
options?: pOptions | undefined
61+
callbackFn: (value: T, context: pIterationContext) => Promise<boolean>,
62+
options?: pIterationOptions | undefined
3963
): Promise<T[]>
4064
pFilterChunk<T>(
4165
chunks: T[][],
42-
callbackFn: (value: T, options: pOptions) => Promise<boolean>,
43-
options?: pOptions | undefined
66+
callbackFn: (value: T, context: pIterationContext) => Promise<boolean>,
67+
options?: pRetryOptions | undefined
4468
): Promise<T[][]>
45-
pRetry<T, P extends (value: T, options: pOptions) => Promise<any>>(
69+
pRetry<T, P extends (value: T, context: pIterationContext) => Promise<any>>(
4670
callbackFn: P,
4771
options?: pRetryOptions | undefined
4872
): ReturnType<P>
73+
resolveRetryOptions(options?: pRetryOptions | undefined): pRetryOptions
4974
}
5075
declare namespace Promises {
51-
export { pOptions, pRetryOptions }
76+
export {
77+
OnRetry,
78+
pIterationContext,
79+
pIterationOptions,
80+
pNormalizedIterationOptions,
81+
pNormalizedRetryOptions,
82+
pRetryOptions
83+
}
5284
}
5385
export = Promises

registry/lib/promises.js

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,25 @@ function getTimers() {
1313
return _timers
1414
}
1515

16+
/*@__NO_SIDE_EFFECTS__*/
17+
function normalizeIterationOptions(options) {
18+
const {
19+
// The number of concurrent executions performed at one time.
20+
concurrency = 1,
21+
// Retries as a number or options object.
22+
retries,
23+
// AbortSignal used to support cancellation.
24+
signal = /*@__PURE__*/ require('./constants/abort-signal')
25+
} = { __proto__: null, ...options }
26+
const retryOpts = resolveRetryOptions(retries)
27+
return {
28+
__proto__: null,
29+
concurrency,
30+
retries: normalizeRetryOptions({ signal, ...retryOpts }),
31+
signal
32+
}
33+
}
34+
1635
/*@__NO_SIDE_EFFECTS__*/
1736
function normalizeRetryOptions(options) {
1837
const {
@@ -62,23 +81,31 @@ function resolveRetryOptions(options) {
6281
}
6382

6483
/*@__NO_SIDE_EFFECTS__*/
65-
async function pEach(array, concurrency, callbackFn, options) {
66-
await pEachChunk(arrayChunk(array, concurrency), callbackFn, options)
84+
async function pEach(array, callbackFn, options) {
85+
const iterOpts = normalizeIterationOptions(options)
86+
await pEachChunk(
87+
arrayChunk(array, iterOpts.concurrency),
88+
callbackFn,
89+
iterOpts.retries
90+
)
6791
}
6892

6993
/*@__NO_SIDE_EFFECTS__*/
70-
async function pFilter(array, concurrency, callbackFn, options) {
94+
async function pFilter(array, callbackFn, options) {
95+
const iterOpts = normalizeIterationOptions(options)
7196
return (
72-
await pFilterChunk(arrayChunk(array, concurrency), callbackFn, options)
97+
await pFilterChunk(
98+
arrayChunk(array, iterOpts.concurrency),
99+
callbackFn,
100+
iterOpts.retries
101+
)
73102
).flat()
74103
}
75104

76105
/*@__NO_SIDE_EFFECTS__*/
77106
async function pEachChunk(chunks, callbackFn, options) {
78-
const {
79-
retries,
80-
signal = /*@__PURE__*/ require('./constants/abort-signal')
81-
} = { __proto__: null, ...options }
107+
const retryOpts = normalizeRetryOptions(options)
108+
const { signal } = retryOpts
82109
for (const chunk of chunks) {
83110
if (signal?.aborted) {
84111
return
@@ -87,8 +114,7 @@ async function pEachChunk(chunks, callbackFn, options) {
87114
await Promise.all(
88115
chunk.map(value =>
89116
pRetry(callbackFn, {
90-
signal,
91-
...resolveRetryOptions(retries),
117+
...retryOpts,
92118
args: [value]
93119
})
94120
)
@@ -98,10 +124,8 @@ async function pEachChunk(chunks, callbackFn, options) {
98124

99125
/*@__NO_SIDE_EFFECTS__*/
100126
async function pFilterChunk(chunks, callbackFn, options) {
101-
const {
102-
retries,
103-
signal = /*@__PURE__*/ require('./constants/abort-signal')
104-
} = { __proto__: null, ...options }
127+
const retryOpts = normalizeRetryOptions(options)
128+
const { signal } = retryOpts
105129
const { length } = chunks
106130
const filteredChunks = Array(length)
107131
for (let i = 0; i < length; i += 1) {
@@ -114,8 +138,7 @@ async function pFilterChunk(chunks, callbackFn, options) {
114138
const predicateResults = await Promise.all(
115139
chunk.map(value =>
116140
pRetry(callbackFn, {
117-
signal,
118-
...resolveRetryOptions(retries),
141+
...retryOpts,
119142
args: [value]
120143
})
121144
)
@@ -197,9 +220,12 @@ async function pRetry(callbackFn, options) {
197220
}
198221

199222
module.exports = {
223+
normalizeIterationOptions,
224+
normalizeRetryOptions,
200225
pEach,
201226
pEachChunk,
202227
pFilter,
203228
pFilterChunk,
204-
pRetry
229+
pRetry,
230+
resolveRetryOptions
205231
}

registry/lib/streams.d.ts

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,29 @@
11
/// <reference types="node" />
2+
import { pIterationOptions } from './promises'
3+
4+
declare type sIterationOptions = pIterationOptions
25
declare type AnyIterable<T> =
36
| Iterable<T>
47
| AsyncIterable<T>
58
| NodeJS.ReadableStream
69
declare const Streams: {
710
parallelForEach<T>(
8-
concurrency: number,
11+
iterable: AnyIterable<T>,
912
func: (data: T) => Promise<void>,
10-
iterable: AnyIterable<T>
13+
options?: sIterationOptions | undefined
1114
): Promise<void>
1215
parallelMap<T, R>(
13-
concurrency: number
14-
): {
15-
(
16-
func: (data: T) => R | Promise<R>,
17-
iterable: AnyIterable<T>
18-
): AsyncIterableIterator<R>
19-
(
20-
func: (data: T) => R | Promise<R>
21-
): (iterable: AnyIterable<T>) => AsyncIterableIterator<R>
22-
}
23-
parallelMap<T, R>(
24-
concurrency: number,
25-
func: (data: T) => R | Promise<R>
26-
): (iterable: AnyIterable<T>) => AsyncIterableIterator<R>
27-
parallelMap<T, R>(
28-
concurrency: number,
16+
iterable: AnyIterable<T>,
2917
func: (data: T) => R | Promise<R>,
30-
iterable: AnyIterable<T>
18+
options?: sIterationOptions | undefined
3119
): AsyncIterableIterator<R>
32-
transform(concurrency: number): {
33-
<T, R>(
34-
func: (data: T) => R | Promise<R>,
35-
iterable: AnyIterable<T>
36-
): AsyncIterableIterator<R>
37-
<T, R>(
38-
func: (data: T) => R | Promise<R>
39-
): (iterable: AnyIterable<T>) => AsyncIterableIterator<R>
40-
}
41-
transform<T, R>(
42-
concurrency: number,
43-
func: (data: T) => R | Promise<R>
44-
): (iterable: AnyIterable<T>) => AsyncIterableIterator<R>
4520
transform<T, R>(
46-
concurrency: number,
21+
iterable: AnyIterable<T>,
4722
func: (data: T) => R | Promise<R>,
48-
iterable: AnyIterable<T>
23+
options?: sIterationOptions | undefined
4924
): AsyncIterableIterator<R>
5025
}
5126
declare namespace Streams {
52-
export { AnyIterable }
27+
export { AnyIterable, sIterationOptions }
5328
}
5429
export = Streams

registry/lib/streams.js

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
'use strict'
22

3-
const { apply: ReflectApply } = Reflect
3+
const {
4+
normalizeIterationOptions,
5+
pRetry
6+
} = /*@__PURE__*/ require('./promises')
47

58
let _streamingIterables
69
/*@__NO_SIDE_EFFECTS__*/
@@ -12,22 +15,40 @@ function getStreamingIterables() {
1215
}
1316

1417
/*@__NO_SIDE_EFFECTS__*/
15-
async function parallelForEach(concurrency, func, iterable) {
16-
for await (const _ of parallelMap(concurrency, func, iterable)) {
18+
async function parallelForEach(iterable, func, options) {
19+
for await (const _ of parallelMap(iterable, func, options)) {
1720
/* empty block */
1821
}
1922
}
2023

2124
/*@__NO_SIDE_EFFECTS__*/
22-
function parallelMap(...args) {
25+
function parallelMap(iterable, func, options) {
2326
const streamingIterables = getStreamingIterables()
24-
return ReflectApply(streamingIterables.parallelMap, undefined, args)
27+
const opts = normalizeIterationOptions(options)
28+
return streamingIterables.parallelMap(
29+
opts.concurrency,
30+
item =>
31+
pRetry(func, {
32+
...opts.retries,
33+
args: [item]
34+
}),
35+
iterable
36+
)
2537
}
2638

2739
/*@__NO_SIDE_EFFECTS__*/
28-
function transform(...args) {
40+
function transform(iterable, func, options) {
2941
const streamingIterables = getStreamingIterables()
30-
return ReflectApply(streamingIterables.transform, undefined, args)
42+
const opts = normalizeIterationOptions(options)
43+
return streamingIterables.transform(
44+
opts.concurrency,
45+
item =>
46+
pRetry(func, {
47+
...opts.retries,
48+
args: [item]
49+
}),
50+
iterable
51+
)
3152
}
3253

3354
module.exports = {

0 commit comments

Comments
 (0)