Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions packages/sdk/src/contracts/ERC1271ContractFacade.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { BrandedString, EthereumAddress, MapWithTtl, UserID, hash, recoverSignerUserId, toUserId } from '@streamr/utils'
import { Lifecycle, scoped } from 'tsyringe'
import { inject, Lifecycle, scoped } from 'tsyringe'
import { RpcProviderSource } from '../RpcProviderSource'
import type { IERC1271 as ERC1271Contract } from '../ethereumArtifacts/IERC1271'
import ERC1271ContractArtifact from '../ethereumArtifacts/IERC1271Abi.json'
import { Mapping } from '../utils/Mapping'
import { ContractFactory } from './ContractFactory'
import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config'

export const SUCCESS_MAGIC_VALUE = '0x1626ba7e' // Magic value for success as defined by ERC-1271

Expand All @@ -18,20 +19,25 @@ function formCacheKey(contractAddress: EthereumAddress, signerUserId: UserID): C

@scoped(Lifecycle.ContainerScoped)
export class ERC1271ContractFacade {

private readonly contractsByAddress: Mapping<[EthereumAddress], ERC1271Contract>
private readonly publisherCache = new MapWithTtl<CacheKey, boolean>(() => CACHE_TTL)

constructor(
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource
rpcProviderSource: RpcProviderSource,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'cache'>
) {
this.contractsByAddress = new Mapping<[EthereumAddress], ERC1271Contract>(async (address) => {
return contractFactory.createReadContract(
address,
ERC1271ContractArtifact,
rpcProviderSource.getProvider(),
'erc1271Contract'
) as ERC1271Contract
this.contractsByAddress = new Mapping<[EthereumAddress], ERC1271Contract>({
valueFactory: async (address) => {
return contractFactory.createReadContract(
address,
ERC1271ContractArtifact,
rpcProviderSource.getProvider(),
'erc1271Contract'
) as ERC1271Contract
},
maxSize: config.cache.maxSize
})
}

Expand Down
66 changes: 31 additions & 35 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import {
isPublicPermissionQuery,
streamPermissionToSolidityType
} from '../permission'
import { CachingMap } from '../utils/CachingMap'
import { filter, map } from '../utils/GeneratorUtils'
import { LoggerFactory } from '../utils/LoggerFactory'
import { Mapping } from '../utils/Mapping'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { ObservableContract, initContractEventGateway, waitForTx } from './contract'
Expand Down Expand Up @@ -102,12 +102,8 @@ const streamContractErrorProcessor = (err: any, streamId: StreamID, registry: st
}
}

const formCacheKeyPrefix = (streamId: StreamID): string => {
return `${streamId}|`
}

const invalidateCache = (cache: CachingMap<string, any, any>, streamId: StreamID): void => {
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
const invalidateCache = (cache: Mapping<[StreamID, ...args: any[]], any>, streamId: StreamID): void => {
const matchTarget = (s: string) => s.startsWith(streamId)
cache.invalidate(matchTarget)
}

Expand All @@ -124,10 +120,10 @@ export class StreamRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly metadataCache: CachingMap<string, StreamMetadata, [StreamID]>
private readonly publisherCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly subscriberCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly publicSubscribePermissionCache: CachingMap<string, boolean, [StreamID]>
private readonly metadataCache: Mapping<[StreamID], StreamMetadata>
private readonly publisherCache: Mapping<[StreamID, UserID], boolean>
private readonly subscriberCache: Mapping<[StreamID, UserID], boolean>
private readonly publicSubscribePermissionCache: Mapping<[StreamID], boolean>

/** @internal */
constructor(
Expand Down Expand Up @@ -168,33 +164,33 @@ export class StreamRegistry {
}),
loggerFactory
})
this.metadataCache = new CachingMap((streamId: StreamID) => {
return this.getStreamMetadata_nonCached(streamId)
}, {
...config.cache,
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
this.metadataCache = new Mapping({
valueFactory: (streamId: StreamID) => {
return this.getStreamMetadata_nonCached(streamId)
},
...config.cache
})
this.publisherCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
this.publisherCache = new Mapping({
valueFactory: (streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
},
...config.cache
})
this.subscriberCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
this.subscriberCache = new Mapping({
valueFactory: (streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
},
...config.cache
})
this.publicSubscribePermissionCache = new CachingMap((streamId: StreamID) => {
return this.hasPermission({
streamId,
public: true,
permission: StreamPermission.SUBSCRIBE
})
}, {
...config.cache,
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
this.publicSubscribePermissionCache = new Mapping({
valueFactory: (streamId: StreamID) => {
return this.hasPermission({
streamId,
public: true,
permission: StreamPermission.SUBSCRIBE
})
},
...config.cache
})
}

Expand Down
28 changes: 13 additions & 15 deletions packages/sdk/src/contracts/StreamStorageRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { LoggerFactory } from '../utils/LoggerFactory'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { initContractEventGateway, waitForTx } from './contract'
import { CachingMap } from '../utils/CachingMap'
import { Mapping } from '../utils/Mapping'

export interface StorageNodeAssignmentEvent {
readonly streamId: StreamID
Expand Down Expand Up @@ -45,7 +45,7 @@ export class StreamStorageRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly getStorageNodes_cached: CachingMap<string | typeof GET_ALL_STORAGE_NODES, EthereumAddress[], [string | undefined]>
private readonly storageNodesCache: Mapping<[string | typeof GET_ALL_STORAGE_NODES], EthereumAddress[]>

constructor(
streamIdBuilder: StreamIDBuilder,
Expand Down Expand Up @@ -78,13 +78,11 @@ export class StreamStorageRegistry {
)
}), config.contracts.pollInterval)
this.initStreamAssignmentEventListeners(eventEmitter, chainEventPoller, loggerFactory)
this.getStorageNodes_cached = new CachingMap((streamIdOrPath?: string) => {
return this.getStorageNodes_nonCached(streamIdOrPath)
}, {
...config.cache,
cacheKey: ([streamIdOrPath]): string | typeof GET_ALL_STORAGE_NODES => {
return streamIdOrPath ?? GET_ALL_STORAGE_NODES
}
this.storageNodesCache = new Mapping({
valueFactory: (query: string | typeof GET_ALL_STORAGE_NODES) => {
return this.getStorageNodes_nonCached(query)
},
...config.cache
})
}

Expand Down Expand Up @@ -135,7 +133,7 @@ export class StreamStorageRegistry {
await this.connectToContract()
const ethersOverrides = await getEthersOverrides(this.rpcProviderSource, this.config)
await waitForTx(this.streamStorageRegistryContract!.addStorageNode(streamId, nodeAddress, ethersOverrides))
this.getStorageNodes_cached.invalidate((key) => key === streamId)
this.storageNodesCache.invalidate((key) => key === streamId)
}

async removeStreamFromStorageNode(streamIdOrPath: string, nodeAddress: EthereumAddress): Promise<void> {
Expand All @@ -144,7 +142,7 @@ export class StreamStorageRegistry {
await this.connectToContract()
const ethersOverrides = await getEthersOverrides(this.rpcProviderSource, this.config)
await waitForTx(this.streamStorageRegistryContract!.removeStorageNode(streamId, nodeAddress, ethersOverrides))
this.getStorageNodes_cached.invalidate((key) => key === streamId)
this.storageNodesCache.invalidate((key) => key === streamId)
}

async isStoredStream(streamIdOrPath: string, nodeAddress: EthereumAddress): Promise<boolean> {
Expand Down Expand Up @@ -192,13 +190,13 @@ export class StreamStorageRegistry {
}

async getStorageNodes(streamIdOrPath?: string): Promise<EthereumAddress[]> {
return this.getStorageNodes_cached.get(streamIdOrPath)
return this.storageNodesCache.get(streamIdOrPath ?? GET_ALL_STORAGE_NODES)
}

private async getStorageNodes_nonCached(streamIdOrPath?: string): Promise<EthereumAddress[]> {
private async getStorageNodes_nonCached(query: string | typeof GET_ALL_STORAGE_NODES): Promise<EthereumAddress[]> {
let queryResults: NodeQueryResult[]
if (streamIdOrPath !== undefined) {
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
if (query !== GET_ALL_STORAGE_NODES) {
const streamId = await this.streamIdBuilder.toStreamID(query)
this.logger.debug('Get storage nodes of stream', { streamId })
queryResults = await collect(this.theGraphClient.queryEntities<NodeQueryResult>(
(lastId: string, pageSize: number) => {
Expand Down
8 changes: 6 additions & 2 deletions packages/sdk/src/publish/MessageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface MessageFactoryOptions {
groupKeyQueue: GroupKeyQueue
signatureValidator: SignatureValidator
messageSigner: MessageSigner
cacheMaxSize: number
}

export class MessageFactory {
Expand All @@ -53,8 +54,11 @@ export class MessageFactory {
this.groupKeyQueue = opts.groupKeyQueue
this.signatureValidator = opts.signatureValidator
this.messageSigner = opts.messageSigner
this.defaultMessageChainIds = new Mapping(async () => {
return createRandomMsgChainId()
this.defaultMessageChainIds = new Mapping({
valueFactory: async (_partition: number) => {
return createRandomMsgChainId()
},
maxSize: opts.cacheMaxSize
})
}

Expand Down
21 changes: 16 additions & 5 deletions packages/sdk/src/publish/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { StreamDefinition } from '../types'
import { Mapping } from '../utils/Mapping'
import { GroupKeyQueue } from './GroupKeyQueue'
import { MessageFactory } from './MessageFactory'
import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config'

export interface PublishMetadata {
timestamp?: string | number | Date
Expand Down Expand Up @@ -52,12 +53,14 @@ export class Publisher {
private readonly authentication: Authentication
private readonly signatureValidator: SignatureValidator
private readonly messageSigner: MessageSigner
private readonly config: Pick<StrictStreamrClientConfig, 'cache'>

constructor(
node: NetworkNodeFacade,
streamRegistry: StreamRegistry,
groupKeyManager: GroupKeyManager,
streamIdBuilder: StreamIDBuilder,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'cache'>,
@inject(AuthenticationInjectionToken) authentication: Authentication,
signatureValidator: SignatureValidator,
messageSigner: MessageSigner
Expand All @@ -68,11 +71,18 @@ export class Publisher {
this.authentication = authentication
this.signatureValidator = signatureValidator
this.messageSigner = messageSigner
this.messageFactories = new Mapping(async (streamId: StreamID) => {
return this.createMessageFactory(streamId)
this.config = config
this.messageFactories = new Mapping({
valueFactory: async (streamId: StreamID) => {
return this.createMessageFactory(streamId)
},
maxSize: config.cache.maxSize
})
this.groupKeyQueues = new Mapping(async (streamId: StreamID) => {
return GroupKeyQueue.createInstance(streamId, this.authentication, groupKeyManager)
this.groupKeyQueues = new Mapping({
valueFactory: async (streamId: StreamID) => {
return GroupKeyQueue.createInstance(streamId, this.authentication, groupKeyManager)
},
maxSize: config.cache.maxSize
})
}

Expand Down Expand Up @@ -130,7 +140,8 @@ export class Publisher {
streamRegistry: this.streamRegistry,
groupKeyQueue: await this.groupKeyQueues.get(streamId),
signatureValidator: this.signatureValidator,
messageSigner: this.messageSigner
messageSigner: this.messageSigner,
cacheMaxSize: this.config.cache.maxSize
})
}
}
2 changes: 1 addition & 1 deletion packages/sdk/src/subscribe/messagePipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface MessagePipelineOptions {
signatureValidator: SignatureValidator
groupKeyManager: GroupKeyManager
// eslint-disable-next-line max-len
config: Pick<StrictStreamrClientConfig, 'orderMessages' | 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy'>
config: Pick<StrictStreamrClientConfig, 'orderMessages' | 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy' | 'cache'>
destroySignal: DestroySignal
loggerFactory: LoggerFactory
}
Expand Down
36 changes: 20 additions & 16 deletions packages/sdk/src/subscribe/ordering/OrderMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,27 @@ export class OrderMessages {
getStorageNodes: (streamId: StreamID) => Promise<EthereumAddress[]>,
onUnfillableGap: ((gap: Gap) => void),
resends: Resends,
config: Pick<StrictStreamrClientConfig, 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy'>
// eslint-disable-next-line max-len
config: Pick<StrictStreamrClientConfig, 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy'> & { cache: { maxSize: number } }
) {
this.chains = new Mapping(async (publisherId: UserID, msgChainId: string) => {
const chain = createMessageChain(
{
streamPartId,
publisherId,
msgChainId
},
getStorageNodes,
onUnfillableGap,
resends,
config,
this.abortController.signal
)
chain.on('orderedMessageAdded', (msg: StreamMessage) => this.onOrdered(msg))
return chain
this.chains = new Mapping({
valueFactory: async (publisherId: UserID, msgChainId: string) => {
const chain = createMessageChain(
{
streamPartId,
publisherId,
msgChainId
},
getStorageNodes,
onUnfillableGap,
resends,
config,
this.abortController.signal
)
chain.on('orderedMessageAdded', (msg: StreamMessage) => this.onOrdered(msg))
return chain
},
maxSize: config.cache.maxSize
})
}

Expand Down
Loading