Skip to content

Commit f593f7c

Browse files
authored
refactor(sdk): StreamRegistry cache invalidation (#2885)
Refactor `StreamRegistry` caching: - added private `populateMetadataCache()` method to populate the cache - added `invalidateCache()` helper function to simplify cache invalidation - renamed cache fields to be nouns instead of method verbs ## Small functionality changes - The metadata is in cache after `setStreamMetadata()` and `createStream()` calls (we can easily populate the cache as the metadata is a method parameter)
1 parent dce1fc2 commit f593f7c

File tree

3 files changed

+37
-30
lines changed

3 files changed

+37
-30
lines changed

packages/sdk/src/contracts/StreamRegistry.ts

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ const formCacheKeyPrefix = (streamId: StreamID): string => {
108108
return `${streamId}|`
109109
}
110110

111+
const invalidateCache = (cache: CachingMap<string, any, any>, streamId: StreamID): void => {
112+
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
113+
cache.invalidate(matchTarget)
114+
}
115+
111116
@scoped(Lifecycle.ContainerScoped)
112117
export class StreamRegistry {
113118

@@ -121,10 +126,10 @@ export class StreamRegistry {
121126
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>
122127
private readonly authentication: Authentication
123128
private readonly logger: Logger
124-
private readonly getStreamMetadata_cached: CachingMap<string, StreamMetadata, [StreamID]>
125-
private readonly isStreamPublisher_cached: CachingMap<string, boolean, [StreamID, UserID]>
126-
private readonly isStreamSubscriber_cached: CachingMap<string, boolean, [StreamID, UserID]>
127-
private readonly hasPublicSubscribePermission_cached: CachingMap<string, boolean, [StreamID]>
129+
private readonly metadataCache: CachingMap<string, StreamMetadata, [StreamID]>
130+
private readonly publisherCache: CachingMap<string, boolean, [StreamID, UserID]>
131+
private readonly subscriberCache: CachingMap<string, boolean, [StreamID, UserID]>
132+
private readonly publicSubscribePermissionCache: CachingMap<string, boolean, [StreamID]>
128133

129134
/** @internal */
130135
constructor(
@@ -165,25 +170,25 @@ export class StreamRegistry {
165170
}),
166171
loggerFactory
167172
})
168-
this.getStreamMetadata_cached = new CachingMap((streamId: StreamID) => {
173+
this.metadataCache = new CachingMap((streamId: StreamID) => {
169174
return this.getStreamMetadata_nonCached(streamId)
170175
}, {
171176
...config.cache,
172177
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
173178
})
174-
this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
179+
this.publisherCache = new CachingMap((streamId: StreamID, userId: UserID) => {
175180
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
176181
}, {
177182
...config.cache,
178183
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
179184
})
180-
this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
185+
this.subscriberCache = new CachingMap((streamId: StreamID, userId: UserID) => {
181186
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
182187
}, {
183188
...config.cache,
184189
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
185190
})
186-
this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => {
191+
this.publicSubscribePermissionCache = new CachingMap((streamId: StreamID) => {
187192
return this.hasPermission({
188193
streamId,
189194
public: true,
@@ -241,6 +246,7 @@ export class StreamRegistry {
241246
await this.ensureStreamIdInNamespaceOfAuthenticatedUser(domain, streamId)
242247
await waitForTx(this.streamRegistryContract!.createStream(path, JSON.stringify(metadata), ethersOverrides))
243248
}
249+
this.populateMetadataCache(streamId, metadata)
244250
}
245251

246252
private async ensureStreamIdInNamespaceOfAuthenticatedUser(address: EthereumAddress, streamId: StreamID): Promise<void> {
@@ -258,7 +264,7 @@ export class StreamRegistry {
258264
JSON.stringify(metadata),
259265
ethersOverrides
260266
))
261-
this.invalidateMetadataCache(streamId)
267+
this.populateMetadataCache(streamId, metadata)
262268
}
263269

264270
async deleteStream(streamIdOrPath: string): Promise<void> {
@@ -269,7 +275,7 @@ export class StreamRegistry {
269275
streamId,
270276
ethersOverrides
271277
))
272-
this.invalidateMetadataCache(streamId)
278+
invalidateCache(this.metadataCache, streamId)
273279
this.invalidatePermissionCaches(streamId)
274280
}
275281

@@ -512,31 +518,29 @@ export class StreamRegistry {
512518
// --------------------------------------------------------------------------------------------
513519

514520
getStreamMetadata(streamId: StreamID): Promise<StreamMetadata> {
515-
return this.getStreamMetadata_cached.get(streamId)
521+
return this.metadataCache.get(streamId)
516522
}
517523

518524
isStreamPublisher(streamId: StreamID, userId: UserID): Promise<boolean> {
519-
return this.isStreamPublisher_cached.get(streamId, userId)
525+
return this.publisherCache.get(streamId, userId)
520526
}
521527

522528
isStreamSubscriber(streamId: StreamID, userId: UserID): Promise<boolean> {
523-
return this.isStreamSubscriber_cached.get(streamId, userId)
529+
return this.subscriberCache.get(streamId, userId)
524530
}
525531

526532
hasPublicSubscribePermission(streamId: StreamID): Promise<boolean> {
527-
return this.hasPublicSubscribePermission_cached.get(streamId)
533+
return this.publicSubscribePermissionCache.get(streamId)
528534
}
529535

530-
invalidateMetadataCache(streamId: StreamID): void {
531-
this.logger.trace('Clear metadata cache for stream', { streamId })
532-
this.getStreamMetadata_cached.invalidate((s) => s.startsWith(formCacheKeyPrefix(streamId)))
536+
private populateMetadataCache(streamId: StreamID, metadata: StreamMetadata): void {
537+
this.metadataCache.set([streamId], metadata)
533538
}
534539

535540
invalidatePermissionCaches(streamId: StreamID): void {
536541
this.logger.trace('Clear permission caches for stream', { streamId })
537-
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
538-
this.isStreamPublisher_cached.invalidate(matchTarget)
539-
this.isStreamSubscriber_cached.invalidate(matchTarget)
542+
invalidateCache(this.publisherCache, streamId)
543+
invalidateCache(this.subscriberCache, streamId)
540544
// TODO should also clear cache for hasPublicSubscribePermission?
541545
}
542546
}

packages/sdk/src/utils/CachingMap.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ import { MapKey } from '@streamr/utils'
22
import pMemoize from 'p-memoize'
33
import LRU from '../../vendor/quick-lru'
44

5+
interface Options<P, K> {
6+
maxSize: number
7+
maxAge: number
8+
cacheKey: (args: P) => K
9+
}
10+
511
/**
612
* Caches into a LRU cache capped at options.maxSize. See documentation for mem/p-memoize.
713
* Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.invalidate() is called.
@@ -18,14 +24,11 @@ export class CachingMap<K extends MapKey, V, P extends any[]> {
1824

1925
private readonly cachedFn: (...args: P) => Promise<V>
2026
private readonly cache: LRU<K, { data: V, maxAge: number }>
27+
private readonly opts: Options<P, K>
2128

2229
constructor(
2330
asyncFn: (...args: P) => Promise<V>,
24-
opts: {
25-
maxSize: number
26-
maxAge: number
27-
cacheKey: (args: P) => K
28-
}
31+
opts: Options<P, K>
2932
) {
3033
this.cache = new LRU<K, { data: V, maxAge: number }>({
3134
maxSize: opts.maxSize,
@@ -36,12 +39,17 @@ export class CachingMap<K extends MapKey, V, P extends any[]> {
3639
cache: this.cache,
3740
cacheKey: opts.cacheKey
3841
})
42+
this.opts = opts
3943
}
4044

4145
get(...args: P): Promise<V> {
4246
return this.cachedFn(...args)
4347
}
4448

49+
set(args: P, value: V): void {
50+
this.cache.set(this.opts.cacheKey(args), { data: value, maxAge: this.opts.maxAge })
51+
}
52+
4553
invalidate(predicate: (key: K) => boolean): void {
4654
for (const key of this.cache.keys()) {
4755
if (predicate(key)) {

packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,6 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
167167
})
168168
}
169169

170-
// eslint-disable-next-line class-methods-use-this
171-
invalidateMetadataCache(): void {
172-
// no-op
173-
}
174-
175170
// eslint-disable-next-line class-methods-use-this
176171
invalidatePermissionCaches(): void {
177172
// no-op

0 commit comments

Comments
 (0)