Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1486e63
add maxSize and maxAge options to Mapping
teogeb Nov 22, 2024
57a273b
configure Mapping cache constraints
teogeb Nov 22, 2024
24d3475
use Mapping instead of CacheMap
teogeb Nov 22, 2024
dfdee70
support symbol in formLookupKey()
teogeb Nov 22, 2024
2102214
fix invalidateCache()
teogeb Nov 22, 2024
63ab5d2
improve Mapping#invalidate type safety
teogeb Nov 22, 2024
6068e8a
return type of Mapping#values()
teogeb Nov 22, 2024
8a992ca
add test
teogeb Nov 22, 2024
62aef39
simplify invalidateCache()
teogeb Nov 22, 2024
42e6897
no not use size constraints
teogeb Nov 25, 2024
1a64eb3
Merge branch 'main' into sdk-mapping-cache-constraints
teogeb Nov 25, 2024
3bcddf4
builder functions
teogeb Nov 25, 2024
d98f91a
revert field name
teogeb Nov 25, 2024
6ecc3b6
fix variable name
teogeb Nov 25, 2024
7ccb819
builder functions
teogeb Nov 25, 2024
eb30e6d
revert field name
teogeb Nov 26, 2024
2f31d6b
fix variable name
teogeb Nov 26, 2024
25bca38
Merge branch 'sdk-mapping-cache-constraints' into sdk-use-Mapping-ins…
teogeb Nov 26, 2024
011bbff
Merge branch 'main' into sdk-use-Mapping-instead-of-CacheMap-in-regis…
teogeb Nov 26, 2024
fee4695
simplify field types
teogeb Nov 26, 2024
4a35a1a
TypeScript v5.1 compatibility
teogeb Nov 26, 2024
e5bfdcf
tmp: use createCacheMap() in CachingMap tests
teogeb Dec 2, 2024
cb3602e
Revert "tmp: use createCacheMap() in CachingMap tests"
teogeb Dec 2, 2024
8ef1d9c
Merge branch 'main' into sdk-use-Mapping-instead-of-CacheMap-in-regis…
teogeb Dec 3, 2024
9715473
Merge branch 'main' into sdk-use-Mapping-instead-of-CacheMap-in-regis…
teogeb Dec 3, 2024
ec762dc
get() uses array
teogeb Dec 3, 2024
ce37b3a
valueFactory() uses array
teogeb Dec 3, 2024
92a2fe4
use factory functions
teogeb Dec 3, 2024
860e412
allow primitive key
teogeb Dec 3, 2024
3b42314
fix publicSubscribePermissionCache valueFactory
teogeb Dec 3, 2024
639e477
add test
teogeb Dec 3, 2024
6605289
fix invalidateCache()
teogeb Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/sdk/src/contracts/ERC1271ContractFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ function formCacheKey(contractAddress: EthereumAddress, signerUserId: UserID): C
@scoped(Lifecycle.ContainerScoped)
export class ERC1271ContractFacade {

private readonly contractsByAddress: Mapping<[EthereumAddress], ERC1271Contract>
private readonly contractsByAddress: Mapping<EthereumAddress, ERC1271Contract>
private readonly publisherCache = new MapWithTtl<CacheKey, boolean>(() => CACHE_TTL)

constructor(
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource
) {
this.contractsByAddress = createLazyMap<[EthereumAddress], ERC1271Contract>({
this.contractsByAddress = createLazyMap<EthereumAddress, ERC1271Contract>({
valueFactory: async (address) => {
return contractFactory.createReadContract(
address,
Expand Down
79 changes: 40 additions & 39 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import {
isPublicPermissionQuery,
streamPermissionToSolidityType
} from '../permission'
import { CachingMap } from '../utils/CachingMap'
import { filter, map } from '../utils/GeneratorUtils'
import { LoggerFactory } from '../utils/LoggerFactory'
import { createCacheMap, Mapping } from '../utils/Mapping'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { ObservableContract, initContractEventGateway, waitForTx } from './contract'
Expand Down Expand Up @@ -102,13 +102,14 @@ const streamContractErrorProcessor = (err: any, streamId: StreamID, registry: st
}
}

const formCacheKeyPrefix = (streamId: StreamID): string => {
return `${streamId}|`
}

const invalidateCache = (cache: CachingMap<string, any, any>, streamId: StreamID): void => {
const matchTarget = (s: string) => s.startsWith(formCacheKeyPrefix(streamId))
cache.invalidate(matchTarget)
const invalidateCache = (
cache: { invalidate: (predicate: (key: StreamID | [StreamID, ...any[]]) => boolean) => void },
streamId: StreamID
): void => {
cache.invalidate((key) => {
const cachedStreamId = Array.isArray(key) ? key[0] : key
return cachedStreamId === streamId
})
}

@scoped(Lifecycle.ContainerScoped)
Expand All @@ -124,10 +125,10 @@ export class StreamRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly metadataCache: CachingMap<string, StreamMetadata, [StreamID]>
private readonly publisherCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly subscriberCache: CachingMap<string, boolean, [StreamID, UserID]>
private readonly publicSubscribePermissionCache: CachingMap<string, boolean, [StreamID]>
private readonly metadataCache: Mapping<StreamID, StreamMetadata>
private readonly publisherCache: Mapping<[StreamID, UserID], boolean>
private readonly subscriberCache: Mapping<[StreamID, UserID], boolean>
private readonly publicSubscribePermissionCache: Mapping<StreamID, boolean>

/** @internal */
constructor(
Expand Down Expand Up @@ -168,33 +169,33 @@ export class StreamRegistry {
}),
loggerFactory
})
this.metadataCache = new CachingMap((streamId: StreamID) => {
return this.getStreamMetadata_nonCached(streamId)
}, {
...config.cache,
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
this.metadataCache = createCacheMap({
valueFactory: (streamId) => {
return this.getStreamMetadata_nonCached(streamId)
},
...config.cache
})
this.publisherCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
this.publisherCache = createCacheMap({
valueFactory: ([streamId, userId]) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
},
...config.cache
})
this.subscriberCache = new CachingMap((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
}, {
...config.cache,
cacheKey: ([streamId, userId]) =>`${formCacheKeyPrefix(streamId)}${userId}`
this.subscriberCache = createCacheMap({
valueFactory: ([streamId, userId]) => {
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
},
...config.cache
})
this.publicSubscribePermissionCache = new CachingMap((streamId: StreamID) => {
return this.hasPermission({
streamId,
public: true,
permission: StreamPermission.SUBSCRIBE
})
}, {
...config.cache,
cacheKey: ([streamId]) => formCacheKeyPrefix(streamId)
this.publicSubscribePermissionCache = createCacheMap({
valueFactory: (streamId) => {
return this.hasPermission({
streamId,
public: true,
permission: StreamPermission.SUBSCRIBE
})
},
...config.cache
})
}

Expand Down Expand Up @@ -522,19 +523,19 @@ export class StreamRegistry {
}

isStreamPublisher(streamId: StreamID, userId: UserID): Promise<boolean> {
return this.publisherCache.get(streamId, userId)
return this.publisherCache.get([streamId, userId])
}

isStreamSubscriber(streamId: StreamID, userId: UserID): Promise<boolean> {
return this.subscriberCache.get(streamId, userId)
return this.subscriberCache.get([streamId, userId])
}

hasPublicSubscribePermission(streamId: StreamID): Promise<boolean> {
return this.publicSubscribePermissionCache.get(streamId)
}

populateMetadataCache(streamId: StreamID, metadata: StreamMetadata): void {
this.metadataCache.set([streamId], metadata)
this.metadataCache.set(streamId, metadata)
}

invalidatePermissionCaches(streamId: StreamID): void {
Expand Down
29 changes: 14 additions & 15 deletions packages/sdk/src/contracts/StreamStorageRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { LoggerFactory } from '../utils/LoggerFactory'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { initContractEventGateway, waitForTx } from './contract'
import { CachingMap } from '../utils/CachingMap'
import { createCacheMap, Mapping } from '../utils/Mapping'

export interface StorageNodeAssignmentEvent {
readonly streamId: StreamID
Expand Down Expand Up @@ -45,7 +45,7 @@ export class StreamStorageRegistry {
private readonly config: Pick<StrictStreamrClientConfig, 'contracts' | '_timeouts'>
private readonly authentication: Authentication
private readonly logger: Logger
private readonly getStorageNodes_cached: CachingMap<string | typeof GET_ALL_STORAGE_NODES, EthereumAddress[], [string | undefined]>
private readonly storageNodesCache: Mapping<StreamID | typeof GET_ALL_STORAGE_NODES, EthereumAddress[]>

constructor(
streamIdBuilder: StreamIDBuilder,
Expand Down Expand Up @@ -78,13 +78,11 @@ export class StreamStorageRegistry {
)
}), config.contracts.pollInterval)
this.initStreamAssignmentEventListeners(eventEmitter, chainEventPoller, loggerFactory)
this.getStorageNodes_cached = new CachingMap((streamIdOrPath?: string) => {
return this.getStorageNodes_nonCached(streamIdOrPath)
}, {
...config.cache,
cacheKey: ([streamIdOrPath]): string | typeof GET_ALL_STORAGE_NODES => {
return streamIdOrPath ?? GET_ALL_STORAGE_NODES
}
this.storageNodesCache = createCacheMap({
valueFactory: (query) => {
return this.getStorageNodes_nonCached(query)
},
...config.cache
})
}

Expand Down Expand Up @@ -135,7 +133,7 @@ export class StreamStorageRegistry {
await this.connectToContract()
const ethersOverrides = await getEthersOverrides(this.rpcProviderSource, this.config)
await waitForTx(this.streamStorageRegistryContract!.addStorageNode(streamId, nodeAddress, ethersOverrides))
this.getStorageNodes_cached.invalidate((key) => key === streamId)
this.storageNodesCache.invalidate((key) => key === streamId)
}

async removeStreamFromStorageNode(streamIdOrPath: string, nodeAddress: EthereumAddress): Promise<void> {
Expand All @@ -144,7 +142,7 @@ export class StreamStorageRegistry {
await this.connectToContract()
const ethersOverrides = await getEthersOverrides(this.rpcProviderSource, this.config)
await waitForTx(this.streamStorageRegistryContract!.removeStorageNode(streamId, nodeAddress, ethersOverrides))
this.getStorageNodes_cached.invalidate((key) => key === streamId)
this.storageNodesCache.invalidate((key) => key === streamId)
}

async isStoredStream(streamIdOrPath: string, nodeAddress: EthereumAddress): Promise<boolean> {
Expand Down Expand Up @@ -192,13 +190,14 @@ export class StreamStorageRegistry {
}

async getStorageNodes(streamIdOrPath?: string): Promise<EthereumAddress[]> {
return this.getStorageNodes_cached.get(streamIdOrPath)
const query = (streamIdOrPath !== undefined) ? await this.streamIdBuilder.toStreamID(streamIdOrPath) : GET_ALL_STORAGE_NODES
return this.storageNodesCache.get(query)
}

private async getStorageNodes_nonCached(streamIdOrPath?: string): Promise<EthereumAddress[]> {
private async getStorageNodes_nonCached(query: StreamID | typeof GET_ALL_STORAGE_NODES): Promise<EthereumAddress[]> {
let queryResults: NodeQueryResult[]
if (streamIdOrPath !== undefined) {
const streamId = await this.streamIdBuilder.toStreamID(streamIdOrPath)
if (query !== GET_ALL_STORAGE_NODES) {
const streamId = query
this.logger.debug('Get storage nodes of stream', { streamId })
queryResults = await collect(this.theGraphClient.queryEntities<NodeQueryResult>(
(lastId: string, pageSize: number) => {
Expand Down
8 changes: 4 additions & 4 deletions packages/sdk/src/publish/MessageFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
} from '../protocol/StreamMessage'
import { MessageSigner } from '../signature/MessageSigner'
import { SignatureValidator } from '../signature/SignatureValidator'
import { Mapping } from '../utils/Mapping'
import { createLazyMap, Mapping } from '../utils/Mapping'
import { formLookupKey } from '../utils/utils'
import { GroupKeyQueue } from './GroupKeyQueue'
import { PublishMetadata } from './Publisher'
Expand All @@ -37,7 +37,7 @@ export class MessageFactory {
private readonly streamId: StreamID
private readonly authentication: Authentication
private defaultPartition: number | undefined
private readonly defaultMessageChainIds: Mapping<[partition: number], string>
private readonly defaultMessageChainIds: Mapping<number, string>
private readonly prevMsgRefs: Map<string, MessageRef> = new Map()
// eslint-disable-next-line max-len
private readonly streamRegistry: Pick<StreamRegistry, 'getStreamMetadata' | 'hasPublicSubscribePermission' | 'isStreamPublisher' | 'invalidatePermissionCaches'>
Expand All @@ -53,7 +53,7 @@ export class MessageFactory {
this.groupKeyQueue = opts.groupKeyQueue
this.signatureValidator = opts.signatureValidator
this.messageSigner = opts.messageSigner
this.defaultMessageChainIds = new Mapping({
this.defaultMessageChainIds = createLazyMap<number, string>({
valueFactory: async () => {
return createRandomMsgChainId()
}
Expand Down Expand Up @@ -90,7 +90,7 @@ export class MessageFactory {
}

const msgChainId = metadata.msgChainId ?? await this.defaultMessageChainIds.get(partition)
const msgChainKey = formLookupKey(partition, msgChainId)
const msgChainKey = formLookupKey([partition, msgChainId])
const prevMsgRef = this.prevMsgRefs.get(msgChainKey)
const msgRef = createMessageRef(metadata.timestamp, prevMsgRef)
this.prevMsgRefs.set(msgChainKey, msgRef)
Expand Down
10 changes: 5 additions & 5 deletions packages/sdk/src/publish/Publisher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { StreamID } from '@streamr/utils'
import isString from 'lodash/isString'
import pLimit from 'p-limit'
import { Lifecycle, inject, scoped } from 'tsyringe'
import { inject, Lifecycle, scoped } from 'tsyringe'
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
import { NetworkNodeFacade } from '../NetworkNodeFacade'
import { StreamIDBuilder } from '../StreamIDBuilder'
Expand Down Expand Up @@ -43,8 +43,8 @@ const parseTimestamp = (metadata?: PublishMetadata): number => {
@scoped(Lifecycle.ContainerScoped)
export class Publisher {

private readonly messageFactories: Mapping<[streamId: StreamID], MessageFactory>
private readonly groupKeyQueues: Mapping<[streamId: StreamID], GroupKeyQueue>
private readonly messageFactories: Mapping<StreamID, MessageFactory>
private readonly groupKeyQueues: Mapping<StreamID, GroupKeyQueue>
private readonly concurrencyLimit = pLimit(1)
private readonly node: NetworkNodeFacade
private readonly streamRegistry: StreamRegistry
Expand All @@ -69,12 +69,12 @@ export class Publisher {
this.signatureValidator = signatureValidator
this.messageSigner = messageSigner
this.messageFactories = createLazyMap({
valueFactory: async (streamId: StreamID) => {
valueFactory: async (streamId) => {
return this.createMessageFactory(streamId)
}
})
this.groupKeyQueues = createLazyMap({
valueFactory: async (streamId: StreamID) => {
valueFactory: async (streamId) => {
return GroupKeyQueue.createInstance(streamId, this.authentication, groupKeyManager)
}
})
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/src/subscribe/ordering/OrderMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class OrderMessages {
config: Pick<StrictStreamrClientConfig, 'gapFillTimeout' | 'retryResendAfter' | 'maxGapRequests' | 'gapFill' | 'gapFillStrategy'>
) {
this.chains = createLazyMap({
valueFactory: async (publisherId: UserID, msgChainId: string) => {
valueFactory: async ([publisherId, msgChainId]) => {
const chain = createMessageChain(
{
streamPartId,
Expand Down Expand Up @@ -99,10 +99,10 @@ export class OrderMessages {
if (this.abortController.signal.aborted) {
return
}
const chain = await this.chains.get(msg.getPublisherId(), msg.getMsgChainId())
const chain = await this.chains.get([msg.getPublisherId(), msg.getMsgChainId()])
chain.addMessage(msg)
}
await Promise.all(this.chains.values().map((chain) => chain.waitUntilIdle()))
await Promise.all([...this.chains.values()].map((chain) => chain.waitUntilIdle()))
this.outBuffer.endWrite()
} catch (err) {
this.outBuffer.endWrite(err)
Expand Down
Loading
Loading