diff --git a/CHANGELOG.md b/CHANGELOG.md index da864e9fcf..08107c57e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,11 @@ Changes before Tatum release are not documented in this file. - it is supported for `PUBLISH` and `SUBSCRIBE` permissions - new `StreamrClient#getUserId()` method - Method `StreamrClient#getDiagnosticInfo()` provides diagnostic info about network (https://github.com/streamr-dev/network/pull/2740, https://github.com/streamr-dev/network/pull/2741) -- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845) +- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845, https://github.com/streamr-dev/network/pull/2883) - `Stream#getPartitionCount()` - `Stream#getDescription()` and `Stream#setDescription()` - `Stream#getStorageDayCount()` and `Stream#setStorageDayCount()` +- Add method `StreamrClient#getStreamMetadata()` (https://github.com/streamr-dev/network/pull/2883) - Add validation for public permissions (https://github.com/streamr-dev/network/pull/2819) - Add `opts` parameter to `StreamrClient#addStreamToStorageNode` (https://github.com/streamr-dev/network/pull/2858) - controls how long to wait for storage node to pick up on assignment @@ -40,6 +41,7 @@ Changes before Tatum release are not documented in this file. - **BREAKING CHANGE:** Replace methods `StreamrClient#updateStream()` and `Stream#update()`: (https://github.com/streamr-dev/network/pull/2826, https://github.com/streamr-dev/network/pull/2855, https://github.com/streamr-dev/network/pull/2859, https://github.com/streamr-dev/network/pull/2862) - use `StreamrClient#setStreamMetadata()` and `Stream#setMetadata()` instead - both methods overwrite metadata instead of merging it +- **BREAKING CHANGE:** Methods `Stream#getMetadata()` and `Stream#getStreamParts()` are async (https://github.com/streamr-dev/network/pull/2883) - Caching changes: - storage node addresses (https://github.com/streamr-dev/network/pull/2877, https://github.com/streamr-dev/network/pull/2878) - stream metadata and permissions (https://github.com/streamr-dev/network/pull/2889) diff --git a/docs/docs/usage/streams/partitioning.md b/docs/docs/usage/streams/partitioning.md index c80171f52a..673ed7e6c2 100644 --- a/docs/docs/usage/streams/partitioning.md +++ b/docs/docs/usage/streams/partitioning.md @@ -38,7 +38,7 @@ const stream = await streamr.createStream({ }); console.log( `Stream created: ${stream.id}. It has ${ - stream.getPartitionCount() + await stream.getPartitionCount() } partitions.` ); ``` diff --git a/packages/cli-tools/bin/streamr-storage-node-list-streams.ts b/packages/cli-tools/bin/streamr-storage-node-list-streams.ts index ea77c1a49c..f628eaa45c 100755 --- a/packages/cli-tools/bin/streamr-storage-node-list-streams.ts +++ b/packages/cli-tools/bin/streamr-storage-node-list-streams.ts @@ -8,12 +8,12 @@ import { createClientCommand } from '../src/command' createClientCommand((async (client: StreamrClient, storageNodeAddress: string) => { const { streams } = await client.getStoredStreams(storageNodeAddress) if (streams.length > 0) { - console.info(EasyTable.print(streams.map((stream) => { + console.info(EasyTable.print(await Promise.all(streams.map(async (stream) => { return { id: stream.id, - partitions: stream.getPartitionCount() + partitions: await stream.getPartitionCount() } - }))) + })))) } })) .arguments('') diff --git a/packages/cli-tools/bin/streamr-stream-create.ts b/packages/cli-tools/bin/streamr-stream-create.ts index e7c9af523e..59149c1404 100755 --- a/packages/cli-tools/bin/streamr-stream-create.ts +++ b/packages/cli-tools/bin/streamr-stream-create.ts @@ -19,7 +19,7 @@ createClientCommand(async (client: StreamrClient, streamIdOrPath: string, option partitions: options.partitions } const stream = await client.createStream(body) - console.info(JSON.stringify({ id: stream.id, ...stream.getMetadata() }, null, 2)) + console.info(JSON.stringify({ id: stream.id, ...await stream.getMetadata() }, null, 2)) }) .arguments('') .description('create a new stream') diff --git a/packages/cli-tools/bin/streamr-stream-show.ts b/packages/cli-tools/bin/streamr-stream-show.ts index 7a2c8ec56d..5a8ee33f3b 100755 --- a/packages/cli-tools/bin/streamr-stream-show.ts +++ b/packages/cli-tools/bin/streamr-stream-show.ts @@ -19,7 +19,7 @@ const withRenamedField = (obj: any, from: string, to: string) => { createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => { const stream = await client.getStream(streamId) - const obj: any = { id: stream.id, ...stream.getMetadata() } + const obj: any = { id: stream.id, ...await stream.getMetadata() } if (options.includePermissions) { const assigments = await stream.getPermissions() obj.permissions = assigments.map((assignment) => { diff --git a/packages/cli-tools/test/stream-create.test.ts b/packages/cli-tools/test/stream-create.test.ts index 21ce3bcc3a..6286f48502 100644 --- a/packages/cli-tools/test/stream-create.test.ts +++ b/packages/cli-tools/test/stream-create.test.ts @@ -19,7 +19,7 @@ describe('create stream', () => { }) const client = createTestClient() const stream = await client.getStream(streamId) - expect(stream.getPartitionCount()).toBe(1) + expect(await stream.getPartitionCount()).toBe(1) await client.destroy() }, 20 * 1000) diff --git a/packages/node/src/plugins/storage/DeleteExpiredCmd.ts b/packages/node/src/plugins/storage/DeleteExpiredCmd.ts index b5d5bc17af..ad650a9dcf 100644 --- a/packages/node/src/plugins/storage/DeleteExpiredCmd.ts +++ b/packages/node/src/plugins/storage/DeleteExpiredCmd.ts @@ -116,7 +116,7 @@ export class DeleteExpiredCmd { return { streamId: stream.streamId, partition: stream.partition, - storageDays: streamFromChain.getStorageDayCount() ?? 365 + storageDays: (await streamFromChain.getStorageDayCount()) ?? 365 } } catch (err) { logger.error('Failed to fetch stream info', { err }) } }) diff --git a/packages/node/src/plugins/storage/StorageConfig.ts b/packages/node/src/plugins/storage/StorageConfig.ts index 5822544968..86a0446207 100644 --- a/packages/node/src/plugins/storage/StorageConfig.ts +++ b/packages/node/src/plugins/storage/StorageConfig.ts @@ -51,14 +51,14 @@ export class StorageConfig { this.clusterSize = clusterSize this.myIndexInCluster = myIndexInCluster this.listener = listener - this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, (streams, block) => { - const streamParts = streams.flatMap((stream: Stream) => ([ - ...this.createMyStreamParts(stream) - ])) + this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, async (streams, block) => { + const streamParts = (await Promise.all(streams.map(async (stream: Stream) => { + return [...await this.createMyStreamParts(stream)] + }))).flat() this.handleDiff(this.synchronizer.ingestSnapshot(new Set(streamParts), block)) }) - this.storageEventListener = new StorageEventListener(clusterId, streamrClient, (stream, type, block) => { - const streamParts = this.createMyStreamParts(stream) + this.storageEventListener = new StorageEventListener(clusterId, streamrClient, async (stream, type, block) => { + const streamParts = await this.createMyStreamParts(stream) this.handleDiff(this.synchronizer.ingestPatch(streamParts, type, block)) }) this.abortController = new AbortController() @@ -82,8 +82,8 @@ export class StorageConfig { return this.synchronizer.getState() } - private createMyStreamParts(stream: Stream): Set { - return new Set(stream.getStreamParts().filter((streamPart) => { + private async createMyStreamParts(stream: Stream): Promise> { + return new Set((await stream.getStreamParts()).filter((streamPart) => { const hashedIndex = keyToArrayIndex(this.clusterSize, streamPart) return hashedIndex === this.myIndexInCluster })) diff --git a/packages/node/src/plugins/storage/StorageEventListener.ts b/packages/node/src/plugins/storage/StorageEventListener.ts index 0aa451217c..2ee3070f66 100644 --- a/packages/node/src/plugins/storage/StorageEventListener.ts +++ b/packages/node/src/plugins/storage/StorageEventListener.ts @@ -10,14 +10,14 @@ const logger = new Logger(module) export class StorageEventListener { private readonly clusterId: EthereumAddress private readonly streamrClient: StreamrClient - private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void + private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise private readonly onAddToStorageNode: (event: StorageNodeAssignmentEvent) => void private readonly onRemoveFromStorageNode: (event: StorageNodeAssignmentEvent) => void constructor( clusterId: EthereumAddress, streamrClient: StreamrClient, - onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void + onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise ) { this.clusterId = clusterId this.streamrClient = streamrClient @@ -26,14 +26,14 @@ export class StorageEventListener { this.onRemoveFromStorageNode = (event: StorageNodeAssignmentEvent) => this.handleEvent(event, 'removed') } - private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed') { + private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed'): Promise { if (event.nodeAddress !== this.clusterId) { return } logger.info('Received StorageNodeAssignmentEvent', { type, event }) try { const stream = await this.streamrClient.getStream(event.streamId) - this.onEvent(stream, type, event.blockNumber) + await this.onEvent(stream, type, event.blockNumber) } catch (err) { logger.warn('Encountered error handling StorageNodeAssignmentEvent', { err, event, type }) } diff --git a/packages/node/src/plugins/storage/StoragePoller.ts b/packages/node/src/plugins/storage/StoragePoller.ts index 23f38160fd..b3798d152e 100644 --- a/packages/node/src/plugins/storage/StoragePoller.ts +++ b/packages/node/src/plugins/storage/StoragePoller.ts @@ -10,13 +10,13 @@ export class StoragePoller { private readonly clusterId: string private readonly pollInterval: number private readonly streamrClient: StreamrClient - private readonly onNewSnapshot: (streams: Stream[], block: number) => void + private readonly onNewSnapshot: (streams: Stream[], block: number) => Promise constructor( clusterId: string, pollInterval: number, streamrClient: StreamrClient, - onNewSnapshot: (streams: Stream[], block: number) => void + onNewSnapshot: (streams: Stream[], block: number) => Promise ) { this.clusterId = clusterId this.pollInterval = pollInterval @@ -39,7 +39,7 @@ export class StoragePoller { foundStreams: streams.length, blockNumber }) - this.onNewSnapshot(streams, blockNumber) + await this.onNewSnapshot(streams, blockNumber) } private async tryPoll(): Promise { diff --git a/packages/node/test/integration/plugins/operator/MaintainTopologyService.test.ts b/packages/node/test/integration/plugins/operator/MaintainTopologyService.test.ts index 320c84dd01..085ae2ef6c 100644 --- a/packages/node/test/integration/plugins/operator/MaintainTopologyService.test.ts +++ b/packages/node/test/integration/plugins/operator/MaintainTopologyService.test.ts @@ -101,21 +101,21 @@ describe('MaintainTopologyService', () => { await maintainTopologyHelper.start() await until(async () => { - return containsAll(await getSubscribedStreamPartIds(client), stream1.getStreamParts()) + return containsAll(await getSubscribedStreamPartIds(client), await stream1.getStreamParts()) }, 10000, 1000) await stake(operatorContract, await sponsorship2.getAddress(), 10000) await until(async () => { return containsAll(await getSubscribedStreamPartIds(client), [ - ...stream1.getStreamParts(), - ...stream2.getStreamParts() + ...await stream1.getStreamParts(), + ...await stream2.getStreamParts() ]) }, 10000, 1000) await (await operatorContract.unstake(await sponsorship1.getAddress())).wait() await until(async () => { const state = await getSubscribedStreamPartIds(client) - return containsAll(state, stream2.getStreamParts()) && doesNotContainAny(state, stream1.getStreamParts()) + return containsAll(state, await stream2.getStreamParts()) && doesNotContainAny(state, await stream1.getStreamParts()) }, 10000, 1000) }, 120 * 1000) }) diff --git a/packages/node/test/unit/plugins/storage/StorageConfig.test.ts b/packages/node/test/unit/plugins/storage/StorageConfig.test.ts index 4f86663bd5..e67da2ff9a 100644 --- a/packages/node/test/unit/plugins/storage/StorageConfig.test.ts +++ b/packages/node/test/unit/plugins/storage/StorageConfig.test.ts @@ -19,7 +19,7 @@ function makeStubStream(streamId: string): Stream { const partitions = PARTITION_COUNT_LOOKUP[streamId] const stub: Partial = { id: toStreamID(streamId), - getStreamParts(): StreamPartID[] { // TODO: duplicated code from client + async getStreamParts(): Promise { // TODO: duplicated code from client return range(0, partitions).map((p) => toStreamPartID(toStreamID(streamId), p)) } } diff --git a/packages/node/test/unit/plugins/storage/StorageEventListener.test.ts b/packages/node/test/unit/plugins/storage/StorageEventListener.test.ts index 48d68903c5..93ffdf9497 100644 --- a/packages/node/test/unit/plugins/storage/StorageEventListener.test.ts +++ b/packages/node/test/unit/plugins/storage/StorageEventListener.test.ts @@ -11,7 +11,7 @@ const otherClusterId = toEthereumAddress('0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb describe(StorageEventListener, () => { let stubClient: Pick const storageEventListeners: Map any)> = new Map() - let onEvent: jest.Mock + let onEvent: jest.Mock, [stream: Stream, type: 'added' | 'removed', block: number]> let listener: StorageEventListener beforeEach(() => { diff --git a/packages/node/test/unit/plugins/storage/StoragePoller.test.ts b/packages/node/test/unit/plugins/storage/StoragePoller.test.ts index 8eb1a6f49e..8a25291359 100644 --- a/packages/node/test/unit/plugins/storage/StoragePoller.test.ts +++ b/packages/node/test/unit/plugins/storage/StoragePoller.test.ts @@ -14,7 +14,7 @@ const POLL_RESULT = Object.freeze({ describe(StoragePoller, () => { let getStoredStreams: jest.Mock, [nodeAddress: EthereumAddress]> - let onNewSnapshot: jest.Mock + let onNewSnapshot: jest.Mock, [streams: Stream[], block: number]> let stubClient: Pick let poller: StoragePoller let abortController: AbortController diff --git a/packages/sdk/src/Stream.ts b/packages/sdk/src/Stream.ts index c742346516..a5a7a3e433 100644 --- a/packages/sdk/src/Stream.ts +++ b/packages/sdk/src/Stream.ts @@ -26,17 +26,14 @@ import { */ export class Stream { readonly id: StreamID - private metadata: StreamMetadata private readonly client: StreamrClient /** @internal */ constructor( id: StreamID, - metadata: StreamMetadata, client: StreamrClient ) { this.id = id - this.metadata = metadata this.client = client } @@ -49,14 +46,6 @@ export class Stream { return this.client.publish(this.id, content, metadata) } - /** - * Updates the metadata of the stream. - */ - async setMetadata(metadata: StreamMetadata): Promise { - await this.client.setStreamMetadata(this.id, metadata) - this.metadata = metadata - } - /** * See {@link StreamrClient.hasPermission | StreamrClient.hasPermission}. * @@ -122,16 +111,16 @@ export class Stream { /** * Returns the partitions of the stream. */ - getStreamParts(): StreamPartID[] { - return range(0, this.getPartitionCount()).map((p) => toStreamPartID(this.id, p)) + async getStreamParts(): Promise { + return range(0, await this.getPartitionCount()).map((p) => toStreamPartID(this.id, p)) } - getPartitionCount(): number { - return getPartitionCount(this.getMetadata()) + async getPartitionCount(): Promise { + return getPartitionCount(await this.getMetadata()) } - getDescription(): string | undefined { - const value = this.getMetadata().description + async getDescription(): Promise { + const value = (await this.getMetadata()).description if (isString(value)) { return value } else { @@ -141,7 +130,7 @@ export class Stream { async setDescription(description: string): Promise { await this.setMetadata({ - ...this.getMetadata(), + ...await this.getMetadata(), description }) } @@ -149,8 +138,8 @@ export class Stream { /** * Gets the value of `storageDays` field */ - getStorageDayCount(): number | undefined { - const value = this.getMetadata().storageDays + async getStorageDayCount(): Promise { + const value = (await this.getMetadata()).storageDays if (isNumber(value)) { return value } else { @@ -163,7 +152,7 @@ export class Stream { */ async setStorageDayCount(count: number): Promise { await this.setMetadata({ - ...this.getMetadata(), + ...await this.getMetadata(), storageDays: count }) } @@ -171,7 +160,14 @@ export class Stream { /** * Returns the metadata of the stream. */ - getMetadata(): StreamMetadata { - return this.metadata + async getMetadata(): Promise { + return this.client.getStreamMetadata(this.id) + } + + /** + * Updates the metadata of the stream. + */ + async setMetadata(metadata: StreamMetadata): Promise { + await this.client.setStreamMetadata(this.id, metadata) } } diff --git a/packages/sdk/src/StreamrClient.ts b/packages/sdk/src/StreamrClient.ts index eb8a2090c8..870a6e6882 100644 --- a/packages/sdk/src/StreamrClient.ts +++ b/packages/sdk/src/StreamrClient.ts @@ -3,7 +3,7 @@ import './utils/PatchTsyringe' import { DhtAddress } from '@streamr/dht' import { ProxyDirection } from '@streamr/trackerless-network' -import { DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, StreamID, TheGraphClient, toEthereumAddress, toUserId } from '@streamr/utils' +import { DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, Logger, StreamID, TheGraphClient, toEthereumAddress, toUserId } from '@streamr/utils' import type { Overrides } from 'ethers' import EventEmitter from 'eventemitter3' import merge from 'lodash/merge' @@ -52,6 +52,7 @@ import { LoggerFactory } from './utils/LoggerFactory' import { pOnce } from './utils/promises' import { convertPeerDescriptorToNetworkPeerDescriptor, createTheGraphClient } from './utils/utils' import { addStreamToStorageNode } from './utils/addStreamToStorageNode' +import { map } from './utils/GeneratorUtils' // TODO: this type only exists to enable tsdoc to generate proper documentation export type SubscribeOptions = StreamDefinition & ExtraSubscribeOptions @@ -73,6 +74,8 @@ export interface ExtraSubscribeOptions { erc1271Contract?: HexString } +const logger = new Logger(module) + /** * The main API used to interact with Streamr. * @@ -357,8 +360,11 @@ export class StreamrClient { */ async getStream(streamIdOrPath: string): Promise { const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath) - const metadata = await this.streamRegistry.getStreamMetadata(streamId) - return new Stream(streamId, metadata, this) + // Check if the stream exists by querying its metadata. Throws an error if no metadata is found, + // indicating the stream doesn't exist. As a side-effect this populates StreamRegistry's metadata + // cache for future use, such as stream.getPartitionCount() calls. + await this.streamRegistry.getStreamMetadata(streamId) + return new Stream(streamId, this) } /** @@ -376,7 +382,7 @@ export class StreamrClient { const streamId = await this.streamIdBuilder.toStreamID(props.id) const metadata = merge({ partitions: DEFAULT_PARTITION_COUNT }, omit(props, 'id') ) await this.streamRegistry.createStream(streamId, metadata) - return new Stream(streamId, metadata, this) + return new Stream(streamId, this) } /** @@ -399,6 +405,14 @@ export class StreamrClient { } } + /** + * Returns the metadata of a stream. + */ + async getStreamMetadata(streamIdOrPath: string): Promise { + const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath) + return this.streamRegistry.getStreamMetadata(streamId) + } + /** * Updates the metadata of a stream. */ @@ -426,12 +440,16 @@ export class StreamrClient { permissionFilter: SearchStreamsPermissionFilter | undefined, orderBy: SearchStreamsOrderBy = { field: 'id', direction: 'asc' } ): AsyncIterable { - return this.streamRegistry.searchStreams( + logger.debug('Search for streams', { term, permissionFilter }) + if ((term === undefined) && (permissionFilter === undefined)) { + throw new Error('Requires a search term or a permission filter') + } + const streamIds = this.streamRegistry.searchStreams( term, (permissionFilter !== undefined) ? toInternalSearchStreamsPermissionFilter(permissionFilter) : undefined, - orderBy, - this + orderBy ) + return map(streamIds, (id) => new Stream(id, this)) } // -------------------------------------------------------------------------------------------- @@ -564,8 +582,11 @@ export class StreamrClient { */ async getStoredStreams(storageNodeAddress: HexString): Promise<{ streams: Stream[], blockNumber: number }> { const queryResult = await this.streamStorageRegistry.getStoredStreams(toEthereumAddress(storageNodeAddress)) + for (const stream of queryResult.streams) { + this.streamRegistry.populateMetadataCache(stream.id, stream.metadata) + } return { - streams: queryResult.streams.map((item) => new Stream(item.id, item.metadata, this)), + streams: queryResult.streams.map((item) => new Stream(item.id, this)), blockNumber: queryResult.blockNumber } } diff --git a/packages/sdk/src/contracts/StreamRegistry.ts b/packages/sdk/src/contracts/StreamRegistry.ts index 8652a22e38..7417419892 100644 --- a/packages/sdk/src/contracts/StreamRegistry.ts +++ b/packages/sdk/src/contracts/StreamRegistry.ts @@ -20,10 +20,8 @@ import { Lifecycle, inject, scoped } from 'tsyringe' import { Authentication, AuthenticationInjectionToken } from '../Authentication' import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config' import { RpcProviderSource } from '../RpcProviderSource' -import { Stream } from '../Stream' import { StreamIDBuilder } from '../StreamIDBuilder' import { StreamMetadata, parseMetadata } from '../StreamMetadata' -import { StreamrClient } from '../StreamrClient' import { StreamrClientError } from '../StreamrClientError' import type { StreamRegistryV5 as StreamRegistryContract } from '../ethereumArtifacts/StreamRegistryV5' import StreamRegistryArtifact from '../ethereumArtifacts/StreamRegistryV5Abi.json' @@ -295,19 +293,21 @@ export class StreamRegistry { return parseMetadata(metadata) } - searchStreams( + async* searchStreams( term: string | undefined, permissionFilter: InternalSearchStreamsPermissionFilter | undefined, - orderBy: SearchStreamsOrderBy, - client: StreamrClient - ): AsyncIterable { - return _searchStreams( + orderBy: SearchStreamsOrderBy + ): AsyncGenerator { + const queryResult = _searchStreams( term, permissionFilter, orderBy, - this.theGraphClient, - this.logger, - client) + this.theGraphClient) + for await (const item of queryResult) { + const id = toStreamID(item.stream.id) + this.populateMetadataCache(id, parseMetadata(item.stream.metadata)) + yield id + } } getStreamPublishers(streamIdOrPath: string): AsyncIterable { @@ -533,7 +533,7 @@ export class StreamRegistry { return this.publicSubscribePermissionCache.get(streamId) } - private populateMetadataCache(streamId: StreamID, metadata: StreamMetadata): void { + populateMetadataCache(streamId: StreamID, metadata: StreamMetadata): void { this.metadataCache.set([streamId], metadata) } diff --git a/packages/sdk/src/contracts/searchStreams.ts b/packages/sdk/src/contracts/searchStreams.ts index cd3acd021a..0bbbfc1ce8 100644 --- a/packages/sdk/src/contracts/searchStreams.ts +++ b/packages/sdk/src/contracts/searchStreams.ts @@ -1,10 +1,7 @@ -import { ChangeFieldType, GraphQLQuery, HexString, Logger, TheGraphClient, toStreamID, toUserId, UserID } from '@streamr/utils' -import { Stream } from '../Stream' +import { ChangeFieldType, GraphQLQuery, HexString, TheGraphClient, toUserId, UserID } from '@streamr/utils' import { ChainPermissions, convertChainPermissionsToStreamPermissions, PUBLIC_PERMISSION_USER_ID, StreamPermission } from '../permission' -import { filter, map, unique } from '../utils/GeneratorUtils' +import { filter, unique } from '../utils/GeneratorUtils' import { StreamQueryResult } from './StreamRegistry' -import { parseMetadata } from '../StreamMetadata' -import { StreamrClient } from '../StreamrClient' export interface SearchStreamsPermissionFilter { userId: HexString @@ -35,25 +32,7 @@ export const toInternalSearchStreamsPermissionFilter = (filter: SearchStreamsPer } } -export const searchStreams = ( - term: string | undefined, - permissionFilter: InternalSearchStreamsPermissionFilter | undefined, - orderBy: SearchStreamsOrderBy, - theGraphClient: TheGraphClient, - logger: Logger, - client: StreamrClient -): AsyncGenerator => { - if ((term === undefined) && (permissionFilter === undefined)) { - throw new Error('Requires a search term or a permission filter') - } - logger.debug('Search for streams', { term, permissionFilter }) - return map( - fetchSearchStreamsResultFromTheGraph(term, permissionFilter, orderBy, theGraphClient), - (item: SearchStreamsResultItem) => new Stream(toStreamID(item.stream.id), parseMetadata(item.stream.metadata), client) - ) -} - -async function* fetchSearchStreamsResultFromTheGraph( +export async function* searchStreams( term: string | undefined, permissionFilter: InternalSearchStreamsPermissionFilter | undefined, orderBy: SearchStreamsOrderBy, diff --git a/packages/sdk/test/end-to-end/StreamRegistry.test.ts b/packages/sdk/test/end-to-end/StreamRegistry.test.ts index 1a1142edde..9513509757 100644 --- a/packages/sdk/test/end-to-end/StreamRegistry.test.ts +++ b/packages/sdk/test/end-to-end/StreamRegistry.test.ts @@ -241,16 +241,18 @@ describe('StreamRegistry', () => { await createdStream.setMetadata({ description }) + const createdMetadata = await createdStream.getMetadata() await until(async () => { try { - return (await client.getStream(createdStream.id)).getMetadata().description === createdStream.getMetadata().description + const queriedMetadata = await (await client.getStream(createdStream.id)).getMetadata() + return queriedMetadata.description === createdMetadata.description } catch { return false } }, 100000, 1000) // check that other fields not overwritten const updatedStream = await client.getStream(createdStream.id) - expect(updatedStream.getMetadata()).toEqual({ + expect(await updatedStream.getMetadata()).toEqual({ description }) }, TIMEOUT) diff --git a/packages/sdk/test/end-to-end/contract-call-cache.test.ts b/packages/sdk/test/end-to-end/contract-call-cache.test.ts new file mode 100644 index 0000000000..036c7adf8d --- /dev/null +++ b/packages/sdk/test/end-to-end/contract-call-cache.test.ts @@ -0,0 +1,100 @@ +import { config as CHAIN_CONFIG } from '@streamr/config' +import { fetchPrivateKeyWithGas } from '@streamr/test-utils' +import { collect, StreamID, until } from '@streamr/utils' +import { Wallet } from 'ethers' +import { StreamPermission } from '../../src/permission' +import { StreamrClient } from '../../src/StreamrClient' +import { ProxyHttpServer, ProxyHttpServerRequest } from '../test-utils/ProxyHttpServer' +import { + createRelativeTestStreamId, + createTestClient, + createTestStream, + formEthereumFunctionSelector, + parseEthereumFunctionSelectorFromCallData +} from '../test-utils/utils' +import { nextValue } from './../../src/utils/iterators' + +export const waitForTheGraphToHaveIndexed = async (streamId: StreamID, client: StreamrClient): Promise => { + await until(async () => { + const streams = await collect(client.searchStreams(streamId, undefined)) + return streams.length > 0 + }) +} + +describe('contract call cache', () => { + + describe('metadata', () => { + + let client: StreamrClient + let authenticatedUser: Wallet + let server: ProxyHttpServer + let existingStreamId: StreamID + const METADATA_QUERY_FUNCTION_SELECTOR = formEthereumFunctionSelector('getStreamMetadata(string)') + + const getMethodCalls = (): ProxyHttpServerRequest[] => { + const methodCalls = server.getRequests().filter(((r) => r.body.method === 'eth_call')) + return methodCalls.filter((c) => { + return parseEthereumFunctionSelectorFromCallData(c.body.params[0].data) === METADATA_QUERY_FUNCTION_SELECTOR + }) + } + + beforeAll(async () => { + authenticatedUser = new Wallet(await fetchPrivateKeyWithGas()) + const creator = createTestClient(await fetchPrivateKeyWithGas()) + existingStreamId = (await createTestStream(creator, module)).id + creator.grantPermissions(existingStreamId, { + userId: authenticatedUser.address, + permissions: [StreamPermission.EDIT] + }) + await creator.destroy() + await waitForTheGraphToHaveIndexed(existingStreamId, creator) + }) + + beforeEach(async () => { + server = new ProxyHttpServer(CHAIN_CONFIG.dev2.rpcEndpoints[0].url) + await server.start() + client = new StreamrClient({ + environment: 'dev2', + auth: { + privateKey: authenticatedUser.privateKey + }, + contracts: { + rpcs: [{ + url: `http://localhost:${server.getPort()}` + }] + } + }) + }) + + afterEach(async () => { + await client.destroy() + await server.stop() + }) + + it('is in cache after calling getStream()', async () => { + const stream = await client.getStream(existingStreamId) + expect(getMethodCalls()).toHaveLength(1) + await stream.getMetadata() + expect(getMethodCalls()).toHaveLength(1) + }) + + it('is in cache after calling createStream()', async () => { + const stream = await client.createStream(createRelativeTestStreamId(module)) + await stream.getMetadata() + expect(getMethodCalls()).toHaveLength(0) + }) + + it('is in cache after calling searchStreams()', async () => { + const stream = (await nextValue(client.searchStreams(existingStreamId, undefined)[Symbol.asyncIterator]()))! + await stream.getMetadata() + expect(getMethodCalls()).toHaveLength(0) + }) + + it('cache updated when calling setStreamMetatadata()', async () => { + const NEW_METADATA = { foo: Date.now() } + await client.setStreamMetadata(existingStreamId, NEW_METADATA) + expect(await client.getStreamMetadata(existingStreamId)).toEqual(NEW_METADATA) + expect(getMethodCalls()).toHaveLength(0) + }) + }) +}) diff --git a/packages/sdk/test/integration/GroupKeyPersistence.test.ts b/packages/sdk/test/integration/GroupKeyPersistence.test.ts index 18adcddad6..961f6d64a4 100644 --- a/packages/sdk/test/integration/GroupKeyPersistence.test.ts +++ b/packages/sdk/test/integration/GroupKeyPersistence.test.ts @@ -106,7 +106,7 @@ describe('Group Key Persistence', () => { }) it('works', async () => { - await startPublisherKeyExchangeSubscription(publisher2, stream.getStreamParts()[0]) + await startPublisherKeyExchangeSubscription(publisher2, (await stream.getStreamParts())[0]) const received: Message[] = [] const sub = await subscriber.resend( diff --git a/packages/sdk/test/integration/PublisherKeyExchange.test.ts b/packages/sdk/test/integration/PublisherKeyExchange.test.ts index 5e6056f95c..809fdf650c 100644 --- a/packages/sdk/test/integration/PublisherKeyExchange.test.ts +++ b/packages/sdk/test/integration/PublisherKeyExchange.test.ts @@ -73,7 +73,7 @@ describe('PublisherKeyExchange', () => { } }) const stream = await createStream() - streamPartId = stream.getStreamParts()[0] + streamPartId = (await stream.getStreamParts())[0] await startPublisherKeyExchangeSubscription(publisherClient, streamPartId) }) diff --git a/packages/sdk/test/integration/StreamrClient.test.ts b/packages/sdk/test/integration/StreamrClient.test.ts index c305ddb2fb..1781df3527 100644 --- a/packages/sdk/test/integration/StreamrClient.test.ts +++ b/packages/sdk/test/integration/StreamrClient.test.ts @@ -35,7 +35,7 @@ describe('StreamrClient', () => { } }) const stream = await createTestStream(client, module) - streamDefinition = stream.getStreamParts()[0] + streamDefinition = (await stream.getStreamParts())[0] const publisherWallet = fastWallet() await stream.grantPermissions({ userId: publisherWallet.address, diff --git a/packages/sdk/test/integration/resend-and-subscribe.test.ts b/packages/sdk/test/integration/resend-and-subscribe.test.ts index d85b3a6448..cfbb3d1340 100644 --- a/packages/sdk/test/integration/resend-and-subscribe.test.ts +++ b/packages/sdk/test/integration/resend-and-subscribe.test.ts @@ -57,7 +57,7 @@ describe('resend and subscribe', () => { streamId: stream.id, distributionMethod: 'rekey' }) - await startPublisherKeyExchangeSubscription(publisher, stream.getStreamParts()[0]) + await startPublisherKeyExchangeSubscription(publisher, (await stream.getStreamParts())[0]) const historicalMessage = await createMockMessage({ timestamp: 1000, diff --git a/packages/sdk/test/integration/resend-with-existing-key.test.ts b/packages/sdk/test/integration/resend-with-existing-key.test.ts index e773378e27..51ab35e5be 100644 --- a/packages/sdk/test/integration/resend-with-existing-key.test.ts +++ b/packages/sdk/test/integration/resend-with-existing-key.test.ts @@ -40,8 +40,8 @@ describe('resend with existing key', () => { storageNode.storeMessage(message) } - const resendRange = (fromTimestamp: number, toTimestamp: number) => { - return subscriber.resend(stream.getStreamParts()[0], { + const resendRange = async (fromTimestamp: number, toTimestamp: number) => { + return subscriber.resend((await stream.getStreamParts())[0], { from: { timestamp: fromTimestamp }, diff --git a/packages/sdk/test/integration/update-encryption-key.test.ts b/packages/sdk/test/integration/update-encryption-key.test.ts index c7e3c89609..cdedc27fcf 100644 --- a/packages/sdk/test/integration/update-encryption-key.test.ts +++ b/packages/sdk/test/integration/update-encryption-key.test.ts @@ -35,7 +35,7 @@ describe('update encryption key', () => { userId: await subscriber.getUserId(), permissions: [StreamPermission.SUBSCRIBE] }) - streamPartId = stream.getStreamParts()[0] + streamPartId = (await stream.getStreamParts())[0] const sub = await subscriber.subscribe(streamPartId) messageIterator = sub[Symbol.asyncIterator]() onError = jest.fn() diff --git a/packages/sdk/test/test-utils/ProxyHttpServer.ts b/packages/sdk/test/test-utils/ProxyHttpServer.ts new file mode 100644 index 0000000000..807fac673f --- /dev/null +++ b/packages/sdk/test/test-utils/ProxyHttpServer.ts @@ -0,0 +1,67 @@ +import { Logger } from '@streamr/utils' +import { once } from 'events' +import express, { Request, Response } from 'express' +import { Server } from 'http' +import { AddressInfo } from 'net' +import fetch, { RequestInit } from 'node-fetch' + +export interface ProxyHttpServerRequest { + body?: any + response: any + timestamp: number +} + +const logger = new Logger(module) + +/** + * Forwards requests to targetUrl. Supports only JSON requests and responses. + */ +export class ProxyHttpServer { + + private httpServer?: Server + private readonly targetUrl: string + private readonly requests: ProxyHttpServerRequest[] = [] + + constructor(targetUrl: string) { + this.targetUrl = targetUrl + } + + async start(): Promise { + logger.debug('Starting proxy server') + const app = express() + app.use(express.json()) + app.all('/', async (req: Request, res: Response) => { + logger.debug('Query proxy server', { body: req.body }) + const isPost = req.method === 'POST' + const requestInit: RequestInit = { + method: req.method, + body: isPost ? JSON.stringify(req.body) : undefined, + } + const targetResponse = await fetch(this.targetUrl, requestInit) + const targetBody = await targetResponse.json() + res.json(targetBody) + this.requests.push({ + body: isPost ? req.body : undefined, + response: targetBody, + timestamp: Date.now() + }) + }) + this.httpServer = app.listen(0) // uses random port + await once(this.httpServer, 'listening') + logger.debug(`Started proxy server at port ${this.getPort()}`) + } + + async stop(): Promise { + this.httpServer!.close() + await once(this.httpServer!, 'close') + logger.debug('Stopped proxy server') + } + + getRequests(): ProxyHttpServerRequest[] { + return this.requests + } + + getPort(): number { + return (this.httpServer!.address() as AddressInfo).port + } +} diff --git a/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts b/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts index 984f6df2f5..70351501a1 100644 --- a/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts +++ b/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts @@ -167,6 +167,11 @@ export class FakeStreamRegistry implements Methods { }) } + // eslint-disable-next-line class-methods-use-this + populateMetadataCache(): void { + // no-op + } + // eslint-disable-next-line class-methods-use-this invalidatePermissionCaches(): void { // no-op @@ -196,7 +201,7 @@ export class FakeStreamRegistry implements Methods { } // eslint-disable-next-line class-methods-use-this - searchStreams(_term: string | undefined, _permissionFilter: InternalSearchStreamsPermissionFilter | undefined): AsyncIterable { + searchStreams(_term: string | undefined, _permissionFilter: InternalSearchStreamsPermissionFilter | undefined): AsyncGenerator { throw new Error('not implemented') } diff --git a/packages/sdk/test/test-utils/utils.ts b/packages/sdk/test/test-utils/utils.ts index 8b64fca26d..242fed26dc 100644 --- a/packages/sdk/test/test-utils/utils.ts +++ b/packages/sdk/test/test-utils/utils.ts @@ -129,7 +129,7 @@ export const createMockMessage = async ( opts: CreateMockMessageOptions ): Promise => { const [streamId, partition] = StreamPartIDUtils.getStreamIDAndPartition( - opts.streamPartId ?? opts.stream.getStreamParts()[0] + opts.streamPartId ?? (await opts.stream.getStreamParts())[0] ) const authentication = createPrivateKeyAuthentication(opts.publisher.privateKey) const factory = new MessageFactory({ diff --git a/packages/sdk/test/unit/Stream.test.ts b/packages/sdk/test/unit/Stream.test.ts index 7818930d28..a5a9b9e62c 100644 --- a/packages/sdk/test/unit/Stream.test.ts +++ b/packages/sdk/test/unit/Stream.test.ts @@ -1,55 +1,19 @@ import 'reflect-metadata' -import { toStreamID } from '@streamr/utils' -import { StreamrClient } from '../../src/StreamrClient' import { Stream } from '../../src/Stream' describe('Stream', () => { - it('initial fields', () => { - const stream = new Stream(toStreamID('mock-id'), {}, undefined as any) - expect(stream.getMetadata()).toEqual({}) - }) - - it('getMetadata', () => { - const stream = new Stream(toStreamID('mock-id'), { - partitions: 10, - storageDays: 20 - }, undefined as any) - expect(stream.getMetadata()).toEqual({ - partitions: 10, - storageDays: 20 - }) - }) - it('getPartitionCount', () => { const stream = new Stream( undefined as any, - { partitions: 150 }, - undefined as any, + { + getStreamMetadata: async () => ({ partitions: 150 }) + } as any ) - expect(() => stream.getPartitionCount()).toThrowStreamrError({ + expect(() => stream.getPartitionCount()).rejects.toThrowStreamrError({ message: 'Invalid partition count: 150', code: 'INVALID_STREAM_METADATA' }) }) - - describe('setMetadata', () => { - it('fields not updated if transaction fails', async () => { - const client: Partial = { - setStreamMetadata: jest.fn().mockRejectedValue(new Error('mock-error')), - } - - const stream = new Stream(toStreamID('mock-id'), { - description: 'original-description' - }, client as any) - - await expect(() => { - return stream.setMetadata({ - description: 'updated-description' - }) - }).rejects.toThrow('mock-error') - expect(stream.getMetadata().description).toBe('original-description') - }) - }) }) diff --git a/packages/sdk/test/unit/searchStreams.test.ts b/packages/sdk/test/unit/searchStreams.test.ts index b7a1f8f21e..a5d69f84c1 100644 --- a/packages/sdk/test/unit/searchStreams.test.ts +++ b/packages/sdk/test/unit/searchStreams.test.ts @@ -3,7 +3,6 @@ import 'reflect-metadata' import { randomEthereumAddress } from '@streamr/test-utils' import { StreamID, TheGraphClient, collect, toStreamID } from '@streamr/utils' import { SearchStreamsResultItem, searchStreams } from '../../src/contracts/searchStreams' -import { mockLoggerFactory } from '../test-utils/utils' const MOCK_USER = randomEthereumAddress() @@ -44,9 +43,7 @@ describe('searchStreams', () => { '/', undefined, orderBy, - theGraphClient as any, - mockLoggerFactory().createLogger(module), - undefined as any, + theGraphClient as any )) const graphQLquery = ((theGraphClient as any).queryEntities as jest.Mock).mock.calls[0][0]()