diff --git a/packages/sdk/src/contracts/StreamRegistry.ts b/packages/sdk/src/contracts/StreamRegistry.ts index 2a3aca108c..8652a22e38 100644 --- a/packages/sdk/src/contracts/StreamRegistry.ts +++ b/packages/sdk/src/contracts/StreamRegistry.ts @@ -108,6 +108,11 @@ const formCacheKeyPrefix = (streamId: StreamID): string => { return `${streamId}|` } +const invalidateCache = (cache: CachingMap, streamId: StreamID): void => { + const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId)) + cache.invalidate(matchTarget) +} + @scoped(Lifecycle.ContainerScoped) export class StreamRegistry { @@ -121,10 +126,10 @@ export class StreamRegistry { private readonly config: Pick private readonly authentication: Authentication private readonly logger: Logger - private readonly getStreamMetadata_cached: CachingMap - private readonly isStreamPublisher_cached: CachingMap - private readonly isStreamSubscriber_cached: CachingMap - private readonly hasPublicSubscribePermission_cached: CachingMap + private readonly metadataCache: CachingMap + private readonly publisherCache: CachingMap + private readonly subscriberCache: CachingMap + private readonly publicSubscribePermissionCache: CachingMap /** @internal */ constructor( @@ -165,25 +170,25 @@ export class StreamRegistry { }), loggerFactory }) - this.getStreamMetadata_cached = new CachingMap((streamId: StreamID) => { + this.metadataCache = new CachingMap((streamId: StreamID) => { return this.getStreamMetadata_nonCached(streamId) }, { ...config.cache, cacheKey: ([streamId]) => formCacheKeyPrefix(streamId) }) - this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => { + this.publisherCache = new CachingMap((streamId: StreamID, userId: UserID) => { return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH) }, { ...config.cache, cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}` }) - this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => { + this.subscriberCache = new CachingMap((streamId: StreamID, userId: UserID) => { return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE) }, { ...config.cache, cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}` }) - this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => { + this.publicSubscribePermissionCache = new CachingMap((streamId: StreamID) => { return this.hasPermission({ streamId, public: true, @@ -241,6 +246,7 @@ export class StreamRegistry { await this.ensureStreamIdInNamespaceOfAuthenticatedUser(domain, streamId) await waitForTx(this.streamRegistryContract!.createStream(path, JSON.stringify(metadata), ethersOverrides)) } + this.populateMetadataCache(streamId, metadata) } private async ensureStreamIdInNamespaceOfAuthenticatedUser(address: EthereumAddress, streamId: StreamID): Promise { @@ -258,7 +264,7 @@ export class StreamRegistry { JSON.stringify(metadata), ethersOverrides )) - this.invalidateMetadataCache(streamId) + this.populateMetadataCache(streamId, metadata) } async deleteStream(streamIdOrPath: string): Promise { @@ -269,7 +275,7 @@ export class StreamRegistry { streamId, ethersOverrides )) - this.invalidateMetadataCache(streamId) + invalidateCache(this.metadataCache, streamId) this.invalidatePermissionCaches(streamId) } @@ -512,31 +518,29 @@ export class StreamRegistry { // -------------------------------------------------------------------------------------------- getStreamMetadata(streamId: StreamID): Promise { - return this.getStreamMetadata_cached.get(streamId) + return this.metadataCache.get(streamId) } isStreamPublisher(streamId: StreamID, userId: UserID): Promise { - return this.isStreamPublisher_cached.get(streamId, userId) + return this.publisherCache.get(streamId, userId) } isStreamSubscriber(streamId: StreamID, userId: UserID): Promise { - return this.isStreamSubscriber_cached.get(streamId, userId) + return this.subscriberCache.get(streamId, userId) } hasPublicSubscribePermission(streamId: StreamID): Promise { - return this.hasPublicSubscribePermission_cached.get(streamId) + return this.publicSubscribePermissionCache.get(streamId) } - invalidateMetadataCache(streamId: StreamID): void { - this.logger.trace('Clear metadata cache for stream', { streamId }) - this.getStreamMetadata_cached.invalidate((s) => s.startsWith(formCacheKeyPrefix(streamId))) + private populateMetadataCache(streamId: StreamID, metadata: StreamMetadata): void { + this.metadataCache.set([streamId], metadata) } invalidatePermissionCaches(streamId: StreamID): void { this.logger.trace('Clear permission caches for stream', { streamId }) - const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId)) - this.isStreamPublisher_cached.invalidate(matchTarget) - this.isStreamSubscriber_cached.invalidate(matchTarget) + invalidateCache(this.publisherCache, streamId) + invalidateCache(this.subscriberCache, streamId) // TODO should also clear cache for hasPublicSubscribePermission? } } diff --git a/packages/sdk/src/utils/CachingMap.ts b/packages/sdk/src/utils/CachingMap.ts index 50a09a7575..8952e12f95 100644 --- a/packages/sdk/src/utils/CachingMap.ts +++ b/packages/sdk/src/utils/CachingMap.ts @@ -2,6 +2,12 @@ import { MapKey } from '@streamr/utils' import pMemoize from 'p-memoize' import LRU from '../../vendor/quick-lru' +interface Options { + maxSize: number + maxAge: number + cacheKey: (args: P) => K +} + /** * Caches into a LRU cache capped at options.maxSize. See documentation for mem/p-memoize. * Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.invalidate() is called. @@ -18,14 +24,11 @@ export class CachingMap { private readonly cachedFn: (...args: P) => Promise private readonly cache: LRU + private readonly opts: Options constructor( asyncFn: (...args: P) => Promise, - opts: { - maxSize: number - maxAge: number - cacheKey: (args: P) => K - } + opts: Options ) { this.cache = new LRU({ maxSize: opts.maxSize, @@ -36,12 +39,17 @@ export class CachingMap { cache: this.cache, cacheKey: opts.cacheKey }) + this.opts = opts } get(...args: P): Promise { return this.cachedFn(...args) } + set(args: P, value: V): void { + this.cache.set(this.opts.cacheKey(args), { data: value, maxAge: this.opts.maxAge }) + } + invalidate(predicate: (key: K) => boolean): void { for (const key of this.cache.keys()) { if (predicate(key)) { diff --git a/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts b/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts index cf7ab4395e..984f6df2f5 100644 --- a/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts +++ b/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts @@ -167,11 +167,6 @@ export class FakeStreamRegistry implements Methods { }) } - // eslint-disable-next-line class-methods-use-this - invalidateMetadataCache(): void { - // no-op - } - // eslint-disable-next-line class-methods-use-this invalidatePermissionCaches(): void { // no-op