Skip to content

Commit cb08b88

Browse files
authored
refactor(utils): Simple raceForEvent() (#3128)
Implemented a simple `raceForEvent()` utility function which uses `waitForEvent()` internally. It replaces the previous `raceEvents()` function which had equal functionality but different implementation for historical reasons. ## Related changes Removed unnecessary `executeSafePromise()` wrapper in `Router#doRouteMessage()`. That code snippet doesn't do async waiting, so there is no need to have the wrapper.
1 parent b528add commit cb08b88

File tree

6 files changed

+55
-102
lines changed

6 files changed

+55
-102
lines changed

packages/dht/src/connection/connectivityChecker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Logger, raceEvents } from '@streamr/utils'
1+
import { Logger, raceForEvent } from '@streamr/utils'
22
import { v4 } from 'uuid'
33
import * as Err from '../helpers/errors'
44
import {
@@ -19,7 +19,7 @@ export const connectAsync = async ({ url, allowSelfSignedCertificate, timeoutMs
1919
const socket = new WebsocketClientConnection()
2020
let result: { winnerName: 'connected' | 'error' }
2121
try {
22-
const resultPromise = raceEvents(socket, ['connected', 'error'], timeoutMs)
22+
const resultPromise = raceForEvent(socket, ['connected', 'error'], timeoutMs)
2323
socket.connect(url, allowSelfSignedCertificate)
2424
result = await resultPromise
2525
} catch {

packages/dht/src/dht/routing/Router.ts

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Message, PeerDescriptor, RouteMessageAck, RouteMessageError, RouteMessageWrapper } from '../../../generated/packages/dht/protos/DhtRpc'
22
import { RoutingMode, RoutingRemoteContact, RoutingSession } from './RoutingSession'
3-
import { Logger, executeSafePromise, raceEvents, withTimeout } from '@streamr/utils'
3+
import { Logger, raceForEvent } from '@streamr/utils'
44
import { RoutingRpcCommunicator } from '../../transport/RoutingRpcCommunicator'
55
import { DuplicateDetector } from './DuplicateDetector'
66
import { v4 } from 'uuid'
@@ -122,22 +122,18 @@ export class Router {
122122
const contacts = session.updateAndGetRoutablePeers()
123123
if (contacts.length > 0) {
124124
this.addRoutingSession(session)
125-
logger.trace('starting to raceEvents from routingSession: ' + session.sessionId)
126-
let eventReceived: Promise<unknown>
127-
executeSafePromise(async () => {
128-
eventReceived = raceEvents(
129-
session,
130-
['routingSucceeded', 'partialSuccess', 'routingFailed', 'stopped'],
131-
null
132-
)
133-
})
125+
logger.trace('starting to raceForEvent from routingSession: ' + session.sessionId)
126+
const eventReceived = raceForEvent(
127+
session,
128+
['routingSucceeded', 'partialSuccess', 'routingFailed', 'stopped'],
129+
10000 // TODO use options option or named constant?
130+
)
134131
setImmediate(async () => {
135132
try {
136-
// TODO use options option or named constant?
137-
await withTimeout(eventReceived, 10000)
138-
logger.trace('raceEvents ended from routingSession: ' + session.sessionId)
133+
await eventReceived
134+
logger.trace('raceForEvent ended from routingSession: ' + session.sessionId)
139135
} catch {
140-
logger.trace('raceEvents timed out for routingSession ' + session.sessionId)
136+
logger.trace('raceForEvent timed out for routingSession ' + session.sessionId)
141137
}
142138
session.stop()
143139
this.removeRoutingSession(session.sessionId)

packages/utils/src/exports.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ export { toEthereumAddressOrENSName } from './toEthereumAddressOrENSName'
2525
export type { Events, BrandedString } from './types'
2626
export { wait } from './wait'
2727
export { waitForEvent } from './waitForEvent'
28+
export { raceForEvent } from './raceForEvent'
2829
export { TimeoutError, withTimeout } from './withTimeout'
2930
export { composeAbortSignals, type ComposedAbortSignal } from './composeAbortSignals'
3031
export { until } from './until'
31-
export { raceEvents } from './raceEvents'
3232
export { withRateLimit } from './withRateLimit'
3333
export { ObservableEventEmitter } from './ObservableEventEmitter'
3434
export { initEventGateway } from './initEventGateway'

packages/utils/src/raceEvents.ts

Lines changed: 0 additions & 79 deletions
This file was deleted.

packages/utils/src/raceForEvent.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { waitForEvent } from './waitForEvent'
2+
3+
export async function raceForEvent<TEvents extends Record<string, (...args: any[]) => void>, TEventName extends keyof TEvents>(
4+
emitter: {
5+
on: (eventName: TEventName, listener: TEvents[TEventName]) => unknown
6+
off: (eventName: TEventName, listener: TEvents[TEventName]) => unknown
7+
},
8+
eventNames: TEventName[],
9+
timeout: number = 5000
10+
): Promise<{ winnerName: TEventName, winnerArgs: Parameters<TEvents[TEventName]> }> {
11+
const raceAbortController = new AbortController()
12+
const promises = eventNames.map(async (eventName) => {
13+
const eventArgs = await waitForEvent(emitter, eventName, timeout, () => true, raceAbortController.signal)
14+
return {
15+
winnerName: eventName,
16+
winnerArgs: eventArgs
17+
}
18+
})
19+
let result
20+
try {
21+
result = await Promise.race(promises)
22+
} finally {
23+
// Call raceAbortController.abort() to remove the event listeners. Note that this not strictly needed when Promise.race(promises) rejects.
24+
// The race can reject only if withTimeout() timeouts, and as all timeouts happen at the same time, the event listeners are cleaned
25+
// up immediately. In that sense this could be moved out from the finally block. But it makes sense to keep it here so that the function
26+
// can be seen as an atomic operation: all cleanup happens _before_ the function returns, not immediately _after_ it returns.
27+
// The Promise.allSettled() call implements that atomicity by waiting the cleanups initiated by raceAbortController.abort() to complete.
28+
raceAbortController.abort()
29+
await Promise.allSettled(promises)
30+
}
31+
return result
32+
}
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { EventEmitter } from 'eventemitter3'
2-
import { raceEvents } from '../src/raceEvents'
2+
import { raceForEvent } from '../src/raceForEvent'
33

44
interface Events {
55
a: (foo: number, bar: number) => void
66
b: (foo: string) => void
77
c: () => void
88
}
99

10-
describe('raceEvents', () => {
10+
describe('raceForEvent', () => {
1111

1212
let eventEmitter: EventEmitter<Events>
1313

@@ -16,22 +16,22 @@ describe('raceEvents', () => {
1616
})
1717

1818
it('should resolve with the first event that occurs', async () => {
19-
const promise = raceEvents(eventEmitter, ['a', 'b'])
19+
const promise = raceForEvent(eventEmitter, ['a', 'b'])
2020
setTimeout(() => eventEmitter.emit('a', 123, 456), 100)
2121
setTimeout(() => eventEmitter.emit('b', 'foo'), 10)
2222
setTimeout(() => eventEmitter.emit('c'), 50)
2323
expect(await promise).toEqual({ winnerName: 'b', winnerArgs: ['foo'] })
2424
})
2525

2626
it('should resolve with the correct event and arguments', async () => {
27-
const promise = raceEvents(eventEmitter, ['a', 'b'])
27+
const promise = raceForEvent(eventEmitter, ['a', 'b'])
2828
setTimeout(() => eventEmitter.emit('a', 1, 2), 5)
2929
expect(await promise).toEqual({ winnerName: 'a', winnerArgs: [1, 2] })
3030
})
3131

3232
it('should clean up listeners after resolving', async () => {
3333
const offFn = jest.spyOn(eventEmitter, 'off')
34-
const promise = raceEvents(eventEmitter, ['a', 'b'])
34+
const promise = raceForEvent(eventEmitter, ['a', 'b'])
3535
eventEmitter.emit('b', 'payload')
3636
await promise
3737
expect(offFn).toHaveBeenCalledTimes(2)
@@ -40,7 +40,11 @@ describe('raceEvents', () => {
4040
})
4141

4242
it('should not resolve if no event is emitted', async () => {
43-
const promise = raceEvents(eventEmitter, ['a', 'b'], 50)
43+
const offFn = jest.spyOn(eventEmitter, 'off')
44+
const promise = raceForEvent(eventEmitter, ['a', 'b'], 50)
4445
await expect(promise).rejects.toThrow('timed out')
46+
expect(offFn).toHaveBeenCalledTimes(2)
47+
expect(offFn).toHaveBeenCalledWith('a', expect.any(Function))
48+
expect(offFn).toHaveBeenCalledWith('b', expect.any(Function))
4549
})
4650
})

0 commit comments

Comments
 (0)