Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 0 additions & 195 deletions packages/sdk/src/utils/iterators.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
import Emitter from 'events'
import pMemoize from 'p-memoize'
import { MaybeAsync } from '../types'

import { AggregatedError } from './AggregatedError'
import { pTimeout } from './promises'
import { Defer } from '@streamr/utils'

export interface ICancelable {
cancel(err?: Error): Promise<void>
isCancelled: () => boolean
Expand Down Expand Up @@ -133,195 +127,6 @@ export function iteratorFinally<T>(
)
}

async function endGenerator(gtr: AsyncGenerator, error?: Error) {
return error
? gtr.throw(error).catch(() => {}) // ignore err
: gtr.return(undefined)
}

function canCancel<T extends object>(gtr: MaybeCancelable<T>): gtr is Cancelable<T> {
return (
gtr
&& 'cancel' in gtr && typeof gtr.cancel === 'function'
&& typeof gtr.isCancelled === 'function'
&& !gtr.isCancelled()
)
}

async function cancelGenerator<T extends object>(gtr: MaybeCancelable<T>, error?: Error) {
if (!canCancel(gtr)) { return }
await gtr.cancel(error)
}

const endGeneratorTimeout = pMemoize(async (gtr, error, timeout = 250) => {
await pTimeout(endGenerator(gtr, error), {
timeout,
rejectOnTimeout: false,
})

if (canCancel(gtr)) {
await cancelGenerator(gtr, error)
}
}, {
cache: new WeakMap(),
cachePromiseRejection: true,
})

/**
* Creates a generator that can be cancelled and perform optional final cleanup.
* const [cancal, generator] = CancelableGenerator(iterable, onFinally)
*/
export function CancelableGenerator<T>(
iterable: MaybeCancelable<AsyncIterable<T> | AsyncGenerator<T>>,
onFinally: OnFinallyFn = () => {},
{ timeout = 250 } = {}
): Cancelable<AsyncGenerator<T, any, unknown>> {
let cancelled = false
let finalCalled = false
let error: Error | AggregatedError | undefined

const cancelSignal = new Emitter()
const onDone = new Defer<undefined>()

let iterator: AsyncIterator<T>

async function cancelIterable(err?: Error) {
// cancel inner if has cancel
await cancelGenerator(iterable, err)
await cancelGenerator(iterator, err)
}

function collectErrors(value?: Error | AggregatedError) {
if (!value || value === error) { return }

if (!error) {
error = value
return
}

error = 'extend' in error
? error.extend(value, value.message)
: new AggregatedError([value, error], value.message)
}

function resolveCancel(value?: Error) {
if (value instanceof Error) {
collectErrors(value)
}

if (error) {
cancelSignal.emit('cancel', error)
} else {
cancelSignal.emit('cancel', value)
}
}

const cancel = (gtr: MaybeCancelable<AsyncGenerator<T>>) => async (value: Error) => {
if (cancelled || finalCalled) {
// prevent recursion
return onDone
}

cancelled = true
resolveCancel(value)

// need to make sure we don't try return inside final otherwise we end up deadlocked
await endGeneratorTimeout(gtr, error, timeout)
return onDone
}

let pendingNext = 0

async function* CancelableGeneratorFn() {
// manually iterate
iterator = iterable[Symbol.asyncIterator]()

try {
yield* {
// each next() races against cancel signal
next: async () => {
pendingNext += 1
cancelSignal.setMaxListeners(pendingNext)
// NOTE:
// Very easy to create a memleak here.
// Using a shared promise with Promise.race
// between loop iterations prevents data from being GC'ed.
// Create new per-loop promise and resolve using an event emitter.
const cancelPromise = new Defer<{ value: undefined, done: true }>()
const onCancel = (v?: Error) => {
if (v instanceof Error) {
cancelPromise.reject(v)
} else {
cancelPromise.resolve({ value: undefined, done: true })
}
}

cancelSignal.once('cancel', onCancel)
return Promise.race([
iterator.next(),
cancelPromise,
]).finally(() => {
pendingNext -= 1
cancelSignal.setMaxListeners(pendingNext)
cancelSignal.off('cancel', onCancel)
})
},
async throw(err: Error): Promise<{ value: T, done: true }> {
cancelSignal.removeAllListeners()
await endGeneratorTimeout(iterator, err, timeout)
throw err
},
async return(v?: T) {
cancelSignal.removeAllListeners()
await endGeneratorTimeout(iterator, error, timeout)
return {
value: v,
done: true,
}
},
[Symbol.asyncIterator]() {
return this
},
}
} finally {
cancelSignal.removeAllListeners()
// try end iterator
if (iterator) {
await endGeneratorTimeout(iterator, error, timeout)
}
}
}
const c = CancelableGeneratorFn()
const cancelableGenerator = iteratorFinally(c, async (err) => {
finalCalled = true
try {
// cancel inner if has cancel
await cancelIterable(err ?? error)
await onFinally(err ?? error)
} finally {
onDone.resolve(undefined)
}

// error whole generator, for await of will reject.
if (error) {
throw error
}

return onDone
}) as AsyncGenerator<T>

const cancelFn = cancel(cancelableGenerator)

Object.assign(cancelableGenerator, {
cancel: cancelFn,
timeout,
isCancelled: () => cancelled,
isDone: () => finalCalled,
})

return cancelableGenerator as Cancelable<typeof cancelableGenerator>
}

