diff --git a/packages/sdk/src/contracts/StreamRegistry.ts b/packages/sdk/src/contracts/StreamRegistry.ts index 990304fefe..712d8e0400 100644 --- a/packages/sdk/src/contracts/StreamRegistry.ts +++ b/packages/sdk/src/contracts/StreamRegistry.ts @@ -42,7 +42,7 @@ import { } from '../permission' import { filter, map } from '../utils/GeneratorUtils' import { LoggerFactory } from '../utils/LoggerFactory' -import { CacheAsyncFn, CacheAsyncFnType } from '../utils/caches' +import { CacheAsyncFn, CacheAsyncFnType } from '../utils/CacheAsyncFn' import { until } from '../utils/promises' import { ChainEventPoller } from './ChainEventPoller' import { ContractFactory } from './ContractFactory' diff --git a/packages/sdk/src/subscribe/ordering/OrderMessages.ts b/packages/sdk/src/subscribe/ordering/OrderMessages.ts index 3099f690cc..cb54736d97 100644 --- a/packages/sdk/src/subscribe/ordering/OrderMessages.ts +++ b/packages/sdk/src/subscribe/ordering/OrderMessages.ts @@ -3,11 +3,18 @@ import { StrictStreamrClientConfig } from '../../Config' import { StreamMessage } from '../../protocol/StreamMessage' import { Mapping } from '../../utils/Mapping' import { PushBuffer } from '../../utils/PushBuffer' -import { CacheAsyncFn } from '../../utils/caches' +import { CacheAsyncFn } from '../../utils/CacheAsyncFn' import { Resends } from '../Resends' import { GapFiller } from './GapFiller' import { Gap, OrderedMessageChain, OrderedMessageChainContext } from './OrderedMessageChain' +const STORAGE_NODE_CACHE_KEY = Symbol('STORAGE_NODE_CACHE_KEY') +const STORAGE_NODE_CACHE_OPTS = { + maxSize: 10000, + maxAge: 30 * 60 * 1000, + cacheKey: () => STORAGE_NODE_CACHE_KEY +} + const createMessageChain = ( context: OrderedMessageChainContext, getStorageNodes: (streamId: StreamID) => Promise, @@ -34,10 +41,10 @@ const createMessageChain = ( const gapFiller = new GapFiller({ chain, resend, - // TODO maybe caching should be configurable? (now uses 30 min maxAge, which is the default of CacheAsyncFn) + // TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant // - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class? - // - also not that this is a cache which contains just one item (as streamPartId always the same) - getStorageNodeAddresses: CacheAsyncFn(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId))), + // - also note that this is a cache which contains just one item (as streamPartId always the same) + getStorageNodeAddresses: CacheAsyncFn(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS), strategy: config.gapFillStrategy, initialWaitTime: config.gapFillTimeout, retryWaitTime: config.retryResendAfter, diff --git a/packages/sdk/src/utils/caches.ts b/packages/sdk/src/utils/CacheAsyncFn.ts similarity index 53% rename from packages/sdk/src/utils/caches.ts rename to packages/sdk/src/utils/CacheAsyncFn.ts index 8a3afccff4..2d4be6846f 100644 --- a/packages/sdk/src/utils/caches.ts +++ b/packages/sdk/src/utils/CacheAsyncFn.ts @@ -1,3 +1,4 @@ +import { MapKey } from '@streamr/utils' import pMemoize from 'p-memoize' import LRU from '../../vendor/quick-lru' @@ -14,48 +15,40 @@ function clearMatching(cache: Collection, matchFn: (key: K) => bo } } -export type CacheAsyncFnType = ((...args: ArgsType) => Promise) +export type CacheAsyncFnType = ((...args: ArgsType) => Promise) & { clearMatching: (matchFn: (key: KeyType) => boolean) => void } /** - * Returns a cached async fn, cached keyed on first argument passed. See documentation for mem/p-memoize. + * Returns a cached async fn. See documentation for mem/p-memoize. * Caches into a LRU cache capped at options.maxSize - * Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.clear() is called. - * Won't cache rejections by default. Override with options.cachePromiseRejection = true. + * Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.clearMatching() is called. + * Won't cache rejections. * * ```js * const cachedAsyncFn = CacheAsyncFn(asyncFn, options) * await cachedAsyncFn(key) * await cachedAsyncFn(key) - * cachedAsyncFn.clear() + * cachedAsyncFn.clearMatching(() => ...) * ``` */ -export function CacheAsyncFn(asyncFn: (...args: ArgsType) => PromiseLike, { - maxSize = 10000, - maxAge = 30 * 60 * 1000, // 30 minutes - cachePromiseRejection = false, - onEviction = () => {}, - cacheKey = (args: ArgsType) => args[0], // type+provide default so we can infer KeyType -}: { - maxSize?: number - maxAge?: number - cachePromiseRejection?: boolean - onEviction?: (...args: any[]) => void - cacheKey?: (args: ArgsType) => KeyType -} = {}): CacheAsyncFnType { +export function CacheAsyncFn( + asyncFn: (...args: ArgsType) => Promise, + opts: { + maxSize: number + maxAge: number + cacheKey: (args: ArgsType) => KeyType + } +): CacheAsyncFnType { const cache = new LRU({ - maxSize, - maxAge, - onEviction, + maxSize: opts.maxSize, + maxAge: opts.maxAge }) - const cachedFn = Object.assign(pMemoize(asyncFn, { - cachePromiseRejection, + cachePromiseRejection: false, cache, - cacheKey + cacheKey: opts.cacheKey }), { clearMatching: (matchFn: ((key: KeyType) => boolean)) => clearMatching(cache, matchFn), }) - return cachedFn } diff --git a/packages/sdk/test/unit/CacheAsyncFn.test.ts b/packages/sdk/test/unit/CacheAsyncFn.test.ts index bebd940f5e..c6395b1fc9 100644 --- a/packages/sdk/test/unit/CacheAsyncFn.test.ts +++ b/packages/sdk/test/unit/CacheAsyncFn.test.ts @@ -1,4 +1,4 @@ -import { CacheAsyncFn } from '../../src/utils/caches' +import { CacheAsyncFn } from '../../src/utils/CacheAsyncFn' import { wait } from '@streamr/utils' describe('CacheAsyncFn', () => { @@ -13,6 +13,8 @@ describe('CacheAsyncFn', () => { return `${key1}${key2}`.toUpperCase() }) cachedFn = CacheAsyncFn(plainFn as any, { + maxSize: 10000, + maxAge: 30 * 60 * 1000, cacheKey: ([key1, key2]) => `${key1};${key2}` }) }) diff --git a/packages/sdk/test/unit/CacheAsyncFn2.test.ts b/packages/sdk/test/unit/CacheAsyncFn2.test.ts new file mode 100644 index 0000000000..63e5852cd9 --- /dev/null +++ b/packages/sdk/test/unit/CacheAsyncFn2.test.ts @@ -0,0 +1,140 @@ +import { wait } from '@streamr/utils' +import { CacheAsyncFn } from '../../src/utils/CacheAsyncFn' + +const DEFAULT_OPTS = { + maxSize: 10000, + maxAge: 30 * 60 * 1000, + cacheKey: (args: any[]) => args[0] +} + +describe('CacheAsyncFn', () => { + it('caches & be cleared', async () => { + const fn = jest.fn() + const cachedFn = CacheAsyncFn(fn, DEFAULT_OPTS) + await cachedFn() + expect(fn).toHaveBeenCalledTimes(1) + await cachedFn() + expect(fn).toHaveBeenCalledTimes(1) + await cachedFn(1) + expect(fn).toHaveBeenCalledTimes(2) + await cachedFn(1) + expect(fn).toHaveBeenCalledTimes(2) + await cachedFn(2) + expect(fn).toHaveBeenCalledTimes(3) + await cachedFn(1) + expect(fn).toHaveBeenCalledTimes(3) + await cachedFn(2) + expect(fn).toHaveBeenCalledTimes(3) + cachedFn.clearMatching((v) => v === 1) + await cachedFn(1) + expect(fn).toHaveBeenCalledTimes(4) + cachedFn.clearMatching((v) => v === 1) + await cachedFn(1) + expect(fn).toHaveBeenCalledTimes(5) + }) + + it('adopts type of wrapped function', async () => { + // actually checking via ts-expect-error + // assertions don't matter, + async function fn(_s: string): Promise { + return 3 + } + + const cachedFn = CacheAsyncFn(fn, DEFAULT_OPTS) + const a: number = await cachedFn('abc') // ok + expect(a).toEqual(3) + // @ts-expect-error not enough args + await cachedFn() + // @ts-expect-error too many args + await cachedFn('abc', 3) + // @ts-expect-error wrong argument type + await cachedFn(3) + + // @ts-expect-error wrong return type + const c: string = await cachedFn('abc') + expect(c).toEqual(3) + cachedFn.clearMatching((_d: string) => true) + const cachedFn2 = CacheAsyncFn(fn, { + ...DEFAULT_OPTS, + cacheKey: ([s]) => { + return s.length + } + }) + + cachedFn2.clearMatching((_d: number) => true) + }) + + it('does memoize consecutive calls', async () => { + let i = 0 + const fn = async () => { + i += 1 + return i + } + const memoized = CacheAsyncFn(fn, DEFAULT_OPTS) + const firstCall = memoized() + const secondCall = memoized() + + expect(await Promise.all([firstCall, secondCall])).toEqual([1, 1]) + }) + + it('can not be executed in parallel', async () => { + const taskId1 = '0xbe406f5e1b7e951cd8e42ab28598671e5b73c3dd/test/75712/Encryption-0' + const taskId2 = 'd/e/f' + const calledWith: string[] = [] + const fn = jest.fn(async (key: string) => { + calledWith.push(key) + await wait(100) + return key + }) + + const cachedFn = CacheAsyncFn(fn, { + maxSize: 10000, + maxAge: 1800000, + cacheKey: ([v]) => { + return v + } + }) + const task = Promise.all([ + cachedFn(taskId1), + cachedFn(taskId2), + cachedFn(taskId1), + cachedFn(taskId2), + ]) + task.catch(() => {}) + setImmediate(() => { + cachedFn(taskId1) + cachedFn(taskId1) + cachedFn(taskId2) + cachedFn(taskId2) + }) + process.nextTick(() => { + cachedFn(taskId1) + cachedFn(taskId2) + cachedFn(taskId1) + cachedFn(taskId2) + }) + setTimeout(() => { + cachedFn(taskId1) + cachedFn(taskId1) + cachedFn(taskId2) + cachedFn(taskId2) + }) + await wait(10) + cachedFn(taskId2) + cachedFn(taskId2) + cachedFn(taskId1) + cachedFn(taskId1) + await Promise.all([ + cachedFn(taskId1), + cachedFn(taskId2), + cachedFn(taskId1), + cachedFn(taskId2), + ]) + await task + expect(fn).toHaveBeenCalledTimes(2) + expect(calledWith).toEqual([taskId1, taskId2]) + await wait(200) + expect(fn).toHaveBeenCalledTimes(2) + expect(calledWith).toEqual([taskId1, taskId2]) + }) +}) diff --git a/packages/sdk/test/unit/promises.test.ts b/packages/sdk/test/unit/promises.test.ts index f061516e46..9901427e5c 100644 --- a/packages/sdk/test/unit/promises.test.ts +++ b/packages/sdk/test/unit/promises.test.ts @@ -1,6 +1,5 @@ import { wait } from '@streamr/utils' -import { pOnce, pLimitFn, pOne, until } from '../../src/utils/promises' -import { CacheAsyncFn } from '../../src/utils/caches' +import { pLimitFn, pOnce, pOne, until } from '../../src/utils/promises' const WAIT_TIME = 25 @@ -81,141 +80,6 @@ describe('pLimitFn', () => { }) }) -describe('CacheAsyncFn', () => { - it('caches & be cleared', async () => { - const fn = jest.fn() - const cachedFn = CacheAsyncFn(fn) - await cachedFn() - expect(fn).toHaveBeenCalledTimes(1) - await cachedFn() - expect(fn).toHaveBeenCalledTimes(1) - await cachedFn(1) - expect(fn).toHaveBeenCalledTimes(2) - await cachedFn(1) - expect(fn).toHaveBeenCalledTimes(2) - await cachedFn(2) - expect(fn).toHaveBeenCalledTimes(3) - await cachedFn(1) - expect(fn).toHaveBeenCalledTimes(3) - await cachedFn(2) - expect(fn).toHaveBeenCalledTimes(3) - cachedFn.clearMatching((v) => v === 1) - await cachedFn(1) - expect(fn).toHaveBeenCalledTimes(4) - cachedFn.clearMatching((v) => v === 1) - await cachedFn(1) - expect(fn).toHaveBeenCalledTimes(5) - }) - - it('adopts type of wrapped function', async () => { - // actually checking via ts-expect-error - // assertions don't matter, - async function fn(_s: string): Promise { - return 3 - } - - const cachedFn = CacheAsyncFn(fn) - const a: number = await cachedFn('abc') // ok - expect(a).toEqual(3) - // @ts-expect-error not enough args - await cachedFn() - // @ts-expect-error too many args - await cachedFn('abc', 3) - // @ts-expect-error wrong argument type - await cachedFn(3) - - // @ts-expect-error wrong return type - const c: string = await cachedFn('abc') - expect(c).toEqual(3) - cachedFn.clearMatching((_d: string) => true) - // @ts-expect-error wrong match argument type - cachedFn.clearMatching((_d: number) => true) - const cachedFn2 = CacheAsyncFn(fn, { - cacheKey: ([s]) => { - return s.length - } - }) - - cachedFn2.clearMatching((_d: number) => true) - // @ts-expect-error wrong match argument type - cachedFn2.clearMatching((_d: string) => true) - }) - - it('does memoize consecutive calls', async () => { - let i = 0 - const fn = async () => { - i += 1 - return i - } - const memoized = CacheAsyncFn(fn) - const firstCall = memoized() - const secondCall = memoized() - - expect(await Promise.all([firstCall, secondCall])).toEqual([1, 1]) - }) - - it('can not be executed in parallel', async () => { - const taskId1 = '0xbe406f5e1b7e951cd8e42ab28598671e5b73c3dd/test/75712/Encryption-0' - const taskId2 = 'd/e/f' - const calledWith: string[] = [] - const fn = jest.fn(async (key: string) => { - calledWith.push(key) - await wait(100) - return key - }) - - const cachedFn = CacheAsyncFn(fn, { - maxSize: 10000, - maxAge: 1800000, - cacheKey: ([v]) => { - return v - } - }) - const task = Promise.all([ - cachedFn(taskId1), - cachedFn(taskId2), - cachedFn(taskId1), - cachedFn(taskId2), - ]) - task.catch(() => {}) - setImmediate(() => { - cachedFn(taskId1) - cachedFn(taskId1) - cachedFn(taskId2) - cachedFn(taskId2) - }) - process.nextTick(() => { - cachedFn(taskId1) - cachedFn(taskId2) - cachedFn(taskId1) - cachedFn(taskId2) - }) - setTimeout(() => { - cachedFn(taskId1) - cachedFn(taskId1) - cachedFn(taskId2) - cachedFn(taskId2) - }) - await wait(10) - cachedFn(taskId2) - cachedFn(taskId2) - cachedFn(taskId1) - cachedFn(taskId1) - await Promise.all([ - cachedFn(taskId1), - cachedFn(taskId2), - cachedFn(taskId1), - cachedFn(taskId2), - ]) - await task - expect(fn).toHaveBeenCalledTimes(2) - expect(calledWith).toEqual([taskId1, taskId2]) - await wait(200) - expect(fn).toHaveBeenCalledTimes(2) - expect(calledWith).toEqual([taskId1, taskId2]) - }) -}) - describe('pOnce', () => { it('works', async () => { let count = 0 diff --git a/packages/utils/src/exports.ts b/packages/utils/src/exports.ts index 9d71679ebc..1e76ddccb4 100644 --- a/packages/utils/src/exports.ts +++ b/packages/utils/src/exports.ts @@ -127,4 +127,4 @@ export { DEFAULT_PARTITION_COUNT, MAX_PARTITION_COUNT, ensureValidStreamPartitio export { StreamPartID, toStreamPartID, StreamPartIDUtils } from './StreamPartID' export { UserID, UserIDRaw, toUserId, toUserIdRaw, isValidUserId, isEthereumAddressUserId } from './UserID' export { HexString } from './HexString' -export { ChangeFieldType } from './types' +export { ChangeFieldType, MapKey } from './types' diff --git a/packages/utils/src/types.ts b/packages/utils/src/types.ts index 9da2e3d55e..1ba5f6b78a 100644 --- a/packages/utils/src/types.ts +++ b/packages/utils/src/types.ts @@ -3,3 +3,5 @@ export type BrandedString = string & { __brand: T } export type Events = { [K in keyof T]: (payload: any) => void } export type ChangeFieldType = Omit & Record + +export type MapKey = string | number | boolean | symbol | bigint | object