diff --git a/packages/sdk/src/contracts/ERC1271ContractFacade.ts b/packages/sdk/src/contracts/ERC1271ContractFacade.ts index 1d0ea206c9..83e6e0959e 100644 --- a/packages/sdk/src/contracts/ERC1271ContractFacade.ts +++ b/packages/sdk/src/contracts/ERC1271ContractFacade.ts @@ -1,9 +1,9 @@ -import { BrandedString, EthereumAddress, MapWithTtl, UserID, hash, recoverSignerUserId, toUserId } from '@streamr/utils' +import { BrandedString, EthereumAddress, hash, MapWithTtl, recoverSignerUserId, toUserId, UserID } from '@streamr/utils' import { Lifecycle, scoped } from 'tsyringe' import { RpcProviderSource } from '../RpcProviderSource' import type { IERC1271 as ERC1271Contract } from '../ethereumArtifacts/IERC1271' import ERC1271ContractArtifact from '../ethereumArtifacts/IERC1271Abi.json' -import { Mapping } from '../utils/Mapping' +import { createLazyMap, Mapping } from '../utils/Mapping' import { ContractFactory } from './ContractFactory' export const SUCCESS_MAGIC_VALUE = '0x1626ba7e' // Magic value for success as defined by ERC-1271 @@ -18,6 +18,7 @@ function formCacheKey(contractAddress: EthereumAddress, signerUserId: UserID): C @scoped(Lifecycle.ContainerScoped) export class ERC1271ContractFacade { + private readonly contractsByAddress: Mapping<[EthereumAddress], ERC1271Contract> private readonly publisherCache = new MapWithTtl(() => CACHE_TTL) @@ -25,13 +26,15 @@ export class ERC1271ContractFacade { contractFactory: ContractFactory, rpcProviderSource: RpcProviderSource ) { - this.contractsByAddress = new Mapping<[EthereumAddress], ERC1271Contract>(async (address) => { - return contractFactory.createReadContract( - address, - ERC1271ContractArtifact, - rpcProviderSource.getProvider(), - 'erc1271Contract' - ) as ERC1271Contract + this.contractsByAddress = createLazyMap<[EthereumAddress], ERC1271Contract>({ + valueFactory: async (address) => { + return contractFactory.createReadContract( + address, + ERC1271ContractArtifact, + rpcProviderSource.getProvider(), + 'erc1271Contract' + ) as ERC1271Contract + } }) } diff --git a/packages/sdk/src/publish/MessageFactory.ts b/packages/sdk/src/publish/MessageFactory.ts index f0337d2aaa..ea980ce682 100644 --- a/packages/sdk/src/publish/MessageFactory.ts +++ b/packages/sdk/src/publish/MessageFactory.ts @@ -53,8 +53,10 @@ export class MessageFactory { this.groupKeyQueue = opts.groupKeyQueue this.signatureValidator = opts.signatureValidator this.messageSigner = opts.messageSigner - this.defaultMessageChainIds = new Mapping(async () => { - return createRandomMsgChainId() + this.defaultMessageChainIds = new Mapping({ + valueFactory: async () => { + return createRandomMsgChainId() + } }) } diff --git a/packages/sdk/src/publish/Publisher.ts b/packages/sdk/src/publish/Publisher.ts index b78c535617..7d47d2e46b 100644 --- a/packages/sdk/src/publish/Publisher.ts +++ b/packages/sdk/src/publish/Publisher.ts @@ -12,7 +12,7 @@ import { StreamMessage } from '../protocol/StreamMessage' import { MessageSigner } from '../signature/MessageSigner' import { SignatureValidator } from '../signature/SignatureValidator' import { StreamDefinition } from '../types' -import { Mapping } from '../utils/Mapping' +import { createLazyMap, Mapping } from '../utils/Mapping' import { GroupKeyQueue } from './GroupKeyQueue' import { MessageFactory } from './MessageFactory' @@ -68,11 +68,15 @@ export class Publisher { this.authentication = authentication this.signatureValidator = signatureValidator this.messageSigner = messageSigner - this.messageFactories = new Mapping(async (streamId: StreamID) => { - return this.createMessageFactory(streamId) + this.messageFactories = createLazyMap({ + valueFactory: async (streamId: StreamID) => { + return this.createMessageFactory(streamId) + } }) - this.groupKeyQueues = new Mapping(async (streamId: StreamID) => { - return GroupKeyQueue.createInstance(streamId, this.authentication, groupKeyManager) + this.groupKeyQueues = createLazyMap({ + valueFactory: async (streamId: StreamID) => { + return GroupKeyQueue.createInstance(streamId, this.authentication, groupKeyManager) + } }) } diff --git a/packages/sdk/src/subscribe/ordering/OrderMessages.ts b/packages/sdk/src/subscribe/ordering/OrderMessages.ts index e144d48330..1ed9076a6a 100644 --- a/packages/sdk/src/subscribe/ordering/OrderMessages.ts +++ b/packages/sdk/src/subscribe/ordering/OrderMessages.ts @@ -1,7 +1,7 @@ import { EthereumAddress, StreamID, StreamPartID, StreamPartIDUtils, UserID, executeSafePromise } from '@streamr/utils' import { StrictStreamrClientConfig } from '../../Config' import { StreamMessage } from '../../protocol/StreamMessage' -import { Mapping } from '../../utils/Mapping' +import { createLazyMap, Mapping } from '../../utils/Mapping' import { PushBuffer } from '../../utils/PushBuffer' import { Resends } from '../Resends' import { GapFiller } from './GapFiller' @@ -62,21 +62,23 @@ export class OrderMessages { resends: Resends, config: Pick ) { - this.chains = new Mapping(async (publisherId: UserID, msgChainId: string) => { - const chain = createMessageChain( - { - streamPartId, - publisherId, - msgChainId - }, - getStorageNodes, - onUnfillableGap, - resends, - config, - this.abortController.signal - ) - chain.on('orderedMessageAdded', (msg: StreamMessage) => this.onOrdered(msg)) - return chain + this.chains = createLazyMap({ + valueFactory: async (publisherId: UserID, msgChainId: string) => { + const chain = createMessageChain( + { + streamPartId, + publisherId, + msgChainId + }, + getStorageNodes, + onUnfillableGap, + resends, + config, + this.abortController.signal + ) + chain.on('orderedMessageAdded', (msg: StreamMessage) => this.onOrdered(msg)) + return chain + } }) } diff --git a/packages/sdk/src/utils/Mapping.ts b/packages/sdk/src/utils/Mapping.ts index 6f0f239f86..1018c65b76 100644 --- a/packages/sdk/src/utils/Mapping.ts +++ b/packages/sdk/src/utils/Mapping.ts @@ -1,4 +1,18 @@ import { formLookupKey } from './utils' +import LRU from '../../vendor/quick-lru' + +type KeyType = (string | number)[] + +interface BaseOptions { + valueFactory: (...args: K) => Promise +} + +interface CacheMapOptions extends BaseOptions { + maxSize: number + maxAge?: number +} + +type LazyMapOptions = BaseOptions // an wrapper object is used so that we can store undefined values interface ValueWrapper { @@ -6,19 +20,31 @@ interface ValueWrapper { } /* - * A map data structure which lazily evaluates values. A factory function - * is called to create a value when when an item is queried for the first time. - * The map stores the value and any subsequent call to get() returns - * the same value. + * A map that lazily creates values. The factory function is called only when a key + * is accessed for the first time. Subsequent calls to `get()` return the cached value + * unless it has been evicted due to `maxSize` or `maxAge` limits. */ -export class Mapping { +export class Mapping { - private readonly delegate: Map> = new Map() + private readonly delegate: Map> private readonly pendingPromises: Map> = new Map() - private readonly valueFactory: (...args: K) => Promise + private readonly opts: CacheMapOptions | LazyMapOptions - constructor(valueFactory: (...args: K) => Promise) { - this.valueFactory = valueFactory + /** + * Prefer constructing the class via createCacheMap() and createLazyMap() + * + * @internal + **/ + constructor(opts: CacheMapOptions | LazyMapOptions) { + if ('maxSize' in opts) { + this.delegate = new LRU>({ + maxSize: opts.maxSize, + maxAge: opts.maxAge + }) + } else { + this.delegate = new Map>() + } + this.opts = opts } async get(...args: K): Promise { @@ -29,7 +55,7 @@ export class Mapping { } else { let valueWrapper = this.delegate.get(key) if (valueWrapper === undefined) { - const promise = this.valueFactory(...args) + const promise = this.opts.valueFactory(...args) this.pendingPromises.set(key, promise) let value try { @@ -52,3 +78,11 @@ export class Mapping { return result } } + +export const createCacheMap = (opts: CacheMapOptions): Mapping => { + return new Mapping(opts) +} + +export const createLazyMap = (opts: LazyMapOptions): Mapping => { + return new Mapping(opts) +} diff --git a/packages/sdk/test/integration/parallel-key-exchange.test.ts b/packages/sdk/test/integration/parallel-key-exchange.test.ts index 796b92af0c..15203f4029 100644 --- a/packages/sdk/test/integration/parallel-key-exchange.test.ts +++ b/packages/sdk/test/integration/parallel-key-exchange.test.ts @@ -74,7 +74,7 @@ describe('parallel key exchange', () => { }), groupKeyQueue: await createGroupKeyQueue(authentication, publisher.groupKey), signatureValidator: mock(), - messageSigner: new MessageSigner(authentication), + messageSigner: new MessageSigner(authentication) }) for (let i = 0; i < MESSAGE_COUNT_PER_PUBLISHER; i++) { const msg = await messageFactory.createMessage({ diff --git a/packages/sdk/test/unit/Mapping.test.ts b/packages/sdk/test/unit/Mapping.test.ts index f30676478e..60ab14aaae 100644 --- a/packages/sdk/test/unit/Mapping.test.ts +++ b/packages/sdk/test/unit/Mapping.test.ts @@ -1,20 +1,25 @@ import { wait } from '@streamr/utils' -import { Mapping } from '../../src/utils/Mapping' +import { createCacheMap, createLazyMap } from '../../src/utils/Mapping' +import { range } from 'lodash' describe('Mapping', () => { it('create', async () => { - const mapping = new Mapping(async (p1: string, p2: number) => `${p1}${p2}`) + const mapping = createLazyMap({ + valueFactory: async (p1: string, p2: number) => `${p1}${p2}` + }) expect(await mapping.get('foo', 1)).toBe('foo1') expect(await mapping.get('bar', 2)).toBe('bar2') }) it('memorize', async () => { let evaluationIndex = 0 - const mapping = new Mapping(async (_p: string) => { - const result = evaluationIndex - evaluationIndex++ - return result + const mapping = createLazyMap({ + valueFactory: async (_p: string) => { + const result = evaluationIndex + evaluationIndex++ + return result + } }) expect(await mapping.get('foo')).toBe(0) expect(await mapping.get('foo')).toBe(0) @@ -25,7 +30,7 @@ describe('Mapping', () => { it('undefined', async () => { const valueFactory = jest.fn().mockResolvedValue(undefined) - const mapping = new Mapping(valueFactory) + const mapping = createLazyMap({ valueFactory }) expect(await mapping.get('foo')).toBe(undefined) expect(await mapping.get('foo')).toBe(undefined) expect(valueFactory).toHaveBeenCalledTimes(1) @@ -35,7 +40,7 @@ describe('Mapping', () => { const valueFactory = jest.fn().mockImplementation(async (p1: string, p2: number) => { throw new Error(`error ${p1}-${p2}`) }) - const mapping = new Mapping(valueFactory) + const mapping = createLazyMap({ valueFactory }) await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1')) await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1')) expect(valueFactory).toHaveBeenCalledTimes(2) @@ -45,7 +50,7 @@ describe('Mapping', () => { const valueFactory = jest.fn().mockImplementation((p1: string, p2: number) => { throw new Error(`error ${p1}-${p2}`) }) - const mapping = new Mapping(valueFactory) + const mapping = createLazyMap({ valueFactory }) await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1')) await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1')) expect(valueFactory).toHaveBeenCalledTimes(2) @@ -56,7 +61,7 @@ describe('Mapping', () => { await wait(50) return `${p1}${p2}` }) - const mapping = new Mapping(valueFactory) + const mapping = createLazyMap({ valueFactory }) const results = await Promise.all([ mapping.get('foo', 1), mapping.get('foo', 2), @@ -73,4 +78,39 @@ describe('Mapping', () => { 'foo1' ]) }) + + it('max size', async () => { + const SIZE = 3 + const valueFactory = jest.fn().mockImplementation(async (p1: string, p2: number) => { + return `${p1}${p2}` + }) + const mapping = createCacheMap({ valueFactory, maxSize: 3 }) + const ids = range(SIZE) + for (const id of ids) { + await mapping.get('foo', id) + } + expect(valueFactory).toHaveBeenCalledTimes(3) + // add a value which is not in cache + await mapping.get('foo', -1) + expect(valueFactory).toHaveBeenCalledTimes(4) + // one of the items was removed from cache when -1 was added, now we is re-add that + // (we don't know which item it was) + for (const id of ids) { + await mapping.get('foo', id) + } + expect(valueFactory).toHaveBeenCalledTimes(5) + }) + + it('max age', async () => { + const MAX_AGE = 100 + const JITTER = 50 + const valueFactory = jest.fn().mockImplementation(async (p1: string, p2: number) => { + return `${p1}${p2}` + }) + const mapping = createCacheMap({ valueFactory, maxSize: 999999, maxAge: MAX_AGE }) + await mapping.get('foo', 1) + await wait(MAX_AGE + JITTER) + await mapping.get('foo', 1) + expect(valueFactory).toHaveBeenCalledTimes(2) + }) })