Skip to content

Commit 75ce81c

Browse files
authored
refactor(sdk): Convert CacheAsyncFn to class (#2871)
Converted the utility function to a class and renamed it to `CachingMap`. Renamed `clearMatching()` to `invalidate()` and reordered generics. Also renamed `StreamRegistry#clearStreamCache()`. ## Future improvements Do we need multiple caching classes: `CachingMap`, `Mapping` and `Cache`?
1 parent 14599b8 commit 75ce81c

File tree

13 files changed

+155
-156
lines changed

13 files changed

+155
-156
lines changed

packages/sdk/src/contracts/StreamRegistry.ts

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import {
4242
} from '../permission'
4343
import { filter, map } from '../utils/GeneratorUtils'
4444
import { LoggerFactory } from '../utils/LoggerFactory'
45-
import { CacheAsyncFn, CacheAsyncFnType } from '../utils/CacheAsyncFn'
45+
import { CachingMap } from '../utils/CachingMap'
4646
import { until } from '../utils/promises'
4747
import { ChainEventPoller } from './ChainEventPoller'
4848
import { ContractFactory } from './ContractFactory'
@@ -119,10 +119,10 @@ export class StreamRegistry {
119119
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>
120120
private readonly authentication: Authentication
121121
private readonly logger: Logger
122-
private readonly getStreamMetadata_cached: CacheAsyncFnType<[StreamID], StreamMetadata, string>
123-
private readonly isStreamPublisher_cached: CacheAsyncFnType<[StreamID, UserID], boolean, string>
124-
private readonly isStreamSubscriber_cached: CacheAsyncFnType<[StreamID, UserID], boolean, string>
125-
private readonly hasPublicSubscribePermission_cached: CacheAsyncFnType<[StreamID], boolean, string>
122+
private readonly getStreamMetadata_cached: CachingMap<string, StreamMetadata, [StreamID]>
123+
private readonly isStreamPublisher_cached: CachingMap<string, boolean, [StreamID, UserID]>
124+
private readonly isStreamSubscriber_cached: CachingMap<string, boolean, [StreamID, UserID]>
125+
private readonly hasPublicSubscribePermission_cached: CachingMap<string, boolean, [StreamID]>
126126

127127
/** @internal */
128128
constructor(
@@ -163,31 +163,31 @@ export class StreamRegistry {
163163
}),
164164
loggerFactory
165165
})
166-
this.getStreamMetadata_cached = CacheAsyncFn((streamId: StreamID) => {
166+
this.getStreamMetadata_cached = new CachingMap((streamId: StreamID) => {
167167
return this.getStreamMetadata_nonCached(streamId)
168168
}, {
169169
...config.cache,
170170
cacheKey: ([streamId]): string => {
171171
return `${streamId}${CACHE_KEY_SEPARATOR}`
172172
}
173173
})
174-
this.isStreamPublisher_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => {
174+
this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
175175
return this.isStreamPublisher(streamId, userId, false)
176176
}, {
177177
...config.cache,
178178
cacheKey([streamId, userId]): string {
179179
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
180180
}
181181
})
182-
this.isStreamSubscriber_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => {
182+
this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => {
183183
return this.isStreamSubscriber(streamId, userId, false)
184184
}, {
185185
...config.cache,
186186
cacheKey([streamId, userId]): string {
187187
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
188188
}
189189
})
190-
this.hasPublicSubscribePermission_cached = CacheAsyncFn((streamId: StreamID) => {
190+
this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => {
191191
return this.hasPermission({
192192
streamId,
193193
public: true,
@@ -264,7 +264,7 @@ export class StreamRegistry {
264264
JSON.stringify(metadata),
265265
ethersOverrides
266266
))
267-
this.clearStreamCache(streamId)
267+
this.invalidateStreamCache(streamId)
268268
}
269269

270270
async deleteStream(streamIdOrPath: string): Promise<void> {
@@ -275,7 +275,7 @@ export class StreamRegistry {
275275
streamId,
276276
ethersOverrides
277277
))
278-
this.clearStreamCache(streamId)
278+
this.invalidateStreamCache(streamId)
279279
}
280280

281281
private async streamExistsOnChain(streamIdOrPath: string): Promise<boolean> {
@@ -462,7 +462,7 @@ export class StreamRegistry {
462462
...assignments: InternalPermissionAssignment[]
463463
): Promise<void> {
464464
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
465-
this.clearStreamCache(streamId)
465+
this.invalidateStreamCache(streamId)
466466
await this.connectToContract()
467467
for (const assignment of assignments) {
468468
for (const permission of assignment.permissions) {
@@ -484,7 +484,7 @@ export class StreamRegistry {
484484
for (const item of items) {
485485
validatePermissionAssignments(item.assignments)
486486
const streamId = await this.streamIdBuilder.toStreamID(item.streamId)
487-
this.clearStreamCache(streamId)
487+
this.invalidateStreamCache(streamId)
488488
streamIds.push(streamId)
489489
targets.push(item.assignments.map((assignment) => {
490490
return isPublicPermissionAssignment(assignment) ? PUBLIC_PERMISSION_USER_ID : assignment.userId
@@ -518,40 +518,40 @@ export class StreamRegistry {
518518

519519
getStreamMetadata(streamId: StreamID, useCache = true): Promise<StreamMetadata> {
520520
if (useCache) {
521-
return this.getStreamMetadata_cached(streamId)
521+
return this.getStreamMetadata_cached.get(streamId)
522522
} else {
523523
return this.getStreamMetadata_nonCached(streamId)
524524
}
525525
}
526526

527527
isStreamPublisher(streamId: StreamID, userId: UserID, useCache = true): Promise<boolean> {
528528
if (useCache) {
529-
return this.isStreamPublisher_cached(streamId, userId)
529+
return this.isStreamPublisher_cached.get(streamId, userId)
530530
} else {
531531
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
532532
}
533533
}
534534

535535
isStreamSubscriber(streamId: StreamID, userId: UserID, useCache = true): Promise<boolean> {
536536
if (useCache) {
537-
return this.isStreamSubscriber_cached(streamId, userId)
537+
return this.isStreamSubscriber_cached.get(streamId, userId)
538538
} else {
539539
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
540540
}
541541
}
542542

543543
hasPublicSubscribePermission(streamId: StreamID): Promise<boolean> {
544-
return this.hasPublicSubscribePermission_cached(streamId)
544+
return this.hasPublicSubscribePermission_cached.get(streamId)
545545
}
546546

547-
clearStreamCache(streamId: StreamID): void {
547+
invalidateStreamCache(streamId: StreamID): void {
548548
this.logger.debug('Clear caches matching stream', { streamId })
549549
// include separator so startsWith(streamid) doesn't match streamid-something
550550
const target = `${streamId}${CACHE_KEY_SEPARATOR}`
551551
const matchTarget = (s: string) => s.startsWith(target)
552-
this.getStreamMetadata_cached.clearMatching(matchTarget)
553-
this.isStreamPublisher_cached.clearMatching(matchTarget)
554-
this.isStreamSubscriber_cached.clearMatching(matchTarget)
552+
this.getStreamMetadata_cached.invalidate(matchTarget)
553+
this.isStreamPublisher_cached.invalidate(matchTarget)
554+
this.isStreamSubscriber_cached.invalidate(matchTarget)
555555
// TODO should also clear cache for hasPublicSubscribePermission?
556556
}
557557
}

packages/sdk/src/publish/MessageFactory.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { createMessageRef, createRandomMsgChainId } from './messageChain'
2626
export interface MessageFactoryOptions {
2727
streamId: StreamID
2828
authentication: Authentication
29-
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'clearStreamCache'>
29+
streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
3030
groupKeyQueue: GroupKeyQueue
3131
signatureValidator: SignatureValidator
3232
messageSigner: MessageSigner
@@ -40,7 +40,7 @@ export class MessageFactory {
4040
private readonly defaultMessageChainIds: Mapping<[partition: number], string>
4141
private readonly prevMsgRefs: Map<string, MessageRef> = new Map()
4242
// eslint-disable-next-line max-len
43-
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'clearStreamCache'>
43+
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidateStreamCache'>
4444
private readonly groupKeyQueue: GroupKeyQueue
4545
private readonly signatureValidator: SignatureValidator
4646
private readonly messageSigner: MessageSigner
@@ -66,7 +66,7 @@ export class MessageFactory {
6666
const publisherId = await this.getPublisherId(metadata)
6767
const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId)
6868
if (!isPublisher) {
69-
this.streamRegistry.clearStreamCache(this.streamId)
69+
this.streamRegistry.invalidateStreamCache(this.streamId)
7070
throw new StreamrClientError(`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION')
7171
}
7272

packages/sdk/src/subscribe/messagePipeline.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin
5757
// TODO log this in onError? if we want to log all errors?
5858
logger.debug('Failed to decrypt', { messageId: msg.messageId, err })
5959
// clear cached permissions if cannot decrypt, likely permissions need updating
60-
opts.streamRegistry.clearStreamCache(msg.getStreamId())
60+
opts.streamRegistry.invalidateStreamCache(msg.getStreamId())
6161
throw err
6262
}
6363
} else {

packages/sdk/src/subscribe/ordering/OrderMessages.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { StrictStreamrClientConfig } from '../../Config'
33
import { StreamMessage } from '../../protocol/StreamMessage'
44
import { Mapping } from '../../utils/Mapping'
55
import { PushBuffer } from '../../utils/PushBuffer'
6-
import { CacheAsyncFn } from '../../utils/CacheAsyncFn'
6+
import { CachingMap } from '../../utils/CachingMap'
77
import { Resends } from '../Resends'
88
import { GapFiller } from './GapFiller'
99
import { Gap, OrderedMessageChain, OrderedMessageChainContext } from './OrderedMessageChain'
@@ -38,13 +38,14 @@ const createMessageChain = (
3838
}
3939
const chain = new OrderedMessageChain(context, abortSignal)
4040
chain.on('unfillableGap', (gap: Gap) => onUnfillableGap(gap))
41+
// TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant
42+
// - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class?
43+
// - also note that this is a cache which contains just one item (as streamPartId always the same)
44+
const storageNodeCache = new CachingMap(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS)
4145
const gapFiller = new GapFiller({
4246
chain,
4347
resend,
44-
// TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant
45-
// - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class?
46-
// - also note that this is a cache which contains just one item (as streamPartId always the same)
47-
getStorageNodeAddresses: CacheAsyncFn(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS),
48+
getStorageNodeAddresses: () => storageNodeCache.get(),
4849
strategy: config.gapFillStrategy,
4950
initialWaitTime: config.gapFillTimeout,
5051
retryWaitTime: config.retryResendAfter,

packages/sdk/src/utils/CacheAsyncFn.ts

Lines changed: 0 additions & 54 deletions
This file was deleted.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { MapKey } from '@streamr/utils'
2+
import pMemoize from 'p-memoize'
3+
import LRU from '../../vendor/quick-lru'
4+
5+
/**
6+
* Caches into a LRU cache capped at options.maxSize. See documentation for mem/p-memoize.
7+
* Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.invalidate() is called.
8+
* Won't cache rejections.
9+
*
10+
* ```js
11+
* const cache = new CachingMap(asyncFn, opts)
12+
* await cache.get(key)
13+
* await cache.get(key)
14+
* cache.invalidate(() => ...)
15+
* ```
16+
*/
17+
export class CachingMap<K extends MapKey, V, P extends any[]> {
18+
19+
private readonly cachedFn: (...args: P) => Promise<V>
20+
private readonly cache: LRU<K, { data: V, maxAge: number }>
21+
22+
constructor(
23+
asyncFn: (...args: P) => Promise<V>,
24+
opts: {
25+
maxSize: number
26+
maxAge: number
27+
cacheKey: (args: P) => K
28+
}
29+
) {
30+
this.cache = new LRU<K, { data: V, maxAge: number }>({
31+
maxSize: opts.maxSize,
32+
maxAge: opts.maxAge
33+
})
34+
this.cachedFn = pMemoize(asyncFn, {
35+
cachePromiseRejection: false,
36+
cache: this.cache,
37+
cacheKey: opts.cacheKey
38+
})
39+
}
40+
41+
get(...args: P): Promise<V> {
42+
return this.cachedFn(...args)
43+
}
44+
45+
invalidate(predicate: (key: K) => boolean): void {
46+
for (const key of this.cache.keys()) {
47+
if (predicate(key)) {
48+
this.cache.delete(key)
49+
}
50+
}
51+
}
52+
}

packages/sdk/src/utils/addStreamToStorageNode.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export const addStreamToStorageNode = async (
5252
'storage node did not respond'
5353
)
5454
} finally {
55-
streamRegistry.clearStreamCache(streamId)
55+
streamRegistry.invalidateStreamCache(streamId)
5656
await assignmentSubscription?.unsubscribe() // should never reject...
5757
}
5858
} else {

packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ export class FakeStreamRegistry implements Methods<StreamRegistry> {
168168
}
169169

170170
// eslint-disable-next-line class-methods-use-this
171-
clearStreamCache(): void {
171+
invalidateStreamCache(): void {
172172
// no-op
173173
}
174174

packages/sdk/test/test-utils/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ export const createStreamRegistry = (opts?: {
201201
isStreamSubscriber: async () => {
202202
return opts?.isStreamSubscriber ?? true
203203
},
204-
clearStreamCache: () => {}
204+
invalidateStreamCache: () => {}
205205
} as any
206206
}
207207

0 commit comments

Comments
 (0)