Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions packages/sdk/src/contracts/ERC1271ContractFacade.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { BrandedString, EthereumAddress, MapWithTtl, UserID, hash, recoverSignerUserId, toUserId } from '@streamr/utils'
import { BrandedString, EthereumAddress, hash, MapWithTtl, recoverSignerUserId, toUserId, UserID } from '@streamr/utils'
import { Lifecycle, scoped } from 'tsyringe'
import { RpcProviderSource } from '../RpcProviderSource'
import type { IERC1271 as ERC1271Contract } from '../ethereumArtifacts/IERC1271'
import ERC1271ContractArtifact from '../ethereumArtifacts/IERC1271Abi.json'
import { Mapping } from '../utils/Mapping'
import { createLazyMap, Mapping } from '../utils/Mapping'
import { ContractFactory } from './ContractFactory'

export const SUCCESS_MAGIC_VALUE = '0x1626ba7e' // Magic value for success as defined by ERC-1271
Expand All @@ -18,20 +18,23 @@ function formCacheKey(contractAddress: EthereumAddress, signerUserId: UserID): C

@scoped(Lifecycle.ContainerScoped)
export class ERC1271ContractFacade {

private readonly contractsByAddress: Mapping<[EthereumAddress], ERC1271Contract>
private readonly publisherCache = new MapWithTtl<CacheKey, boolean>(() => CACHE_TTL)

constructor(
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource
) {
this.contractsByAddress = new Mapping<[EthereumAddress], ERC1271Contract>(async (address) => {
return contractFactory.createReadContract(
address,
ERC1271ContractArtifact,
rpcProviderSource.getProvider(),
'erc1271Contract'
) as ERC1271Contract
this.contractsByAddress = createLazyMap<[EthereumAddress], ERC1271Contract>({
valueFactory: async (address) => {
return contractFactory.createReadContract(
address,
ERC1271ContractArtifact,
rpcProviderSource.getProvider(),
'erc1271Contract'
) as ERC1271Contract
}
})
}

Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/src/publish/MessageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ export class MessageFactory {
this.groupKeyQueue = opts.groupKeyQueue
this.signatureValidator = opts.signatureValidator
this.messageSigner = opts.messageSigner
this.defaultMessageChainIds = new Mapping(async () => {
return createRandomMsgChainId()
this.defaultMessageChainIds = new Mapping({
valueFactory: async () => {
return createRandomMsgChainId()
}
})
}

Expand Down
14 changes: 9 additions & 5 deletions packages/sdk/src/publish/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { StreamMessage } from '../protocol/StreamMessage'
import { MessageSigner } from '../signature/MessageSigner'
import { SignatureValidator } from '../signature/SignatureValidator'
import { StreamDefinition } from '../types'
import { Mapping } from '../utils/Mapping'
import { createLazyMap, Mapping } from '../utils/Mapping'
import { GroupKeyQueue } from './GroupKeyQueue'
import { MessageFactory } from './MessageFactory'

Expand Down Expand Up @@ -68,11 +68,15 @@ export class Publisher {
this.authentication = authentication
this.signatureValidator = signatureValidator
this.messageSigner = messageSigner
this.messageFactories = new Mapping(async (streamId: StreamID) => {
return this.createMessageFactory(streamId)
this.messageFactories = createLazyMap({
valueFactory: async (streamId: StreamID) => {
return this.createMessageFactory(streamId)
}
})
this.groupKeyQueues = new Mapping(async (streamId: StreamID) => {
return GroupKeyQueue.createInstance(streamId, this.authentication, groupKeyManager)
this.groupKeyQueues = createLazyMap({
valueFactory: async (streamId: StreamID) => {
return GroupKeyQueue.createInstance(streamId, this.authentication, groupKeyManager)
}
})
}

Expand Down
34 changes: 18 additions & 16 deletions packages/sdk/src/subscribe/ordering/OrderMessages.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EthereumAddress, StreamID, StreamPartID, StreamPartIDUtils, UserID, executeSafePromise } from '@streamr/utils'
import { StrictStreamrClientConfig } from '../../Config'
import { StreamMessage } from '../../protocol/StreamMessage'
import { Mapping } from '../../utils/Mapping'
import { createLazyMap, Mapping } from '../../utils/Mapping'
import { PushBuffer } from '../../utils/PushBuffer'
import { Resends } from '../Resends'
import { GapFiller } from './GapFiller'
Expand Down Expand Up @@ -62,21 +62,23 @@ export class OrderMessages {
resends: Resends,
config: Pick<StrictStreamrClientConfig, 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy'>
) {
this.chains = new Mapping(async (publisherId: UserID, msgChainId: string) => {
const chain = createMessageChain(
{
streamPartId,
publisherId,
msgChainId
},
getStorageNodes,
onUnfillableGap,
resends,
config,
this.abortController.signal
)
chain.on('orderedMessageAdded', (msg: StreamMessage) => this.onOrdered(msg))
return chain
this.chains = createLazyMap({
valueFactory: async (publisherId: UserID, msgChainId: string) => {
const chain = createMessageChain(
{
streamPartId,
publisherId,
msgChainId
},
getStorageNodes,
onUnfillableGap,
resends,
config,
this.abortController.signal
)
chain.on('orderedMessageAdded', (msg: StreamMessage) => this.onOrdered(msg))
return chain
}
})
}

Expand Down
54 changes: 44 additions & 10 deletions packages/sdk/src/utils/Mapping.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,50 @@
import { formLookupKey } from './utils'
import LRU from '../../vendor/quick-lru'

type KeyType = (string | number)[]

interface BaseOptions<K extends KeyType, V> {
valueFactory: (...args: K) => Promise<V>
}

interface CacheMapOptions<K extends KeyType, V> extends BaseOptions<K, V> {
maxSize: number
maxAge?: number
}

type LazyMapOptions<K extends KeyType, V> = BaseOptions<K, V>

// an wrapper object is used so that we can store undefined values
interface ValueWrapper<V> {
value: V
}

/*
* A map data structure which lazily evaluates values. A factory function
* is called to create a value when when an item is queried for the first time.
* The map stores the value and any subsequent call to get() returns
* the same value.
* A map that lazily creates values. The factory function is called only when a key
* is accessed for the first time. Subsequent calls to `get()` return the cached value
* unless it has been evicted due to `maxSize` or `maxAge` limits.
*/
export class Mapping<K extends (string | number)[], V> {
export class Mapping<K extends KeyType, V> {

private readonly delegate: Map<string, ValueWrapper<V>> = new Map()
private readonly delegate: Map<string, ValueWrapper<V>>
private readonly pendingPromises: Map<string, Promise<V>> = new Map()
private readonly valueFactory: (...args: K) => Promise<V>
private readonly opts: CacheMapOptions<K, V> | LazyMapOptions<K, V>

constructor(valueFactory: (...args: K) => Promise<V>) {
this.valueFactory = valueFactory
/**
* Prefer constructing the class via createCacheMap() and createLazyMap()
*
* @internal
**/
constructor(opts: CacheMapOptions<K, V> | LazyMapOptions<K, V>) {
if ('maxSize' in opts) {
this.delegate = new LRU<string, ValueWrapper<V>>({
maxSize: opts.maxSize,
maxAge: opts.maxAge
})
} else {
this.delegate = new Map<string, ValueWrapper<V>>()
}
this.opts = opts
}

async get(...args: K): Promise<V> {
Expand All @@ -29,7 +55,7 @@ export class Mapping<K extends (string | number)[], V> {
} else {
let valueWrapper = this.delegate.get(key)
if (valueWrapper === undefined) {
const promise = this.valueFactory(...args)
const promise = this.opts.valueFactory(...args)
this.pendingPromises.set(key, promise)
let value
try {
Expand All @@ -52,3 +78,11 @@ export class Mapping<K extends (string | number)[], V> {
return result
}
}

export const createCacheMap = <K extends KeyType, V>(opts: CacheMapOptions<K, V>): Mapping<K, V> => {
return new Mapping<K, V>(opts)
}

export const createLazyMap = <K extends KeyType, V>(opts: LazyMapOptions<K, V>): Mapping<K, V> => {
return new Mapping<K, V>(opts)
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ describe('parallel key exchange', () => {
}),
groupKeyQueue: await createGroupKeyQueue(authentication, publisher.groupKey),
signatureValidator: mock<SignatureValidator>(),
messageSigner: new MessageSigner(authentication),
messageSigner: new MessageSigner(authentication)
})
for (let i = 0; i < MESSAGE_COUNT_PER_PUBLISHER; i++) {
const msg = await messageFactory.createMessage({
Expand Down
60 changes: 50 additions & 10 deletions packages/sdk/test/unit/Mapping.test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import { wait } from '@streamr/utils'
import { Mapping } from '../../src/utils/Mapping'
import { createCacheMap, createLazyMap } from '../../src/utils/Mapping'
import { range } from 'lodash'

describe('Mapping', () => {

it('create', async () => {
const mapping = new Mapping(async (p1: string, p2: number) => `${p1}${p2}`)
const mapping = createLazyMap({
valueFactory: async (p1: string, p2: number) => `${p1}${p2}`
})
expect(await mapping.get('foo', 1)).toBe('foo1')
expect(await mapping.get('bar', 2)).toBe('bar2')
})

it('memorize', async () => {
let evaluationIndex = 0
const mapping = new Mapping(async (_p: string) => {
const result = evaluationIndex
evaluationIndex++
return result
const mapping = createLazyMap({
valueFactory: async (_p: string) => {
const result = evaluationIndex
evaluationIndex++
return result
}
})
expect(await mapping.get('foo')).toBe(0)
expect(await mapping.get('foo')).toBe(0)
Expand All @@ -25,7 +30,7 @@ describe('Mapping', () => {

it('undefined', async () => {
const valueFactory = jest.fn().mockResolvedValue(undefined)
const mapping = new Mapping(valueFactory)
const mapping = createLazyMap({ valueFactory })
expect(await mapping.get('foo')).toBe(undefined)
expect(await mapping.get('foo')).toBe(undefined)
expect(valueFactory).toHaveBeenCalledTimes(1)
Expand All @@ -35,7 +40,7 @@ describe('Mapping', () => {
const valueFactory = jest.fn().mockImplementation(async (p1: string, p2: number) => {
throw new Error(`error ${p1}-${p2}`)
})
const mapping = new Mapping(valueFactory)
const mapping = createLazyMap({ valueFactory })
await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1'))
await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1'))
expect(valueFactory).toHaveBeenCalledTimes(2)
Expand All @@ -45,7 +50,7 @@ describe('Mapping', () => {
const valueFactory = jest.fn().mockImplementation((p1: string, p2: number) => {
throw new Error(`error ${p1}-${p2}`)
})
const mapping = new Mapping(valueFactory)
const mapping = createLazyMap({ valueFactory })
await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1'))
await expect(mapping.get('foo', 1)).rejects.toEqual(new Error('error foo-1'))
expect(valueFactory).toHaveBeenCalledTimes(2)
Expand All @@ -56,7 +61,7 @@ describe('Mapping', () => {
await wait(50)
return `${p1}${p2}`
})
const mapping = new Mapping(valueFactory)
const mapping = createLazyMap({ valueFactory })
const results = await Promise.all([
mapping.get('foo', 1),
mapping.get('foo', 2),
Expand All @@ -73,4 +78,39 @@ describe('Mapping', () => {
'foo1'
])
})

it('max size', async () => {
const SIZE = 3
const valueFactory = jest.fn().mockImplementation(async (p1: string, p2: number) => {
return `${p1}${p2}`
})
const mapping = createCacheMap({ valueFactory, maxSize: 3 })
const ids = range(SIZE)
for (const id of ids) {
await mapping.get('foo', id)
}
expect(valueFactory).toHaveBeenCalledTimes(3)
// add a value which is not in cache
await mapping.get('foo', -1)
expect(valueFactory).toHaveBeenCalledTimes(4)
// one of the items was removed from cache when -1 was added, now we is re-add that
// (we don't know which item it was)
for (const id of ids) {
await mapping.get('foo', id)
}
expect(valueFactory).toHaveBeenCalledTimes(5)
})

it('max age', async () => {
const MAX_AGE = 100
const JITTER = 50
const valueFactory = jest.fn().mockImplementation(async (p1: string, p2: number) => {
return `${p1}${p2}`
})
const mapping = createCacheMap({ valueFactory, maxSize: 999999, maxAge: MAX_AGE })
await mapping.get('foo', 1)
await wait(MAX_AGE + JITTER)
await mapping.get('foo', 1)
expect(valueFactory).toHaveBeenCalledTimes(2)
})
})
Loading