diff --git a/packages/sdk/src/contracts/StreamRegistry.ts b/packages/sdk/src/contracts/StreamRegistry.ts index 94a1678533..f4c40cfd10 100644 --- a/packages/sdk/src/contracts/StreamRegistry.ts +++ b/packages/sdk/src/contracts/StreamRegistry.ts @@ -11,7 +11,8 @@ import { isEthereumAddressUserId, toEthereumAddress, toStreamID, - toUserId + toUserId, + until } from '@streamr/utils' import { ContractTransactionResponse } from 'ethers' import { intersection } from 'lodash' @@ -22,6 +23,7 @@ import { RpcProviderSource } from '../RpcProviderSource' import { Stream } from '../Stream' import { StreamIDBuilder } from '../StreamIDBuilder' import { StreamMetadata, parseMetadata } from '../StreamMetadata' +import { StreamrClient } from '../StreamrClient' import { StreamrClientError } from '../StreamrClientError' import type { StreamRegistryV5 as StreamRegistryContract } from '../ethereumArtifacts/StreamRegistryV5' import StreamRegistryArtifact from '../ethereumArtifacts/StreamRegistryV5Abi.json' @@ -40,15 +42,13 @@ import { isPublicPermissionQuery, streamPermissionToSolidityType } from '../permission' +import { CachingMap } from '../utils/CachingMap' import { filter, map } from '../utils/GeneratorUtils' import { LoggerFactory } from '../utils/LoggerFactory' -import { CachingMap } from '../utils/CachingMap' -import { until } from '../utils/promises' import { ChainEventPoller } from './ChainEventPoller' import { ContractFactory } from './ContractFactory' import { ObservableContract, initContractEventGateway, waitForTx } from './contract' import { InternalSearchStreamsPermissionFilter, SearchStreamsOrderBy, searchStreams as _searchStreams } from './searchStreams' -import { StreamrClient } from '../StreamrClient' /* * On-chain registry of stream metadata and permissions. diff --git a/packages/sdk/src/utils/promises.ts b/packages/sdk/src/utils/promises.ts index 6a4a925150..12c62205a0 100644 --- a/packages/sdk/src/utils/promises.ts +++ b/packages/sdk/src/utils/promises.ts @@ -1,7 +1,5 @@ import pLimit from 'p-limit' import pThrottle from 'p-throttle' -import { wait } from '@streamr/utils' -import { MaybeAsync } from '../types' /** * Returns a function that executes with limited concurrency. @@ -103,63 +101,6 @@ export function pOnce( }) } -export class TimeoutError extends Error { - public timeout: number - - constructor(msg = '', timeout = 0) { - super(`The operation timed out. ${timeout}ms. ${msg}`) - this.timeout = timeout - } -} - -// TODO use streamr-test-utils#until instead (when streamr-test-utils is no longer a test-only dependency) -/** - * Wait until a condition is true - * @param condition - wait until this callback function returns true - * @param timeOutMs - stop waiting after that many milliseconds, -1 for disable - * @param pollingIntervalMs - check condition between so many milliseconds - * @param failedMsgFn - append the string return value of this getter function to the error message, if given - * @return the (last) truthy value returned by the condition function - */ -export async function until( - condition: MaybeAsync<() => boolean>, - timeOutMs = 10000, - pollingIntervalMs = 100, - failedMsgFn?: () => string -): Promise { - // condition could as well return any instead of boolean, could be convenient - // sometimes if waiting until a value is returned. Maybe change if such use - // case emerges. - const err = new Error(`Timeout after ${timeOutMs} milliseconds`) - let isTimedOut = false - let t!: ReturnType - if (timeOutMs > 0) { - t = setTimeout(() => { isTimedOut = true }, timeOutMs) - } - - try { - // Promise wrapped condition function works for normal functions just the same as Promises - let wasDone = false - while (!wasDone && !isTimedOut) { - wasDone = await Promise.resolve().then(condition) - if (!wasDone && !isTimedOut) { - await wait(pollingIntervalMs) - } - } - - if (isTimedOut) { - if (failedMsgFn) { - err.message += ` ${failedMsgFn()}` - } - throw err - } - - return wasDone - } finally { - clearTimeout(t) - } -} - // TODO better type annotations export const withThrottling = (fn: (...args: any[]) => Promise, maxInvocationsPerSecond: number): ((...args: any[]) => Promise) => { const throttler = pThrottle({ diff --git a/packages/sdk/test/end-to-end/StorageNodeRegistry.test.ts b/packages/sdk/test/end-to-end/StorageNodeRegistry.test.ts index 4b070a4327..b0d9f08fd9 100644 --- a/packages/sdk/test/end-to-end/StorageNodeRegistry.test.ts +++ b/packages/sdk/test/end-to-end/StorageNodeRegistry.test.ts @@ -1,11 +1,12 @@ import 'reflect-metadata' + import { Wallet } from 'ethers' import { fetchPrivateKeyWithGas, randomEthereumAddress } from '@streamr/test-utils' import { DOCKER_DEV_STORAGE_NODE } from '../../src/ConfigTest' import { Stream } from '../../src/Stream' import { StreamrClient } from '../../src/StreamrClient' -import { until } from '../../src/utils/promises' import { createTestStream, createTestClient } from '../test-utils/utils' +import { until } from '@streamr/utils' const TEST_TIMEOUT = 30 * 1000 diff --git a/packages/sdk/test/end-to-end/StreamRegistry.test.ts b/packages/sdk/test/end-to-end/StreamRegistry.test.ts index 1671867fd1..1a1142edde 100644 --- a/packages/sdk/test/end-to-end/StreamRegistry.test.ts +++ b/packages/sdk/test/end-to-end/StreamRegistry.test.ts @@ -6,7 +6,6 @@ import { Wallet } from 'ethers' import { CONFIG_TEST } from '../../src/ConfigTest' import { Stream } from '../../src/Stream' import { StreamrClient } from '../../src/StreamrClient' -import { until as until_ } from '../../src/utils/promises' import { createRelativeTestStreamId, createTestStream } from '../test-utils/utils' const TIMEOUT = 20000 @@ -242,7 +241,7 @@ describe('StreamRegistry', () => { await createdStream.setMetadata({ description }) - await until_(async () => { + await until(async () => { try { return (await client.getStream(createdStream.id)).getMetadata().description === createdStream.getMetadata().description } catch { @@ -262,7 +261,7 @@ describe('StreamRegistry', () => { const props = { id: createRelativeTestStreamId(module) } const stream = await client.createStream(props) await client.deleteStream(stream.id) - await until_(async () => { + await until(async () => { try { await client.getStream(stream.id) return false diff --git a/packages/sdk/test/integration/GroupKeyPersistence.test.ts b/packages/sdk/test/integration/GroupKeyPersistence.test.ts index 80b3c07627..18adcddad6 100644 --- a/packages/sdk/test/integration/GroupKeyPersistence.test.ts +++ b/packages/sdk/test/integration/GroupKeyPersistence.test.ts @@ -1,14 +1,13 @@ import 'reflect-metadata' import { fastPrivateKey } from '@streamr/test-utils' -import { collect, toStreamPartID } from '@streamr/utils' +import { collect, toStreamPartID, until } from '@streamr/utils' import { Message } from '../../src/Message' import { Stream } from '../../src/Stream' import { StreamrClient } from '../../src/StreamrClient' import { GroupKey } from '../../src/encryption/GroupKey' import { StreamPermission } from '../../src/permission' import { StreamMessageType } from '../../src/protocol/StreamMessage' -import { until } from '../../src/utils/promises' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' import { FakeStorageNode } from '../test-utils/fake/FakeStorageNode' import { getPublishTestStreamMessages } from '../test-utils/publish' diff --git a/packages/sdk/test/unit/promises.test.ts b/packages/sdk/test/unit/promises.test.ts index 9901427e5c..a4b1de6a78 100644 --- a/packages/sdk/test/unit/promises.test.ts +++ b/packages/sdk/test/unit/promises.test.ts @@ -1,60 +1,8 @@ import { wait } from '@streamr/utils' -import { pLimitFn, pOnce, pOne, until } from '../../src/utils/promises' +import { pLimitFn, pOnce, pOne } from '../../src/utils/promises' const WAIT_TIME = 25 -describe('until', () => { - it('works with sync true', async () => { - const condition = jest.fn(() => true) - await until(condition) - expect(condition).toHaveBeenCalledTimes(1) - }) - - it('works with async true', async () => { - const condition = jest.fn(async () => true) - await until(condition) - expect(condition).toHaveBeenCalledTimes(1) - }) - - it('works with sync false -> true', async () => { - let calls = 0 - const condition = jest.fn(() => { - calls += 1 - return calls > 1 - }) - await until(condition) - expect(condition).toHaveBeenCalledTimes(2) - }) - - it('works with sync false -> true', async () => { - let calls = 0 - const condition = jest.fn(async () => { - calls += 1 - return calls > 1 - }) - await until(condition) - expect(condition).toHaveBeenCalledTimes(2) - }) - - it('can time out', async () => { - const condition = jest.fn(() => false) - await expect(async () => { - await until(condition, 100) - }).rejects.toThrow('Timeout') - expect(condition).toHaveBeenCalled() - }) - - it('can set interval', async () => { - const condition = jest.fn(() => false) - await expect(async () => { - await until(condition, 100, 20) - }).rejects.toThrow('Timeout') - expect(condition.mock.calls.length).toBeLessThan(7) - // ideally it should be 5. - expect(condition.mock.calls.length).toBeGreaterThan(4) - }) -}) - describe('pLimitFn', () => { it('adopts type of wrapped function', async () => { // actually checking via ts-expect-error