2 changes: 1 addition & 1 deletion packages/sdk/src/contracts/StreamRegistry.ts
@@ -42,7 +42,7 @@ import {
} from '../permission'
import { filter, map } from '../utils/GeneratorUtils'
import { LoggerFactory } from '../utils/LoggerFactory'
-import { CacheAsyncFn, CacheAsyncFnType } from '../utils/caches'
+import { CacheAsyncFn, CacheAsyncFnType } from '../utils/CacheAsyncFn'
import { until } from '../utils/promises'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
15 changes: 11 additions & 4 deletions packages/sdk/src/subscribe/ordering/OrderMessages.ts
@@ -3,11 +3,18 @@ import { StrictStreamrClientConfig } from '../../Config'
import { StreamMessage } from '../../protocol/StreamMessage'
import { Mapping } from '../../utils/Mapping'
import { PushBuffer } from '../../utils/PushBuffer'
-import { CacheAsyncFn } from '../../utils/caches'
+import { CacheAsyncFn } from '../../utils/CacheAsyncFn'
import { Resends } from '../Resends'
import { GapFiller } from './GapFiller'
import { Gap, OrderedMessageChain, OrderedMessageChainContext } from './OrderedMessageChain'

+const STORAGE_NODE_CACHE_KEY = Symbol('STORAGE_NODE_CACHE_KEY')
+const STORAGE_NODE_CACHE_OPTS = {
+maxSize: 10000,
+maxAge: 30 * 60 * 1000,
+cacheKey: () => STORAGE_NODE_CACHE_KEY
+}
+
const createMessageChain = (
context: OrderedMessageChainContext,
getStorageNodes: (streamId: StreamID) => Promise<EthereumAddress[]>,
@@ -34,10 +41,10 @@ const createMessageChain = (
const gapFiller = new GapFiller({
chain,
resend,
-// TODO maybe caching should be configurable? (now uses 30 min maxAge, which is the default of CacheAsyncFn)
+// TODO maybe caching should be configurable, i.e. use client's config.cache instead of the constant
// - maybe the caching should be done at application level, e.g. with a new CacheStreamStorageRegistry class?
-// - also not that this is a cache which contains just one item (as streamPartId always the same)
-getStorageNodeAddresses: CacheAsyncFn(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId))),
+// - also note that this is a cache which contains just one item (as streamPartId always the same)
+getStorageNodeAddresses: CacheAsyncFn(() => getStorageNodes(StreamPartIDUtils.getStreamID(context.streamPartId)), STORAGE_NODE_CACHE_OPTS),
strategy: config.gapFillStrategy,
initialWaitTime: config.gapFillTimeout,
retryWaitTime: config.retryResendAfter,
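Note on the constants introduced above: `cacheKey` always returns the same symbol, so the cache keyed by `STORAGE_NODE_CACHE_KEY` holds a single entry per chain. `maxSize` is therefore effectively irrelevant, and `maxAge` (30 minutes) determines how often the storage node addresses are re-fetched. A minimal sketch of the pattern, assuming `CacheAsyncFn` as defined in this PR (the `fetchStorageNodeAddresses` stub is illustrative, not part of the diff):

```ts
import { CacheAsyncFn } from '../../utils/CacheAsyncFn'  // path as seen from OrderMessages.ts

// Illustrative stand-in for the real storage node registry lookup
const fetchStorageNodeAddresses = async (): Promise<string[]> => {
    return ['0x0000000000000000000000000000000000000000']
}

const SINGLE_ENTRY_KEY = Symbol('SINGLE_ENTRY_KEY')

const getStorageNodeAddressesCached = CacheAsyncFn(fetchStorageNodeAddresses, {
    maxSize: 10000,                   // effectively unused: only one key is ever stored
    maxAge: 30 * 60 * 1000,           // addresses are re-fetched at most every 30 minutes
    cacheKey: () => SINGLE_ENTRY_KEY  // constant key -> the cache contains exactly one item
})

const demo = async () => {
    await getStorageNodeAddressesCached()  // calls fetchStorageNodeAddresses
    await getStorageNodeAddressesCached()  // served from the single cache entry
}
demo().catch(() => {})
```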
packages/sdk/src/utils/CacheAsyncFn.ts
@@ -1,3 +1,4 @@
+import { MapKey } from '@streamr/utils'
import pMemoize from 'p-memoize'
import LRU from '../../vendor/quick-lru'

@@ -14,48 +15,40 @@ function clearMatching<K>(cache: Collection<K, unknown>, matchFn: (key: K) => bo
}
}

-export type CacheAsyncFnType<ArgsType extends any[], ReturnType, KeyType = ArgsType[0]> = ((...args: ArgsType) => Promise<ReturnType>)
+export type CacheAsyncFnType<ArgsType extends any[], ReturnType, KeyType extends MapKey> = ((...args: ArgsType) => Promise<ReturnType>)
& { clearMatching: (matchFn: (key: KeyType) => boolean) => void }

/**
-* Returns a cached async fn, cached keyed on first argument passed. See documentation for mem/p-memoize.
+* Returns a cached async fn. See documentation for mem/p-memoize.
* Caches into a LRU cache capped at options.maxSize
-* Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.clear() is called.
-* Won't cache rejections by default. Override with options.cachePromiseRejection = true.
+* Won't call asyncFn again until options.maxAge or options.maxSize exceeded, or cachedAsyncFn.clearMatching() is called.
+* Won't cache rejections.
*
* ```js
* const cachedAsyncFn = CacheAsyncFn(asyncFn, options)
* await cachedAsyncFn(key)
* await cachedAsyncFn(key)
-* cachedAsyncFn.clear()
+* cachedAsyncFn.clearMatching(() => ...)
* ```
*/
-export function CacheAsyncFn<ArgsType extends any[], ReturnType, KeyType = ArgsType[0]>(asyncFn: (...args: ArgsType) => PromiseLike<ReturnType>, {
-maxSize = 10000,
-maxAge = 30 * 60 * 1000, // 30 minutes
-cachePromiseRejection = false,
-onEviction = () => {},
-cacheKey = (args: ArgsType) => args[0], // type+provide default so we can infer KeyType
-}: {
-maxSize?: number
-maxAge?: number
-cachePromiseRejection?: boolean
-onEviction?: (...args: any[]) => void
-cacheKey?: (args: ArgsType) => KeyType
-} = {}): CacheAsyncFnType<ArgsType, ReturnType, KeyType> {
+export function CacheAsyncFn<ArgsType extends any[], ReturnType, KeyType extends MapKey>(
+asyncFn: (...args: ArgsType) => Promise<ReturnType>,
+opts: {
+maxSize: number
+maxAge: number
+cacheKey: (args: ArgsType) => KeyType
+}
+): CacheAsyncFnType<ArgsType, ReturnType, KeyType> {
const cache = new LRU<KeyType, { data: ReturnType, maxAge: number }>({
-maxSize,
-maxAge,
-onEviction,
+maxSize: opts.maxSize,
+maxAge: opts.maxAge
})

const cachedFn = Object.assign(pMemoize(asyncFn, {
-cachePromiseRejection,
+cachePromiseRejection: false,
cache,
-cacheKey
+cacheKey: opts.cacheKey
}), {
clearMatching: (matchFn: ((key: KeyType) => boolean)) => clearMatching(cache, matchFn),
})

return cachedFn
}
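The options object is now mandatory: `maxSize`, `maxAge` and `cacheKey` no longer have defaults, the key produced by `cacheKey` must be a `MapKey`, and rejections are never cached. A usage sketch of the updated signature (the `fetchStreamMetadata` lookup and its key format are hypothetical, for illustration only):

```ts
import { CacheAsyncFn } from './CacheAsyncFn'  // i.e. packages/sdk/src/utils/CacheAsyncFn

// Hypothetical async lookup, used only to illustrate the API
const fetchStreamMetadata = async (streamId: string, partition: number): Promise<string> => {
    return `metadata of ${streamId}#${partition}`
}

// All three options must now be passed explicitly
const cachedFetch = CacheAsyncFn(fetchStreamMetadata, {
    maxSize: 1000,
    maxAge: 60 * 1000,
    // cacheKey maps the full argument list to a MapKey
    cacheKey: ([streamId, partition]) => `${streamId}#${partition}`
})

const demo = async () => {
    await cachedFetch('stream-1', 0)  // invokes fetchStreamMetadata
    await cachedFetch('stream-1', 0)  // served from the LRU cache
    // Evict all entries of one stream without clearing the rest
    cachedFetch.clearMatching((key) => key.startsWith('stream-1#'))
    await cachedFetch('stream-1', 0)  // fetched again
}
demo().catch(() => {})
```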
4 changes: 3 additions & 1 deletion packages/sdk/test/unit/CacheAsyncFn.test.ts
@@ -1,4 +1,4 @@
-import { CacheAsyncFn } from '../../src/utils/caches'
+import { CacheAsyncFn } from '../../src/utils/CacheAsyncFn'
import { wait } from '@streamr/utils'

describe('CacheAsyncFn', () => {
@@ -13,6 +13,8 @@ describe('CacheAsyncFn', () => {
return `${key1}${key2}`.toUpperCase()
})
cachedFn = CacheAsyncFn(plainFn as any, {
+maxSize: 10000,
+maxAge: 30 * 60 * 1000,
cacheKey: ([key1, key2]) => `${key1};${key2}`
})
})
140 changes: 140 additions & 0 deletions packages/sdk/test/unit/CacheAsyncFn2.test.ts
@@ -0,0 +1,140 @@
import { wait } from '@streamr/utils'
import { CacheAsyncFn } from '../../src/utils/CacheAsyncFn'

const DEFAULT_OPTS = {
maxSize: 10000,
maxAge: 30 * 60 * 1000,
cacheKey: (args: any[]) => args[0]
}

describe('CacheAsyncFn', () => {
it('caches & be cleared', async () => {
const fn = jest.fn()
const cachedFn = CacheAsyncFn(fn, DEFAULT_OPTS)
await cachedFn()
expect(fn).toHaveBeenCalledTimes(1)
await cachedFn()
expect(fn).toHaveBeenCalledTimes(1)
await cachedFn(1)
expect(fn).toHaveBeenCalledTimes(2)
await cachedFn(1)
expect(fn).toHaveBeenCalledTimes(2)
await cachedFn(2)
expect(fn).toHaveBeenCalledTimes(3)
await cachedFn(1)
expect(fn).toHaveBeenCalledTimes(3)
await cachedFn(2)
expect(fn).toHaveBeenCalledTimes(3)
cachedFn.clearMatching((v) => v === 1)
await cachedFn(1)
expect(fn).toHaveBeenCalledTimes(4)
cachedFn.clearMatching((v) => v === 1)
await cachedFn(1)
expect(fn).toHaveBeenCalledTimes(5)
})

it('adopts type of wrapped function', async () => {
// type compatibility is actually checked at compile time via @ts-expect-error;
// the runtime assertions below don't matter much
async function fn(_s: string): Promise<number> {
return 3
}

const cachedFn = CacheAsyncFn(fn, DEFAULT_OPTS)
const a: number = await cachedFn('abc') // ok
expect(a).toEqual(3)
// @ts-expect-error not enough args
await cachedFn()
// @ts-expect-error too many args
await cachedFn('abc', 3)
// @ts-expect-error wrong argument type
await cachedFn(3)

// @ts-expect-error wrong return type
const c: string = await cachedFn('abc')
expect(c).toEqual(3)
cachedFn.clearMatching((_d: string) => true)
const cachedFn2 = CacheAsyncFn(fn, {
...DEFAULT_OPTS,
cacheKey: ([s]) => {
return s.length
}
})

cachedFn2.clearMatching((_d: number) => true)
})

it('does memoize consecutive calls', async () => {
let i = 0
const fn = async () => {
i += 1
return i
}
const memoized = CacheAsyncFn(fn, DEFAULT_OPTS)
const firstCall = memoized()
const secondCall = memoized()

expect(await Promise.all([firstCall, secondCall])).toEqual([1, 1])
})

it('does not execute the wrapped function in parallel for the same key', async () => {
const taskId1 = '0xbe406f5e1b7e951cd8e42ab28598671e5b73c3dd/test/75712/Encryption-0'
const taskId2 = 'd/e/f'
const calledWith: string[] = []
const fn = jest.fn(async (key: string) => {
calledWith.push(key)
await wait(100)
return key
})

const cachedFn = CacheAsyncFn(fn, {
maxSize: 10000,
maxAge: 1800000,
cacheKey: ([v]) => {
return v
}
})
const task = Promise.all([
cachedFn(taskId1),
cachedFn(taskId2),
cachedFn(taskId1),
cachedFn(taskId2),
])
task.catch(() => {})
setImmediate(() => {
cachedFn(taskId1)
cachedFn(taskId1)
cachedFn(taskId2)
cachedFn(taskId2)
})
process.nextTick(() => {
cachedFn(taskId1)
cachedFn(taskId2)
cachedFn(taskId1)
cachedFn(taskId2)
})
setTimeout(() => {
cachedFn(taskId1)
cachedFn(taskId1)
cachedFn(taskId2)
cachedFn(taskId2)
})
await wait(10)
cachedFn(taskId2)
cachedFn(taskId2)
cachedFn(taskId1)
cachedFn(taskId1)
await Promise.all([
cachedFn(taskId1),
cachedFn(taskId2),
cachedFn(taskId1),
cachedFn(taskId2),
])
await task
expect(fn).toHaveBeenCalledTimes(2)
expect(calledWith).toEqual([taskId1, taskId2])
await wait(200)
expect(fn).toHaveBeenCalledTimes(2)
expect(calledWith).toEqual([taskId1, taskId2])
})
})