export const nextValue = async <T>(source: AsyncIterator<T>): Promise<T | undefined> => {
const item = source.next()
return (await item).value
Expand Down
67 changes: 1 addition & 66 deletions packages/sdk/src/utils/promises.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pLimit from 'p-limit'
import pThrottle from 'p-throttle'
import { Defer, wait } from '@streamr/utils'
import { wait } from '@streamr/utils'
import { MaybeAsync } from '../types'

/**
Expand Down Expand Up @@ -112,71 +112,6 @@ export class TimeoutError extends Error {
}
}

/**
* Takes a promise and a timeout and an optional message for timeout errors.
* Returns a promise that rejects when timeout expires, or when promise settles, whichever comes first.
*
* Invoke with positional arguments for timeout & message:
* await pTimeout(promise, timeout, message)
*
* or using an options object for timeout, message & rejectOnTimeout:
*
* await pTimeout(promise, { timeout, message, rejectOnTimeout })
*
* message and rejectOnTimeout are optional.
*/

interface pTimeoutOpts {
timeout?: number
message?: string
rejectOnTimeout?: boolean
}

type pTimeoutArgs = [timeout?: number, message?: string] | [pTimeoutOpts]

export async function pTimeout<T>(promise: Promise<T>, ...args: pTimeoutArgs): Promise<T | undefined> {
let opts: pTimeoutOpts = {}
if (args[0] && typeof args[0] === 'object') {
[opts] = args
} else {
[opts.timeout, opts.message] = args
}

const { timeout = 0, message = '', rejectOnTimeout = true } = opts

if (typeof timeout !== 'number') {
throw new Error(`timeout must be a number, got ${timeout}`)
}

let timedOut = false
const p = new Defer<undefined>()
const t = setTimeout(() => {
timedOut = true
if (rejectOnTimeout) {
p.reject(new TimeoutError(message, timeout))
} else {
p.resolve(undefined)
}
}, timeout)
p.catch(() => {})

return Promise.race([
Promise.resolve(promise).catch((err) => {
clearTimeout(t)
if (timedOut) {
// ignore errors after timeout
return undefined
}

throw err
}),
p
]).finally(() => {
clearTimeout(t)
p.resolve(undefined)
})
}

// TODO use streamr-test-utils#waitForCondition instead (when streamr-test-utils is no longer a test-only dependency)
/**
* Wait until a condition is true
Expand Down
Loading