Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 35 additions & 34 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ const formCacheKeyPrefix = (streamId: StreamID): string => {
return `${streamId}|`
}

const invalidateCache = (cache: CachingMap<string, any, any>, streamId: StreamID) => {
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
cache.invalidate(matchTarget)
}

@scoped(Lifecycle.ContainerScoped)
export class StreamRegistry {

Expand All @@ -121,10 +126,10 @@ export class StreamRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly getStreamMetadata_cached: CachingMap<string, StreamMetadata, [StreamID]>
private readonly isStreamPublisher_cached: CachingMap<string, boolean, [StreamID, UserID]>
private readonly isStreamSubscriber_cached: CachingMap<string, boolean, [StreamID, UserID]>
private readonly hasPublicSubscribePermission_cached: CachingMap<string, boolean, [StreamID]>
private readonly metadataCache: CachingMap<string, StreamMetadata, [StreamID]>
private readonly publisherCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly subscriberCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly publicSubscribePermissionCache: CachingMap<string, boolean, [StreamID]>

/** @internal */
constructor(
Expand Down Expand Up @@ -165,25 +170,25 @@ export class StreamRegistry {
}),
loggerFactory
})
this.getStreamMetadata_cached = new CachingMap((streamId: StreamID) => {
this.metadataCache = new CachingMap((streamId: StreamID) => {
return this.getStreamMetadata_nonCached(streamId)
}, {
...config.cache,
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
})
this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisher(streamId, userId, false)
this.publisherCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
})
this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamSubscriber(streamId, userId, false)
this.subscriberCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
})
this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => {
this.publicSubscribePermissionCache = new CachingMap((streamId: StreamID) => {
return this.hasPermission({
streamId,
public: true,
Expand Down Expand Up @@ -241,6 +246,7 @@ export class StreamRegistry {
await this.ensureStreamIdInNamespaceOfAuthenticatedUser(domain, streamId)
await waitForTx(this.streamRegistryContract!.createStream(path, JSON.stringify(metadata), ethersOverrides))
}
this.populateMetadataCache(streamId, metadata)
}

private async ensureStreamIdInNamespaceOfAuthenticatedUser(address: EthereumAddress, streamId: StreamID): Promise<void> {
Expand All @@ -258,7 +264,7 @@ export class StreamRegistry {
JSON.stringify(metadata),
ethersOverrides
))
this.invalidateMetadataCache(streamId)
this.populateMetadataCache(streamId, metadata)
}

async deleteStream(streamIdOrPath: string): Promise<void> {
Expand All @@ -269,7 +275,7 @@ export class StreamRegistry {
streamId,
ethersOverrides
))
this.invalidateMetadataCache(streamId)
invalidateCache(this.metadataCache, streamId)
this.invalidatePermissionCaches(streamId)
}

Expand Down Expand Up @@ -511,44 +517,39 @@ export class StreamRegistry {
// Caching
// --------------------------------------------------------------------------------------------

getStreamMetadata(streamId: StreamID, useCache = true): Promise<StreamMetadata> {
if (useCache) {
return this.getStreamMetadata_cached.get(streamId)
} else {
return this.getStreamMetadata_nonCached(streamId)
async getStreamMetadata(streamId: StreamID, allowCached = true): Promise<StreamMetadata> {
if (!allowCached) {
invalidateCache(this.metadataCache, streamId)
}
return this.metadataCache.get(streamId)
}

isStreamPublisher(streamId: StreamID, userId: UserID, useCache = true): Promise<boolean> {
if (useCache) {
return this.isStreamPublisher_cached.get(streamId, userId)
} else {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
isStreamPublisher(streamId: StreamID, userId: UserID, allowCached = true): Promise<boolean> {
if (allowCached) {
invalidateCache(this.publisherCache, streamId)
}
return this.publisherCache.get(streamId, userId)
}

isStreamSubscriber(streamId: StreamID, userId: UserID, useCache = true): Promise<boolean> {
if (useCache) {
return this.isStreamSubscriber_cached.get(streamId, userId)
} else {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
isStreamSubscriber(streamId: StreamID, userId: UserID, allowCached = true): Promise<boolean> {
if (allowCached) {
invalidateCache(this.subscriberCache, streamId)
}
return this.subscriberCache.get(streamId, userId)
}

hasPublicSubscribePermission(streamId: StreamID): Promise<boolean> {
return this.hasPublicSubscribePermission_cached.get(streamId)
return this.publicSubscribePermissionCache.get(streamId)
}

invalidateMetadataCache(streamId: StreamID): void {
this.logger.trace('Clear metadata cache for stream', { streamId })
this.getStreamMetadata_cached.invalidate((s) => s.startsWith(formCacheKeyPrefix(streamId)))
private populateMetadataCache(streamId: StreamID, metadata: StreamMetadata): void {
this.metadataCache.set([streamId], metadata)
}

invalidatePermissionCaches(streamId: StreamID): void {
this.logger.trace('Clear permission caches for stream', { streamId })
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
this.isStreamPublisher_cached.invalidate(matchTarget)
this.isStreamSubscriber_cached.invalidate(matchTarget)
invalidateCache(this.publisherCache, streamId)
invalidateCache(this.subscriberCache, streamId)
// TODO should also clear cache for hasPublicSubscribePermission?
}
}
18 changes: 13 additions & 5 deletions packages/sdk/src/utils/CachingMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ import { MapKey } from '@streamr/utils'
import pMemoize from 'p-memoize'
import LRU from '../../vendor/quick-lru'

interface Options<P, K> {
maxSize: number
maxAge: number
cacheKey: (args: P) => K
}

/**
* Caches into a LRU cache capped at options.maxSize. See documentation for mem/p-memoize.
* Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.invalidate() is called.
Expand All @@ -18,14 +24,11 @@ export class CachingMap<K extends MapKey, V, P extends any[]> {

private readonly cachedFn: (...args: P) => Promise<V>
private readonly cache: LRU<K, { data: V, maxAge: number }>
private readonly opts: Options<P, K>

constructor(
asyncFn: (...args: P) => Promise<V>,
opts: {
maxSize: number
maxAge: number
cacheKey: (args: P) => K
}
opts: Options<P, K>
) {
this.cache = new LRU<K, { data: V, maxAge: number }>({
maxSize: opts.maxSize,
Expand All @@ -36,12 +39,17 @@ export class CachingMap<K extends MapKey, V, P extends any[]> {
cache: this.cache,
cacheKey: opts.cacheKey
})
this.opts = opts
}

get(...args: P): Promise<V> {
return this.cachedFn(...args)
}

set(args: P, value: V): void {
this.cache.set(this.opts.cacheKey(args), { data: value, maxAge: this.opts.maxAge })
}

invalidate(predicate: (key: K) => boolean): void {
for (const key of this.cache.keys()) {
if (predicate(key)) {
Expand Down
5 changes: 0 additions & 5 deletions packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,6 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
})
}

// eslint-disable-next-line class-methods-use-this
invalidateMetadataCache(): void {
// no-op
}

// eslint-disable-next-line class-methods-use-this
invalidatePermissionCaches(): void {
// no-op
Expand Down