-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathpromises.ts
More file actions
129 lines (114 loc) · 3.74 KB
/
promises.ts
File metadata and controls
129 lines (114 loc) · 3.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import pLimit from 'p-limit'
import pThrottle from 'p-throttle'
/**
* Returns a function that executes with limited concurrency.
*/
export function pLimitFn<ArgsType extends unknown[], ReturnType>(
fn: (...args: ArgsType) => ReturnType | Promise<ReturnType>,
limit = 1
): ((...args: ArgsType) => Promise<ReturnType>) & { clear(): void } {
const queue = pLimit(limit)
return Object.assign((...args: ArgsType) => queue(() => fn(...args)), {
clear() {
queue.clearQueue()
}
})
}
/**
* Only allows one outstanding call.
* Returns same promise while task is executing.
*/
export function pOne<ArgsType extends unknown[], ReturnType>(
fn: (...args: ArgsType) => ReturnType | Promise<ReturnType>,
): ((...args: ArgsType) => Promise<ReturnType>) {
const once = pOnce(fn)
return async (...args: ArgsType): Promise<ReturnType> => {
try {
return await once(...args)
} finally {
once.reset()
}
}
}
/**
* Only allows calling `fn` once.
* Returns same promise while task is executing.
*/
export function pOnce<ArgsType extends unknown[], ReturnType>(
fn: (...args: ArgsType) => ReturnType | Promise<ReturnType>
): ((...args: ArgsType) => Promise<ReturnType>) & { reset(): void, isStarted(): boolean } {
type CallStatus = PromiseSettledResult<ReturnType> | { status: 'init' } | { status: 'pending', promise: Promise<ReturnType> }
let currentCall: CallStatus = { status: 'init' }
return Object.assign(async function pOnceWrap(...args: ArgsType): Promise<ReturnType> { // eslint-disable-line prefer-arrow-callback
// capture currentCall so can assign to it, even after reset
const thisCall = currentCall
if (thisCall.status === 'pending') {
return thisCall.promise
}
if (thisCall.status === 'fulfilled') {
return thisCall.value
}
if (thisCall.status === 'rejected') {
throw thisCall.reason
}
// status === 'init'
currentCall = thisCall
const promise = (async () => {
// capture value/error
try {
const value = await fn(...args)
Object.assign(thisCall, {
promise: undefined, // release promise
status: 'fulfilled',
value,
})
return value
} catch (reason) {
Object.assign(thisCall, {
promise: undefined, // release promise
status: 'rejected',
reason,
})
throw reason
}
})()
promise.catch(() => {}) // prevent unhandled
Object.assign(thisCall, {
status: 'pending',
promise,
})
return promise
}, {
isStarted() {
return currentCall.status !== 'init'
},
reset() {
currentCall = { status: 'init' }
}
})
}
// TODO better type annotations
export const withThrottling = (fn: (...args: any[]) => Promise<any>, maxInvocationsPerSecond: number): ((...args: any[]) => Promise<any>) => {
const throttler = pThrottle({
limit: maxInvocationsPerSecond,
interval: 1000
})
return throttler(fn)
}
export const tryInSequence = async <T>(fns: ((...args: any[]) => Promise<T>)[]): Promise<T | never> => {
if (fns.length === 0) {
throw new Error('no tasks')
}
let firstError: any
for (const fn of fns) {
try {
const promise = fn()
return await promise
} catch (e: any) {
if (firstError === undefined) {
firstError = e
}
}
}
throw firstError
}