Skip to content

Commit eb3f231

Browse files
committed
remove metadata field, call populateMetadataCache()
1 parent b175078 commit eb3f231

File tree

8 files changed

+59
-54
lines changed

8 files changed

+59
-54
lines changed

packages/sdk/src/Stream.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ export class Stream {
3131
/** @internal */
3232
constructor(
3333
id: StreamID,
34-
_metadata: StreamMetadata,
3534
client: StreamrClient
3635
) {
3736
this.id = id

packages/sdk/src/StreamrClient.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import './utils/PatchTsyringe'
33

44
import { DhtAddress } from '@streamr/dht'
55
import { ProxyDirection } from '@streamr/trackerless-network'
6-
import { DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, StreamID, TheGraphClient, toEthereumAddress, toUserId } from '@streamr/utils'
6+
import { DEFAULT_PARTITION_COUNT, EthereumAddress, HexString, Logger, StreamID, TheGraphClient, toEthereumAddress, toUserId } from '@streamr/utils'
77
import type { Overrides } from 'ethers'
88
import EventEmitter from 'eventemitter3'
99
import merge from 'lodash/merge'
@@ -52,6 +52,7 @@ import { LoggerFactory } from './utils/LoggerFactory'
5252
import { pOnce } from './utils/promises'
5353
import { convertPeerDescriptorToNetworkPeerDescriptor, createTheGraphClient } from './utils/utils'
5454
import { addStreamToStorageNode } from './utils/addStreamToStorageNode'
55+
import { map } from './utils/GeneratorUtils'
5556

5657
// TODO: this type only exists to enable tsdoc to generate proper documentation
5758
export type SubscribeOptions = StreamDefinition & ExtraSubscribeOptions
@@ -73,6 +74,8 @@ export interface ExtraSubscribeOptions {
7374
erc1271Contract?: HexString
7475
}
7576

77+
const logger = new Logger(module)
78+
7679
/**
7780
* The main API used to interact with Streamr.
7881
*
@@ -357,8 +360,11 @@ export class StreamrClient {
357360
*/
358361
async getStream(streamIdOrPath: string): Promise<Stream> {
359362
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
360-
const metadata = await this.streamRegistry.getStreamMetadata(streamId, false)
361-
return new Stream(streamId, metadata, this)
363+
// Check if the stream exists by querying its metadata. Throws an error if no metadata is found,
364+
// indicating the stream doesn't exist. As a side-effect this populates StreamRegistry's metadata
365+
// cache for future use, such as stream.getPartitionCount() calls.
366+
await this.streamRegistry.getStreamMetadata(streamId, false)
367+
return new Stream(streamId, this)
362368
}
363369

364370
/**
@@ -376,7 +382,7 @@ export class StreamrClient {
376382
const streamId = await this.streamIdBuilder.toStreamID(props.id)
377383
const metadata = merge({ partitions: DEFAULT_PARTITION_COUNT }, omit(props, 'id') )
378384
await this.streamRegistry.createStream(streamId, metadata)
379-
return new Stream(streamId, metadata, this)
385+
return new Stream(streamId, this)
380386
}
381387

382388
/**
@@ -434,12 +440,16 @@ export class StreamrClient {
434440
permissionFilter: SearchStreamsPermissionFilter | undefined,
435441
orderBy: SearchStreamsOrderBy = { field: 'id', direction: 'asc' }
436442
): AsyncIterable<Stream> {
437-
return this.streamRegistry.searchStreams(
443+
logger.debug('Search for streams', { term, permissionFilter })
444+
if ((term === undefined) && (permissionFilter === undefined)) {
445+
throw new Error('Requires a search term or a permission filter')
446+
}
447+
const streamIds = this.streamRegistry.searchStreams(
438448
term,
439449
(permissionFilter !== undefined) ? toInternalSearchStreamsPermissionFilter(permissionFilter) : undefined,
440-
orderBy,
441-
this
450+
orderBy
442451
)
452+
return map(streamIds, (id) => new Stream(id, this))
443453
}
444454

445455
// --------------------------------------------------------------------------------------------
@@ -572,8 +582,11 @@ export class StreamrClient {
572582
*/
573583
async getStoredStreams(storageNodeAddress: HexString): Promise<{ streams: Stream[], blockNumber: number }> {
574584
const queryResult = await this.streamStorageRegistry.getStoredStreams(toEthereumAddress(storageNodeAddress))
585+
for (const stream of queryResult.streams) {
586+
this.streamRegistry.populateMetadataCache(stream.id, stream.metadata)
587+
}
575588
return {
576-
streams: queryResult.streams.map((item) => new Stream(item.id, item.metadata, this)),
589+
streams: queryResult.streams.map((item) => new Stream(item.id, this)),
577590
blockNumber: queryResult.blockNumber
578591
}
579592
}

packages/sdk/src/contracts/StreamRegistry.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@ import { Lifecycle, inject, scoped } from 'tsyringe'
2020
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
2121
import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config'
2222
import { RpcProviderSource } from '../RpcProviderSource'
23-
import { Stream } from '../Stream'
2423
import { StreamIDBuilder } from '../StreamIDBuilder'
2524
import { StreamMetadata, parseMetadata } from '../StreamMetadata'
26-
import { StreamrClient } from '../StreamrClient'
2725
import { StreamrClientError } from '../StreamrClientError'
2826
import type { StreamRegistryV5 as StreamRegistryContract } from '../ethereumArtifacts/StreamRegistryV5'
2927
import StreamRegistryArtifact from '../ethereumArtifacts/StreamRegistryV5Abi.json'
@@ -241,6 +239,7 @@ export class StreamRegistry {
241239
await this.ensureStreamIdInNamespaceOfAuthenticatedUser(domain, streamId)
242240
await waitForTx(this.streamRegistryContract!.createStream(path, JSON.stringify(metadata), ethersOverrides))
243241
}
242+
this.populateMetadataCache(streamId, metadata)
244243
}
245244

246245
private async ensureStreamIdInNamespaceOfAuthenticatedUser(address: EthereumAddress, streamId: StreamID): Promise<void> {
@@ -289,19 +288,21 @@ export class StreamRegistry {
289288
return parseMetadata(metadata)
290289
}
291290

292-
searchStreams(
291+
async* searchStreams(
293292
term: string | undefined,
294293
permissionFilter: InternalSearchStreamsPermissionFilter | undefined,
295-
orderBy: SearchStreamsOrderBy,
296-
client: StreamrClient
297-
): AsyncIterable<Stream> {
298-
return _searchStreams(
294+
orderBy: SearchStreamsOrderBy
295+
): AsyncGenerator<StreamID> {
296+
const queryResult = _searchStreams(
299297
term,
300298
permissionFilter,
301299
orderBy,
302-
this.theGraphClient,
303-
this.logger,
304-
client)
300+
this.theGraphClient)
301+
for await (const item of queryResult) {
302+
const id = toStreamID(item.stream.id)
303+
this.populateMetadataCache(id, parseMetadata(item.stream.metadata))
304+
yield id
305+
}
305306
}
306307

307308
getStreamPublishers(streamIdOrPath: string): AsyncIterable<UserID> {
@@ -539,6 +540,10 @@ export class StreamRegistry {
539540
return this.hasPublicSubscribePermission_cached.get(streamId)
540541
}
541542

543+
populateMetadataCache(streamId: StreamID, metadata: StreamMetadata): void {
544+
this.getStreamMetadata_cached.set([streamId], metadata)
545+
}
546+
542547
invalidateMetadataCache(streamId: StreamID): void {
543548
this.logger.trace('Clear metadata cache for stream', { streamId })
544549
this.getStreamMetadata_cached.invalidate((s) => s.startsWith(formCacheKeyPrefix(streamId)))

packages/sdk/src/contracts/searchStreams.ts

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
import { ChangeFieldType, GraphQLQuery, HexString, Logger, TheGraphClient, toStreamID, toUserId, UserID } from '@streamr/utils'
2-
import { Stream } from '../Stream'
1+
import { ChangeFieldType, GraphQLQuery, HexString, TheGraphClient, toUserId, UserID } from '@streamr/utils'
32
import { ChainPermissions, convertChainPermissionsToStreamPermissions, PUBLIC_PERMISSION_USER_ID, StreamPermission } from '../permission'
4-
import { filter, map, unique } from '../utils/GeneratorUtils'
3+
import { filter, unique } from '../utils/GeneratorUtils'
54
import { StreamQueryResult } from './StreamRegistry'
6-
import { parseMetadata } from '../StreamMetadata'
7-
import { StreamrClient } from '../StreamrClient'
85

96
export interface SearchStreamsPermissionFilter {
107
userId: HexString
@@ -35,25 +32,7 @@ export const toInternalSearchStreamsPermissionFilter = (filter: SearchStreamsPer
3532
}
3633
}
3734

38-
export const searchStreams = (
39-
term: string | undefined,
40-
permissionFilter: InternalSearchStreamsPermissionFilter | undefined,
41-
orderBy: SearchStreamsOrderBy,
42-
theGraphClient: TheGraphClient,
43-
logger: Logger,
44-
client: StreamrClient
45-
): AsyncGenerator<Stream> => {
46-
if ((term === undefined) && (permissionFilter === undefined)) {
47-
throw new Error('Requires a search term or a permission filter')
48-
}
49-
logger.debug('Search for streams', { term, permissionFilter })
50-
return map(
51-
fetchSearchStreamsResultFromTheGraph(term, permissionFilter, orderBy, theGraphClient),
52-
(item: SearchStreamsResultItem) => new Stream(toStreamID(item.stream.id), parseMetadata(item.stream.metadata), client)
53-
)
54-
}
55-
56-
async function* fetchSearchStreamsResultFromTheGraph(
35+
export async function* searchStreams(
5736
term: string | undefined,
5837
permissionFilter: InternalSearchStreamsPermissionFilter | undefined,
5938
orderBy: SearchStreamsOrderBy,

packages/sdk/src/utils/CachingMap.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ import { MapKey } from '@streamr/utils'
22
import pMemoize from 'p-memoize'
33
import LRU from '../../vendor/quick-lru'
44

5+
interface Options<P, K> {
6+
maxSize: number
7+
maxAge: number
8+
cacheKey: (args: P) => K
9+
}
10+
511
/**
612
* Caches into a LRU cache capped at options.maxSize. See documentation for mem/p-memoize.
713
* Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.invalidate() is called.
@@ -18,14 +24,11 @@ export class CachingMap<K extends MapKey, V, P extends any[]> {
1824

1925
private readonly cachedFn: (...args: P) => Promise<V>
2026
private readonly cache: LRU<K, { data: V, maxAge: number }>
27+
private readonly opts: Options<P, K>
2128

2229
constructor(
2330
asyncFn: (...args: P) => Promise<V>,
24-
opts: {
25-
maxSize: number
26-
maxAge: number
27-
cacheKey: (args: P) => K
28-
}
31+
opts: Options<P, K>
2932
) {
3033
this.cache = new LRU<K, { data: V, maxAge: number }>({
3134
maxSize: opts.maxSize,
@@ -36,12 +39,17 @@ export class CachingMap<K extends MapKey, V, P extends any[]> {
3639
cache: this.cache,
3740
cacheKey: opts.cacheKey
3841
})
42+
this.opts = opts
3943
}
4044

4145
get(...args: P): Promise<V> {
4246
return this.cachedFn(...args)
4347
}
4448

49+
set(args: P, value: V): void {
50+
this.cache.set(this.opts.cacheKey(args), { data: value, maxAge: this.opts.maxAge })
51+
}
52+
4553
invalidate(predicate: (key: K) => boolean): void {
4654
for (const key of this.cache.keys()) {
4755
if (predicate(key)) {

packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
167167
})
168168
}
169169

170+
// eslint-disable-next-line class-methods-use-this
171+
populateMetadataCache(): void {
172+
// no-op
173+
}
174+
170175
// eslint-disable-next-line class-methods-use-this
171176
invalidateMetadataCache(): void {
172177
// no-op
@@ -201,7 +206,7 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
201206
}
202207

203208
// eslint-disable-next-line class-methods-use-this
204-
searchStreams(_term: string | undefined, _permissionFilter: InternalSearchStreamsPermissionFilter | undefined): AsyncIterable<Stream> {
209+
searchStreams(_term: string | undefined, _permissionFilter: InternalSearchStreamsPermissionFilter | undefined): AsyncGenerator<StreamID> {
205210
throw new Error('not implemented')
206211
}
207212

packages/sdk/test/unit/Stream.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ describe('Stream', () => {
66

77
it('getPartitionCount', () => {
88
const stream = new Stream(
9-
undefined as any,
109
undefined as any,
1110
{
1211
getStreamMetadata: async () => ({ partitions: 150 })

packages/sdk/test/unit/searchStreams.test.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import 'reflect-metadata'
33
import { randomEthereumAddress } from '@streamr/test-utils'
44
import { StreamID, TheGraphClient, collect, toStreamID } from '@streamr/utils'
55
import { SearchStreamsResultItem, searchStreams } from '../../src/contracts/searchStreams'
6-
import { mockLoggerFactory } from '../test-utils/utils'
76

87
const MOCK_USER = randomEthereumAddress()
98

@@ -44,9 +43,7 @@ describe('searchStreams', () => {
4443
'/',
4544
undefined,
4645
orderBy,
47-
theGraphClient as any,
48-
mockLoggerFactory().createLogger(module),
49-
undefined as any,
46+
theGraphClient as any
5047
))
5148

5249
const graphQLquery = ((theGraphClient as any).queryEntities as jest.Mock).mock.calls[0][0]()

0 commit comments

Comments
 (0)