diff --git a/packages/sdk/src/contracts/StreamRegistry.ts b/packages/sdk/src/contracts/StreamRegistry.ts index 712d8e0400..94a1678533 100644 --- a/packages/sdk/src/contracts/StreamRegistry.ts +++ b/packages/sdk/src/contracts/StreamRegistry.ts @@ -42,7 +42,7 @@ import { } from '../permission' import { filter, map } from '../utils/GeneratorUtils' import { LoggerFactory } from '../utils/LoggerFactory' -import { CacheAsyncFn, CacheAsyncFnType } from '../utils/CacheAsyncFn' +import { CachingMap } from '../utils/CachingMap' import { until } from '../utils/promises' import { ChainEventPoller } from './ChainEventPoller' import { ContractFactory } from './ContractFactory' @@ -119,10 +119,10 @@ export class StreamRegistry { private readonly config: Pick private readonly authentication: Authentication private readonly logger: Logger - private readonly getStreamMetadata_cached: CacheAsyncFnType<[StreamID], StreamMetadata, string> - private readonly isStreamPublisher_cached: CacheAsyncFnType<[StreamID, UserID], boolean, string> - private readonly isStreamSubscriber_cached: CacheAsyncFnType<[StreamID, UserID], boolean, string> - private readonly hasPublicSubscribePermission_cached: CacheAsyncFnType<[StreamID], boolean, string> + private readonly getStreamMetadata_cached: CachingMap + private readonly isStreamPublisher_cached: CachingMap + private readonly isStreamSubscriber_cached: CachingMap + private readonly hasPublicSubscribePermission_cached: CachingMap /** @internal */ constructor( @@ -163,7 +163,7 @@ export class StreamRegistry { }), loggerFactory }) - this.getStreamMetadata_cached = CacheAsyncFn((streamId: StreamID) => { + this.getStreamMetadata_cached = new CachingMap((streamId: StreamID) => { return this.getStreamMetadata_nonCached(streamId) }, { ...config.cache, @@ -171,7 +171,7 @@ export class StreamRegistry { return `${streamId}${CACHE_KEY_SEPARATOR}` } }) - this.isStreamPublisher_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => { + this.isStreamPublisher_cached = new CachingMap((streamId: StreamID, userId: UserID) => { return this.isStreamPublisher(streamId, userId, false) }, { ...config.cache, @@ -179,7 +179,7 @@ export class StreamRegistry { return [streamId, userId].join(CACHE_KEY_SEPARATOR) } }) - this.isStreamSubscriber_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => { + this.isStreamSubscriber_cached = new CachingMap((streamId: StreamID, userId: UserID) => { return this.isStreamSubscriber(streamId, userId, false) }, { ...config.cache, @@ -187,7 +187,7 @@ export class StreamRegistry { return [streamId, userId].join(CACHE_KEY_SEPARATOR) } }) - this.hasPublicSubscribePermission_cached = CacheAsyncFn((streamId: StreamID) => { + this.hasPublicSubscribePermission_cached = new CachingMap((streamId: StreamID) => { return this.hasPermission({ streamId, public: true, @@ -264,7 +264,7 @@ export class StreamRegistry { JSON.stringify(metadata), ethersOverrides )) - this.clearStreamCache(streamId) + this.invalidateStreamCache(streamId) } async deleteStream(streamIdOrPath: string): Promise { @@ -275,7 +275,7 @@ export class StreamRegistry { streamId, ethersOverrides )) - this.clearStreamCache(streamId) + this.invalidateStreamCache(streamId) } private async streamExistsOnChain(streamIdOrPath: string): Promise { @@ -462,7 +462,7 @@ export class StreamRegistry { ...assignments: InternalPermissionAssignment[] ): Promise { const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath) - this.clearStreamCache(streamId) + this.invalidateStreamCache(streamId) await this.connectToContract() for (const assignment of assignments) { for (const permission of assignment.permissions) { @@ -484,7 +484,7 @@ export class StreamRegistry { for (const item of items) { validatePermissionAssignments(item.assignments) const streamId = await this.streamIdBuilder.toStreamID(item.streamId) - this.clearStreamCache(streamId) + this.invalidateStreamCache(streamId) streamIds.push(streamId) targets.push(item.assignments.map((assignment) => { return isPublicPermissionAssignment(assignment) ? PUBLIC_PERMISSION_USER_ID : assignment.userId @@ -518,7 +518,7 @@ export class StreamRegistry { getStreamMetadata(streamId: StreamID, useCache = true): Promise { if (useCache) { - return this.getStreamMetadata_cached(streamId) + return this.getStreamMetadata_cached.get(streamId) } else { return this.getStreamMetadata_nonCached(streamId) } @@ -526,7 +526,7 @@ export class StreamRegistry { isStreamPublisher(streamId: StreamID, userId: UserID, useCache = true): Promise { if (useCache) { - return this.isStreamPublisher_cached(streamId, userId) + return this.isStreamPublisher_cached.get(streamId, userId) } else { return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH) } @@ -534,24 +534,24 @@ export class StreamRegistry { isStreamSubscriber(streamId: StreamID, userId: UserID, useCache = true): Promise { if (useCache) { - return this.isStreamSubscriber_cached(streamId, userId) + return this.isStreamSubscriber_cached.get(streamId, userId) } else { return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE) } } hasPublicSubscribePermission(streamId: StreamID): Promise { - return this.hasPublicSubscribePermission_cached(streamId) + return this.hasPublicSubscribePermission_cached.get(streamId) } - clearStreamCache(streamId: StreamID): void { + invalidateStreamCache(streamId: StreamID): void { this.logger.debug('Clear caches matching stream', { streamId }) // include separator so startsWith(streamid) doesn't match streamid-something const target = `${streamId}${CACHE_KEY_SEPARATOR}` const matchTarget = (s: string) => s.startsWith(target) - this.getStreamMetadata_cached.clearMatching(matchTarget) - this.isStreamPublisher_cached.clearMatching(matchTarget) - this.isStreamSubscriber_cached.clearMatching(matchTarget) + this.getStreamMetadata_cached.invalidate(matchTarget) + this.isStreamPublisher_cached.invalidate(matchTarget) + this.isStreamSubscriber_cached.invalidate(matchTarget) // TODO should also clear cache for hasPublicSubscribePermission? } } diff --git a/packages/sdk/src/publish/MessageFactory.ts b/packages/sdk/src/publish/MessageFactory.ts index 3ee44321e6..27883c0a2c 100644 --- a/packages/sdk/src/publish/MessageFactory.ts +++ b/packages/sdk/src/publish/MessageFactory.ts @@ -26,7 +26,7 @@ import { createMessageRef, createRandomMsgChainId } from './messageChain' export interface MessageFactoryOptions { streamId: StreamID authentication: Authentication - streamRegistry: Pick + streamRegistry: Pick groupKeyQueue: GroupKeyQueue signatureValidator: SignatureValidator messageSigner: MessageSigner @@ -40,7 +40,7 @@ export class MessageFactory { private readonly defaultMessageChainIds: Mapping<[partition: number], string> private readonly prevMsgRefs: Map = new Map() // eslint-disable-next-line max-len - private readonly streamRegistry: Pick + private readonly streamRegistry: Pick private readonly groupKeyQueue: GroupKeyQueue private readonly signatureValidator: SignatureValidator private readonly messageSigner: MessageSigner @@ -66,7 +66,7 @@ export class MessageFactory { const publisherId = await this.getPublisherId(metadata) const isPublisher = await this.streamRegistry.isStreamPublisher(this.streamId, publisherId) if (!isPublisher) { - this.streamRegistry.clearStreamCache(this.streamId) + this.streamRegistry.invalidateStreamCache(this.streamId) throw new StreamrClientError(`You don't have permission to publish to this stream. Using address: ${publisherId}`, 'MISSING_PERMISSION') } diff --git a/packages/sdk/src/subscribe/messagePipeline.ts b/packages/sdk/src/subscribe/messagePipeline.ts index 9be0009a4a..baba7c0fa5 100644 --- a/packages/sdk/src/subscribe/messagePipeline.ts +++ b/packages/sdk/src/subscribe/messagePipeline.ts @@ -57,7 +57,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin // TODO log this in onError? if we want to log all errors? logger.debug('Failed to decrypt', { messageId: msg.messageId, err }) // clear cached permissions if cannot decrypt, likely permissions need updating - opts.streamRegistry.clearStreamCache(msg.getStreamId()) + opts.streamRegistry.invalidateStreamCache(msg.getStreamId()) throw err } } else { diff --git a/packages/sdk/src/subscribe/ordering/OrderMessages.ts b/packages/sdk/src/subscribe/ordering/OrderMessages.ts index cb54736d97..000fd94b7f 100644 --- a/packages/sdk/src/subscribe/ordering/OrderMessages.ts +++ b/packages/sdk/src/subscribe/ordering/OrderMessages.ts @@ -3,7 +3,7 @@ import { StrictStreamrClientConfig } from '../../Config' import { StreamMessage } from '../../protocol/StreamMessage' import { Mapping } from '../../utils/Mapping' import { PushBuffer } from '../../utils/PushBuffer' -import { CacheAsyncFn } from '../../utils/CacheAsyncFn' +import { CachingMap } from '../../utils/CachingMap' import { Resends } from '../Resends' import { GapFiller } from './GapFiller' import { Gap, OrderedMessageChain, OrderedMessageChainContext } from './OrderedMessageChain' @@ -38,13 +38,14 @@ const createMessageChain = ( } const chain = new OrderedMessageChain(context, abortSignal) chain.on('unfillableGap', (gap: Gap) => onUnfillableGap(gap)) + // TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant + // - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class? + // - also note that this is a cache which contains just one item (as streamPartId always the same) + const storageNodeCache = new CachingMap(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS) const gapFiller = new GapFiller({ chain, resend, - // TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant - // - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class? - // - also note that this is a cache which contains just one item (as streamPartId always the same) - getStorageNodeAddresses: CacheAsyncFn(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS), + getStorageNodeAddresses: () => storageNodeCache.get(), strategy: config.gapFillStrategy, initialWaitTime: config.gapFillTimeout, retryWaitTime: config.retryResendAfter, diff --git a/packages/sdk/src/utils/CacheAsyncFn.ts b/packages/sdk/src/utils/CacheAsyncFn.ts deleted file mode 100644 index 2d4be6846f..0000000000 --- a/packages/sdk/src/utils/CacheAsyncFn.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { MapKey } from '@streamr/utils' -import pMemoize from 'p-memoize' -import LRU from '../../vendor/quick-lru' - -interface Collection { - keys: Map['keys'] - delete: Map['delete'] -} - -function clearMatching(cache: Collection, matchFn: (key: K) => boolean): void { - for (const key of cache.keys()) { - if (matchFn(key)) { - cache.delete(key) - } - } -} - -export type CacheAsyncFnType = ((...args: ArgsType) => Promise) - & { clearMatching: (matchFn: (key: KeyType) => boolean) => void } - -/** - * Returns a cached async fn. See documentation for mem/p-memoize. - * Caches into a LRU cache capped at options.maxSize - * Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.clearMatching() is called. - * Won't cache rejections. - * - * ```js - * const cachedAsyncFn = CacheAsyncFn(asyncFn, options) - * await cachedAsyncFn(key) - * await cachedAsyncFn(key) - * cachedAsyncFn.clearMatching(() => ...) - * ``` - */ -export function CacheAsyncFn( - asyncFn: (...args: ArgsType) => Promise, - opts: { - maxSize: number - maxAge: number - cacheKey: (args: ArgsType) => KeyType - } -): CacheAsyncFnType { - const cache = new LRU({ - maxSize: opts.maxSize, - maxAge: opts.maxAge - }) - const cachedFn = Object.assign(pMemoize(asyncFn, { - cachePromiseRejection: false, - cache, - cacheKey: opts.cacheKey - }), { - clearMatching: (matchFn: ((key: KeyType) => boolean)) => clearMatching(cache, matchFn), - }) - return cachedFn -} diff --git a/packages/sdk/src/utils/CachingMap.ts b/packages/sdk/src/utils/CachingMap.ts new file mode 100644 index 0000000000..50a09a7575 --- /dev/null +++ b/packages/sdk/src/utils/CachingMap.ts @@ -0,0 +1,52 @@ +import { MapKey } from '@streamr/utils' +import pMemoize from 'p-memoize' +import LRU from '../../vendor/quick-lru' + +/** + * Caches into a LRU cache capped at options.maxSize. See documentation for mem/p-memoize. + * Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.invalidate() is called. + * Won't cache rejections. + * + * ```js + * const cache = new CachingMap(asyncFn, opts) + * await cache.get(key) + * await cache.get(key) + * cache.invalidate(() => ...) + * ``` + */ +export class CachingMap { + + private readonly cachedFn: (...args: P) => Promise + private readonly cache: LRU + + constructor( + asyncFn: (...args: P) => Promise, + opts: { + maxSize: number + maxAge: number + cacheKey: (args: P) => K + } + ) { + this.cache = new LRU({ + maxSize: opts.maxSize, + maxAge: opts.maxAge + }) + this.cachedFn = pMemoize(asyncFn, { + cachePromiseRejection: false, + cache: this.cache, + cacheKey: opts.cacheKey + }) + } + + get(...args: P): Promise { + return this.cachedFn(...args) + } + + invalidate(predicate: (key: K) => boolean): void { + for (const key of this.cache.keys()) { + if (predicate(key)) { + this.cache.delete(key) + } + } + } +} diff --git a/packages/sdk/src/utils/addStreamToStorageNode.ts b/packages/sdk/src/utils/addStreamToStorageNode.ts index 6d6f03599d..43e0c1298c 100644 --- a/packages/sdk/src/utils/addStreamToStorageNode.ts +++ b/packages/sdk/src/utils/addStreamToStorageNode.ts @@ -52,7 +52,7 @@ export const addStreamToStorageNode = async ( 'storage node did not respond' ) } finally { - streamRegistry.clearStreamCache(streamId) + streamRegistry.invalidateStreamCache(streamId) await assignmentSubscription?.unsubscribe() // should never reject... } } else { diff --git a/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts b/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts index c09fd85fcd..42e6a5e6e3 100644 --- a/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts +++ b/packages/sdk/test/test-utils/fake/FakeStreamRegistry.ts @@ -168,7 +168,7 @@ export class FakeStreamRegistry implements Methods { } // eslint-disable-next-line class-methods-use-this - clearStreamCache(): void { + invalidateStreamCache(): void { // no-op } diff --git a/packages/sdk/test/test-utils/utils.ts b/packages/sdk/test/test-utils/utils.ts index 4ae2ff3e68..b71ea12c7e 100644 --- a/packages/sdk/test/test-utils/utils.ts +++ b/packages/sdk/test/test-utils/utils.ts @@ -201,7 +201,7 @@ export const createStreamRegistry = (opts?: { isStreamSubscriber: async () => { return opts?.isStreamSubscriber ?? true }, - clearStreamCache: () => {} + invalidateStreamCache: () => {} } as any } diff --git a/packages/sdk/test/unit/CacheAsyncFn.test.ts b/packages/sdk/test/unit/CachingMap.test.ts similarity index 65% rename from packages/sdk/test/unit/CacheAsyncFn.test.ts rename to packages/sdk/test/unit/CachingMap.test.ts index c6395b1fc9..c38dbedd9e 100644 --- a/packages/sdk/test/unit/CacheAsyncFn.test.ts +++ b/packages/sdk/test/unit/CachingMap.test.ts @@ -1,10 +1,10 @@ -import { CacheAsyncFn } from '../../src/utils/CacheAsyncFn' +import { CachingMap } from '../../src/utils/CachingMap' import { wait } from '@streamr/utils' -describe('CacheAsyncFn', () => { +describe('CachingMap', () => { let plainFn: jest.Mock, [key1: string, key2: string]> - let cachedFn: (key1: string, key2: string) => Promise + let cache: CachingMap beforeEach(() => { plainFn = jest.fn() @@ -12,7 +12,7 @@ describe('CacheAsyncFn', () => { await wait(100) return `${key1}${key2}`.toUpperCase() }) - cachedFn = CacheAsyncFn(plainFn as any, { + cache = new CachingMap(plainFn as any, { maxSize: 10000, maxAge: 30 * 60 * 1000, cacheKey: ([key1, key2]) => `${key1};${key2}` @@ -20,16 +20,16 @@ describe('CacheAsyncFn', () => { }) it('happy path', async () => { - const result1 = await cachedFn('foo', 'bar') - const result2 = await cachedFn('foo', 'bar') + const result1 = await cache.get('foo', 'bar') + const result2 = await cache.get('foo', 'bar') expect(result1).toBe('FOOBAR') expect(result2).toBe('FOOBAR') expect(plainFn).toBeCalledTimes(1) }) it('miss', async () => { - const result1 = await cachedFn('foo', 'x') - const result2 = await cachedFn('foo', 'y') + const result1 = await cache.get('foo', 'x') + const result2 = await cache.get('foo', 'y') expect(result1).toBe('FOOX') expect(result2).toBe('FOOY') expect(plainFn).toBeCalledTimes(2) @@ -37,8 +37,8 @@ describe('CacheAsyncFn', () => { it('concurrency', async () => { const [result1, result2] = await Promise.all([ - cachedFn('foo', 'bar'), - cachedFn('foo', 'bar') + cache.get('foo', 'bar'), + cache.get('foo', 'bar') ]) expect(result1).toBe('FOOBAR') expect(result2).toBe('FOOBAR') @@ -49,8 +49,8 @@ describe('CacheAsyncFn', () => { plainFn.mockImplementation(async (key1: string, key2: string) => { throw new Error(`error ${key1}-${key2}`) }) - await expect(cachedFn('foo', 'x')).rejects.toEqual(new Error('error foo-x')) - await expect(cachedFn('foo', 'x')).rejects.toEqual(new Error('error foo-x')) + await expect(cache.get('foo', 'x')).rejects.toEqual(new Error('error foo-x')) + await expect(cache.get('foo', 'x')).rejects.toEqual(new Error('error foo-x')) expect(plainFn).toBeCalledTimes(2) // would be 1 if rejections were cached }) @@ -59,8 +59,8 @@ describe('CacheAsyncFn', () => { plainFn.mockImplementation((key1: string, key2: string) => { throw new Error(`error ${key1}-${key2}`) }) - await expect(cachedFn('foo', 'x')).rejects.toEqual(new Error('error foo-x')) - await expect(cachedFn('foo', 'x')).rejects.toEqual(new Error('error foo-x')) + await expect(cache.get('foo', 'x')).rejects.toEqual(new Error('error foo-x')) + await expect(cache.get('foo', 'x')).rejects.toEqual(new Error('error foo-x')) expect(plainFn).toBeCalledTimes(2) // would be 1 if throws were cached }) diff --git a/packages/sdk/test/unit/CacheAsyncFn2.test.ts b/packages/sdk/test/unit/CachingMap2.test.ts similarity index 58% rename from packages/sdk/test/unit/CacheAsyncFn2.test.ts rename to packages/sdk/test/unit/CachingMap2.test.ts index 63e5852cd9..1493c97368 100644 --- a/packages/sdk/test/unit/CacheAsyncFn2.test.ts +++ b/packages/sdk/test/unit/CachingMap2.test.ts @@ -1,5 +1,5 @@ import { wait } from '@streamr/utils' -import { CacheAsyncFn } from '../../src/utils/CacheAsyncFn' +import { CachingMap } from '../../src/utils/CachingMap' const DEFAULT_OPTS = { maxSize: 10000, @@ -7,29 +7,29 @@ const DEFAULT_OPTS = { cacheKey: (args: any[]) => args[0] } -describe('CacheAsyncFn', () => { +describe('CachingMap', () => { it('caches & be cleared', async () => { const fn = jest.fn() - const cachedFn = CacheAsyncFn(fn, DEFAULT_OPTS) - await cachedFn() + const cache = new CachingMap(fn, DEFAULT_OPTS) + await cache.get() expect(fn).toHaveBeenCalledTimes(1) - await cachedFn() + await cache.get() expect(fn).toHaveBeenCalledTimes(1) - await cachedFn(1) + await cache.get(1) expect(fn).toHaveBeenCalledTimes(2) - await cachedFn(1) + await cache.get(1) expect(fn).toHaveBeenCalledTimes(2) - await cachedFn(2) + await cache.get(2) expect(fn).toHaveBeenCalledTimes(3) - await cachedFn(1) + await cache.get(1) expect(fn).toHaveBeenCalledTimes(3) - await cachedFn(2) + await cache.get(2) expect(fn).toHaveBeenCalledTimes(3) - cachedFn.clearMatching((v) => v === 1) - await cachedFn(1) + cache.invalidate((v) => v === 1) + await cache.get(1) expect(fn).toHaveBeenCalledTimes(4) - cachedFn.clearMatching((v) => v === 1) - await cachedFn(1) + cache.invalidate((v) => v === 1) + await cache.get(1) expect(fn).toHaveBeenCalledTimes(5) }) @@ -40,28 +40,28 @@ describe('CacheAsyncFn', () => { return 3 } - const cachedFn = CacheAsyncFn(fn, DEFAULT_OPTS) - const a: number = await cachedFn('abc') // ok + const cache = new CachingMap(fn, DEFAULT_OPTS) + const a: number = await cache.get('abc') // ok expect(a).toEqual(3) // @ts-expect-error not enough args - await cachedFn() + await cache.get() // @ts-expect-error too many args - await cachedFn('abc', 3) + await cache.get('abc', 3) // @ts-expect-error wrong argument type - await cachedFn(3) + await cache.get(3) // @ts-expect-error wrong return type - const c: string = await cachedFn('abc') + const c: string = await cache.get('abc') expect(c).toEqual(3) - cachedFn.clearMatching((_d: string) => true) - const cachedFn2 = CacheAsyncFn(fn, { + cache.invalidate((_d: string) => true) + const cache2 = new CachingMap(fn, { ...DEFAULT_OPTS, cacheKey: ([s]) => { return s.length } }) - cachedFn2.clearMatching((_d: number) => true) + cache2.invalidate((_d: number) => true) }) it('does memoize consecutive calls', async () => { @@ -70,9 +70,9 @@ describe('CacheAsyncFn', () => { i += 1 return i } - const memoized = CacheAsyncFn(fn, DEFAULT_OPTS) - const firstCall = memoized() - const secondCall = memoized() + const memoized = new CachingMap(fn, DEFAULT_OPTS) + const firstCall = memoized.get() + const secondCall = memoized.get() expect(await Promise.all([firstCall, secondCall])).toEqual([1, 1]) }) @@ -87,7 +87,7 @@ describe('CacheAsyncFn', () => { return key }) - const cachedFn = CacheAsyncFn(fn, { + const cache = new CachingMap(fn, { maxSize: 10000, maxAge: 1800000, cacheKey: ([v]) => { @@ -95,40 +95,40 @@ describe('CacheAsyncFn', () => { } }) const task = Promise.all([ - cachedFn(taskId1), - cachedFn(taskId2), - cachedFn(taskId1), - cachedFn(taskId2), + cache.get(taskId1), + cache.get(taskId2), + cache.get(taskId1), + cache.get(taskId2), ]) task.catch(() => {}) setImmediate(() => { - cachedFn(taskId1) - cachedFn(taskId1) - cachedFn(taskId2) - cachedFn(taskId2) + cache.get(taskId1) + cache.get(taskId1) + cache.get(taskId2) + cache.get(taskId2) }) process.nextTick(() => { - cachedFn(taskId1) - cachedFn(taskId2) - cachedFn(taskId1) - cachedFn(taskId2) + cache.get(taskId1) + cache.get(taskId2) + cache.get(taskId1) + cache.get(taskId2) }) setTimeout(() => { - cachedFn(taskId1) - cachedFn(taskId1) - cachedFn(taskId2) - cachedFn(taskId2) + cache.get(taskId1) + cache.get(taskId1) + cache.get(taskId2) + cache.get(taskId2) }) await wait(10) - cachedFn(taskId2) - cachedFn(taskId2) - cachedFn(taskId1) - cachedFn(taskId1) + cache.get(taskId2) + cache.get(taskId2) + cache.get(taskId1) + cache.get(taskId1) await Promise.all([ - cachedFn(taskId1), - cachedFn(taskId2), - cachedFn(taskId1), - cachedFn(taskId2), + cache.get(taskId1), + cache.get(taskId2), + cache.get(taskId1), + cache.get(taskId2), ]) await task expect(fn).toHaveBeenCalledTimes(2) diff --git a/packages/sdk/test/unit/Publisher.test.ts b/packages/sdk/test/unit/Publisher.test.ts index c8a68d55a7..5132157a56 100644 --- a/packages/sdk/test/unit/Publisher.test.ts +++ b/packages/sdk/test/unit/Publisher.test.ts @@ -13,7 +13,7 @@ describe('Publisher', () => { const streamIdBuilder = new StreamIDBuilder(authentication) const streamRegistry = { isStreamPublisher: async () => false, - clearStreamCache: () => {} + invalidateStreamCache: () => {} } const publisher = new Publisher( undefined as any, diff --git a/packages/sdk/test/unit/messagePipeline.test.ts b/packages/sdk/test/unit/messagePipeline.test.ts index b18af6eca6..e146ff6c3a 100644 --- a/packages/sdk/test/unit/messagePipeline.test.ts +++ b/packages/sdk/test/unit/messagePipeline.test.ts @@ -77,7 +77,7 @@ describe('messagePipeline', () => { streamRegistry = { getStreamMetadata: async () => ({ partitions: 1 }), isStreamPublisher: async () => true, - clearStreamCache: jest.fn() + invalidateStreamCache: jest.fn() } pipeline = createMessagePipeline({ streamPartId, @@ -171,8 +171,8 @@ describe('messagePipeline', () => { expect(error).toBeInstanceOf(DecryptError) expect(error.message).toMatch(/timed out/) expect(output).toEqual([]) - expect(streamRegistry.clearStreamCache).toBeCalledTimes(1) - expect(streamRegistry.clearStreamCache).toBeCalledWith(StreamPartIDUtils.getStreamID(streamPartId)) + expect(streamRegistry.invalidateStreamCache).toBeCalledTimes(1) + expect(streamRegistry.invalidateStreamCache).toBeCalledWith(StreamPartIDUtils.getStreamID(streamPartId)) }) it('error: exception', async () => {