Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import {
} from '../permission'
import { filter, map } from '../utils/GeneratorUtils'
import { LoggerFactory } from '../utils/LoggerFactory'
import { CacheAsyncFn, CacheAsyncFnType } from '../utils/CacheAsyncFn'
import { CachingMap } from '../utils/CachingMap'
import { until } from '../utils/promises'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
Expand Down Expand Up @@ -119,10 +119,10 @@ export class StreamRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly getStreamMetadata_cached: CacheAsyncFnType<[StreamID], StreamMetadata, string>
private readonly isStreamPublisher_cached: CacheAsyncFnType<[StreamID, UserID], boolean, string>
private readonly isStreamSubscriber_cached: CacheAsyncFnType<[StreamID, UserID], boolean, string>
private readonly hasPublicSubscribePermission_cached: CacheAsyncFnType<[StreamID], boolean, string>
private readonly getStreamMetadata_cached: CachingMap<string, StreamMetadata, [StreamID]>
private readonly isStreamPublisher_cached: CachingMap<string, boolean, [StreamID, UserID]>
private readonly isStreamSubscriber_cached: CachingMap<string, boolean, [StreamID, UserID]>
private readonly hasPublicSubscribePermission_cached: CachingMap<string, boolean, [StreamID]>

/** @internal */
constructor(
Expand Down Expand Up @@ -163,31 +163,31 @@ export class StreamRegistry {
}),
loggerFactory
})
this.getStreamMetadata_cached = CacheAsyncFn((streamId: StreamID) => {
this.getStreamMetadata_cached = new CachingMap((streamId: StreamID) => {
return this.getStreamMetadata_nonCached(streamId)
}, {
...config.cache,
cacheKey: ([streamId]): string => {
return `${streamId}${CACHE_KEY_SEPARATOR}`
}
})
this.isStreamPublisher_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => {
this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisher(streamId, userId, false)
}, {
...config.cache,
cacheKey([streamId, userId]): string {
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
}
})
this.isStreamSubscriber_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => {
this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamSubscriber(streamId, userId, false)
}, {
...config.cache,
cacheKey([streamId, userId]): string {
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
}
})
this.hasPublicSubscribePermission_cached = CacheAsyncFn((streamId: StreamID) => {
this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => {
return this.hasPermission({
streamId,
public: true,
Expand Down Expand Up @@ -264,7 +264,7 @@ export class StreamRegistry {
JSON.stringify(metadata),
ethersOverrides
))
this.clearStreamCache(streamId)
this.invalidateStreamCache(streamId)
}

async deleteStream(streamIdOrPath: string): Promise<void> {
Expand All @@ -275,7 +275,7 @@ export class StreamRegistry {
streamId,
ethersOverrides
))
this.clearStreamCache(streamId)
this.invalidateStreamCache(streamId)
}

private async streamExistsOnChain(streamIdOrPath: string): Promise<boolean> {
Expand Down Expand Up @@ -462,7 +462,7 @@ export class StreamRegistry {
...assignments: InternalPermissionAssignment[]
): Promise<void> {
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
this.clearStreamCache(streamId)
this.invalidateStreamCache(streamId)
await this.connectToContract()
for (const assignment of assignments) {
for (const permission of assignment.permissions) {
Expand All @@ -484,7 +484,7 @@ export class StreamRegistry {
for (const item of items) {
validatePermissionAssignments(item.assignments)
const streamId = await this.streamIdBuilder.toStreamID(item.streamId)
this.clearStreamCache(streamId)
this.invalidateStreamCache(streamId)
streamIds.push(streamId)
targets.push(item.assignments.map((assignment) => {
return isPublicPermissionAssignment(assignment) ? PUBLIC_PERMISSION_USER_ID : assignment.userId
Expand Down Expand Up @@ -518,40 +518,40 @@ export class StreamRegistry {

getStreamMetadata(streamId: StreamID, useCache = true): Promise<StreamMetadata> {
if (useCache) {
return this.getStreamMetadata_cached(streamId)
return this.getStreamMetadata_cached.get(streamId)
} else {
return this.getStreamMetadata_nonCached(streamId)
}
}

isStreamPublisher(streamId: StreamID, userId: UserID, useCache = true): Promise<boolean> {
if (useCache) {
return this.isStreamPublisher_cached(streamId, userId)
return this.isStreamPublisher_cached.get(streamId, userId)
} else {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
}
}

isStreamSubscriber(streamId: StreamID, userId: UserID, useCache = true): Promise<boolean> {
if (useCache) {
return this.isStreamSubscriber_cached(streamId, userId)
return this.isStreamSubscriber_cached.get(streamId, userId)
} else {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
}
}

hasPublicSubscribePermission(streamId: StreamID): Promise<boolean> {
return this.hasPublicSubscribePermission_cached(streamId)
return this.hasPublicSubscribePermission_cached.get(streamId)
}

clearStreamCache(streamId: StreamID): void {
invalidateStreamCache(streamId: StreamID): void {
this.logger.debug('Clear caches matching stream', { streamId })
// include separator so startsWith(streamid) doesn't match streamid-something
const target = `${streamId}${CACHE_KEY_SEPARATOR}`
const matchTarget = (s: string) => s.startsWith(target)
this.getStreamMetadata_cached.clearMatching(matchTarget)
this.isStreamPublisher_cached.clearMatching(matchTarget)
this.isStreamSubscriber_cached.clearMatching(matchTarget)
this.getStreamMetadata_cached.invalidate(matchTarget)
this.isStreamPublisher_cached.invalidate(matchTarget)
this.isStreamSubscriber_cached.invalidate(matchTarget)
// TODO should also clear cache for hasPublicSubscribePermission?
}
}
6 changes: 3 additions & 3 deletions packages/sdk/src/publish/MessageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { createMessageRef, createRandomMsgChainId } from './messageChain'
export interface MessageFactoryOptions {
streamId: StreamID
authentication: Authentication
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'clearStreamCache'>
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
groupKeyQueue: GroupKeyQueue
signatureValidator: SignatureValidator
messageSigner: MessageSigner
Expand All @@ -40,7 +40,7 @@ export class MessageFactory {
private readonly defaultMessageChainIds: Mapping<[partition: number], string>
private readonly prevMsgRefs: Map<string, MessageRef> = new Map()
// eslint-disable-next-line max-len
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'clearStreamCache'>
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
private readonly groupKeyQueue: GroupKeyQueue
private readonly signatureValidator: SignatureValidator
private readonly messageSigner: MessageSigner
Expand All @@ -66,7 +66,7 @@ export class MessageFactory {
const publisherId = await this.getPublisherId(metadata)
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
if (!isPublisher) {
this.streamRegistry.clearStreamCache(this.streamId)
this.streamRegistry.invalidateStreamCache(this.streamId)
throw new StreamrClientError(`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION')
}

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/subscribe/messagePipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin
// TODO log this in onError? if we want to log all errors?
logger.debug('Failed to decrypt', { messageId: msg.messageId, err })
// clear cached permissions if cannot decrypt, likely permissions need updating
opts.streamRegistry.clearStreamCache(msg.getStreamId())
opts.streamRegistry.invalidateStreamCache(msg.getStreamId())
throw err
}
} else {
Expand Down
11 changes: 6 additions & 5 deletions packages/sdk/src/subscribe/ordering/OrderMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { StrictStreamrClientConfig } from '../../Config'
import { StreamMessage } from '../../protocol/StreamMessage'
import { Mapping } from '../../utils/Mapping'
import { PushBuffer } from '../../utils/PushBuffer'
import { CacheAsyncFn } from '../../utils/CacheAsyncFn'
import { CachingMap } from '../../utils/CachingMap'
import { Resends } from '../Resends'
import { GapFiller } from './GapFiller'
import { Gap, OrderedMessageChain, OrderedMessageChainContext } from './OrderedMessageChain'
Expand Down Expand Up @@ -38,13 +38,14 @@ const createMessageChain = (
}
const chain = new OrderedMessageChain(context, abortSignal)
chain.on('unfillableGap', (gap: Gap) => onUnfillableGap(gap))
// TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant
// - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class?
// - also note that this is a cache which contains just one item (as streamPartId always the same)
const storageNodeCache = new CachingMap(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS)
const gapFiller = new GapFiller({
chain,
resend,
// TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant
// - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class?
// - also note that this is a cache which contains just one item (as streamPartId always the same)
getStorageNodeAddresses: CacheAsyncFn(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS),
getStorageNodeAddresses: () => storageNodeCache.get(),
strategy: config.gapFillStrategy,
initialWaitTime: config.gapFillTimeout,
retryWaitTime: config.retryResendAfter,
Expand Down
54 changes: 0 additions & 54 deletions packages/sdk/src/utils/CacheAsyncFn.ts

This file was deleted.

52 changes: 52 additions & 0 deletions packages/sdk/src/utils/CachingMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { MapKey } from '@streamr/utils'
import pMemoize from 'p-memoize'
import LRU from '../../vendor/quick-lru'

/**
* Caches into a LRU cache capped at options.maxSize. See documentation for mem/p-memoize.
* Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.invalidate() is called.
* Won't cache rejections.
*
* ```js
* const cache = new CachingMap(asyncFn, opts)
* await cache.get(key)
* await cache.get(key)
* cache.invalidate(() => ...)
* ```
*/
export class CachingMap<K extends MapKey, V, P extends any[]> {

private readonly cachedFn: (...args: P) => Promise<V>
private readonly cache: LRU<K, { data: V, maxAge: number }>

constructor(
asyncFn: (...args: P) => Promise<V>,
opts: {
maxSize: number
maxAge: number
cacheKey: (args: P) => K
}
) {
this.cache = new LRU<K, { data: V, maxAge: number }>({
maxSize: opts.maxSize,
maxAge: opts.maxAge
})
this.cachedFn = pMemoize(asyncFn, {
cachePromiseRejection: false,
cache: this.cache,
cacheKey: opts.cacheKey
})
}

get(...args: P): Promise<V> {
return this.cachedFn(...args)
}

invalidate(predicate: (key: K) => boolean): void {
for (const key of this.cache.keys()) {
if (predicate(key)) {
this.cache.delete(key)
}
}
}
}
2 changes: 1 addition & 1 deletion packages/sdk/src/utils/addStreamToStorageNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export const addStreamToStorageNode = async (
'storage node did not respond'
)
} finally {
streamRegistry.clearStreamCache(streamId)
streamRegistry.invalidateStreamCache(streamId)
await assignmentSubscription?.unsubscribe() // should never reject...
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
}

// eslint-disable-next-line class-methods-use-this
clearStreamCache(): void {
invalidateStreamCache(): void {
// no-op
}

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/test-utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export const createStreamRegistry = (opts?: {
isStreamSubscriber: async () => {
return opts?.isStreamSubscriber ?? true
},
clearStreamCache: () => {}
invalidateStreamCache: () => {}
} as any
}

Expand Down
Loading