Skip to content
11 changes: 7 additions & 4 deletions packages/sdk/src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { Stream } from './Stream'
import { StreamIDBuilder } from './StreamIDBuilder'
import { StreamMetadata, getPartitionCount } from './StreamMetadata'
import { StreamrClientError } from './StreamrClientError'
import { ChainEventPoller } from './contracts/ChainEventPoller'
import { ContractFactory } from './contracts/ContractFactory'
import { Operator } from './contracts/Operator'
import { OperatorRegistry } from './contracts/OperatorRegistry'
Expand All @@ -48,11 +49,11 @@ import { Subscription, SubscriptionEvents } from './subscribe/Subscription'
import { initResendSubscription } from './subscribe/resendSubscription'
import { waitForStorage } from './subscribe/waitForStorage'
import { StreamDefinition } from './types'
import { map } from './utils/GeneratorUtils'
import { LoggerFactory } from './utils/LoggerFactory'
import { addStreamToStorageNode } from './utils/addStreamToStorageNode'
import { pOnce } from './utils/promises'
import { convertPeerDescriptorToNetworkPeerDescriptor, createTheGraphClient } from './utils/utils'
import { addStreamToStorageNode } from './utils/addStreamToStorageNode'
import { map } from './utils/GeneratorUtils'

// TODO: this type only exists to enable tsdoc to generate proper documentation
export type SubscribeOptions = StreamDefinition & ExtraSubscribeOptions
Expand Down Expand Up @@ -96,6 +97,7 @@ export class StreamrClient {
private readonly operatorRegistry: OperatorRegistry
private readonly contractFactory: ContractFactory
private readonly localGroupKeyStore: LocalGroupKeyStore
private readonly chainEventPoller: ChainEventPoller
private readonly theGraphClient: TheGraphClient
private readonly streamIdBuilder: StreamIDBuilder
private readonly config: StrictStreamrClientConfig
Expand Down Expand Up @@ -132,6 +134,7 @@ export class StreamrClient {
this.operatorRegistry = container.resolve<OperatorRegistry>(OperatorRegistry)
this.contractFactory = container.resolve<ContractFactory>(ContractFactory)
this.localGroupKeyStore = container.resolve<LocalGroupKeyStore>(LocalGroupKeyStore)
this.chainEventPoller = container.resolve<ChainEventPoller>(ChainEventPoller)
this.streamIdBuilder = container.resolve<StreamIDBuilder>(StreamIDBuilder)
this.eventEmitter = container.resolve<StreamrClientEventEmitter>(StreamrClientEventEmitter)
this.destroySignal = container.resolve<DestroySignal>(DestroySignal)
Expand Down Expand Up @@ -770,12 +773,12 @@ export class StreamrClient {
operatorContractAddress,
this.contractFactory,
this.rpcProviderSource,
this.chainEventPoller,
this.theGraphClient,
this.authentication,
this.destroySignal,
this.loggerFactory,
() => this.getEthersOverrides(),
this.config.contracts.pollInterval
() => this.getEthersOverrides()
)
}

Expand Down
99 changes: 65 additions & 34 deletions packages/sdk/src/contracts/ChainEventPoller.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,51 @@
import { Logger, Multimap, randomString, scheduleAtInterval, wait } from '@streamr/utils'
import { Contract, EventLog, Provider } from 'ethers'
import { sample } from 'lodash'

type EventName = string
type Listener = (...args: any[]) => void
import { EthereumAddress, Logger, randomString, scheduleAtInterval, toEthereumAddress, wait } from '@streamr/utils'
import { AbstractProvider, EventFragment, Interface } from 'ethers'
import { remove, sample, uniq } from 'lodash'
import { inject, Lifecycle, scoped } from 'tsyringe'
import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config'
import { RpcProviderSource } from '../RpcProviderSource'

export interface EventListenerDefinition {
onEvent: (...args: any[]) => void
contractInterfaceFragment: EventFragment
contractAddress: EthereumAddress
}

const BLOCK_NUMBER_QUERY_RETRY_DELAY = 1000
export const POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD = 30

@scoped(Lifecycle.ContainerScoped)
export class ChainEventPoller {

private listeners: Multimap<EventName, Listener> = new Multimap()
private abortController?: AbortController
private contracts: Contract[]
private listeners: EventListenerDefinition[] = []
private providers: AbstractProvider[]
private pollInterval: number
private abortController?: AbortController

// all these contracts are actually the same chain contract (i.e. StreamRegistry), but have different providers
// connected to them
constructor(contracts: Contract[], pollInterval: number) {
this.contracts = contracts
this.pollInterval = pollInterval
constructor(
rpcProviderSource: RpcProviderSource,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'contracts'>
) {
this.providers = rpcProviderSource.getSubProviders()
this.pollInterval = config.contracts.pollInterval
}

on(eventName: string, listener: Listener): void {
const started = !this.listeners.isEmpty()
this.listeners.add(eventName, listener)
on(definition: EventListenerDefinition): void {
const started = this.listeners.length > 0
this.listeners.push(definition)
if (!started) {
this.start()
}
}

off(eventName: string, listener: Listener): void {
const started = !this.listeners.isEmpty()
this.listeners.remove(eventName, listener)
if (started && this.listeners.isEmpty()) {
off(definition: EventListenerDefinition): void {
const started = this.listeners.length > 0
remove(this.listeners, (l) => {
return (l.contractAddress === definition.contractAddress)
&& (l.contractInterfaceFragment.topicHash === definition.contractInterfaceFragment.topicHash)
&& (l.onEvent == definition.onEvent)
})
if (started && this.listeners.length === 0) {
this.abortController!.abort()
}
}
Expand All @@ -48,7 +60,7 @@ export class ChainEventPoller {
let fromBlock: number | undefined = undefined
do {
try {
fromBlock = await sample(this.getProviders())!.getBlockNumber()
fromBlock = await sample(this.providers)!.getBlockNumber()
} catch (err) {
logger.debug('Failed to query block number', { err })
await wait(BLOCK_NUMBER_QUERY_RETRY_DELAY) // TODO: pass signal?
Expand All @@ -57,25 +69,46 @@ export class ChainEventPoller {

let pollsSinceFromBlockUpdate = 0
await scheduleAtInterval(async () => {
const contract = sample(this.contracts)!
const eventNames = [...this.listeners.keys()]
const provider = sample(this.providers)!
const eventNames = this.listeners.map((l) => l.contractInterfaceFragment.name)
let newFromBlock = 0
let events: EventLog[] | undefined = undefined
let events: { contractAddress: EthereumAddress, name: string, args: any[], blockNumber: number }[] | undefined = undefined

try {
// If we haven't updated `fromBlock` for a while, fetch the latest block number explicitly. If
// `fromBlock` falls too much behind the current block number, the RPCs may start rejecting our
// eth_getLogs requests (presumably for performance reasons).
if (pollsSinceFromBlockUpdate >= POLLS_SINCE_LAST_FROM_BLOCK_UPDATE_THRESHOLD) {
newFromBlock = await contract.runner!.provider!.getBlockNumber() + 1
newFromBlock = await provider.getBlockNumber() + 1
logger.debug('Fetch next block number explicitly', { newFromBlock } )
if (abortController.signal.aborted) {
return
}
}

logger.debug('Polling', { fromBlock, eventNames })
events = await contract.queryFilter([eventNames], fromBlock) as EventLog[]
const filter = {
address: uniq(this.listeners.map((l) => l.contractAddress)),
topics: [uniq(this.listeners.map((l) => l.contractInterfaceFragment.topicHash))],
fromBlock
}
const logItems = await provider.getLogs(filter)
events = []
for (const logItem of logItems) {
const definition = this.listeners.find((l) => {
return (l.contractAddress === toEthereumAddress(logItem.address))
&& (l.contractInterfaceFragment.topicHash === logItem.topics[0])
})
if (definition !== undefined) {
const contractInterface = new Interface([definition.contractInterfaceFragment.format('minimal')])
const args = contractInterface.decodeEventLog(definition.contractInterfaceFragment.name, logItem.data, logItem.topics)
events.push({
contractAddress: definition.contractAddress,
name: definition.contractInterfaceFragment.name,
args,
blockNumber: logItem.blockNumber
})
}
}
logger.debug('Polled', { fromBlock, events: events.length })
} catch (err) {
logger.debug('Failed to poll', { reason: err?.reason, eventNames, fromBlock })
Expand All @@ -87,9 +120,11 @@ export class ChainEventPoller {

if (events !== undefined && events.length > 0) {
for (const event of events) {
const listeners = this.listeners.get(event.fragment.name)
const listeners = this.listeners.filter(
(l) => (l.contractAddress === event.contractAddress) && (l.contractInterfaceFragment.name === event.name)
)
for (const listener of listeners) {
listener(...event.args, event.blockNumber)
listener.onEvent(...event.args, event.blockNumber)
}
}
newFromBlock = Math.max(...events.map((e) => e.blockNumber)) + 1
Expand All @@ -108,8 +143,4 @@ export class ChainEventPoller {
}, this.pollInterval, true, abortController.signal)
})
}

private getProviders(): Provider[] {
return this.contracts.map((c) => c.runner!.provider!)
}
}
29 changes: 18 additions & 11 deletions packages/sdk/src/contracts/Operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
ObservableEventEmitter, StreamID, TheGraphClient,
collect, ensureValidStreamPartitionIndex, toEthereumAddress, toStreamID
} from '@streamr/utils'
import { Overrides } from 'ethers'
import { Interface, Overrides } from 'ethers'
import { z } from 'zod'
import { Authentication } from '../Authentication'
import { DestroySignal } from '../DestroySignal'
Expand Down Expand Up @@ -150,12 +150,12 @@ export class Operator {
contractAddress: EthereumAddress,
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource,
chainEventPoller: ChainEventPoller,
theGraphClient: TheGraphClient,
authentication: Authentication,
destroySignal: DestroySignal,
loggerFactory: LoggerFactory,
getEthersOverrides: () => Promise<Overrides>,
eventPollInterval: number
) {
this.contractAddress = contractAddress
this.contractFactory = contractFactory
Expand All @@ -169,33 +169,37 @@ export class Operator {
this.theGraphClient = theGraphClient
this.authentication = authentication
this.getEthersOverrides = getEthersOverrides
this.initEventGateways(contractAddress, loggerFactory, eventPollInterval)
this.initEventGateways(contractAddress, chainEventPoller, loggerFactory)
destroySignal.onDestroy.listen(() => {
this.eventEmitter.removeAllListeners()
})
}

private initEventGateways(
contractAddress: EthereumAddress,
loggerFactory: LoggerFactory,
eventPollInterval: number
chainEventPoller: ChainEventPoller,
loggerFactory: LoggerFactory
): void {
const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => {
return this.contractFactory.createEventContract(contractAddress, OperatorArtifact, p)
}), eventPollInterval)
const contractInterface = new Interface(OperatorArtifact)
const stakeEventTransformation = (sponsorship: string) => ({
sponsorship: toEthereumAddress(sponsorship)
})
initContractEventGateway({
sourceName: 'Staked',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Staked')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'staked',
targetEmitter: this.eventEmitter,
transformation: stakeEventTransformation,
loggerFactory
})
initContractEventGateway({
sourceName: 'Unstaked',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Unstaked')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'unstaked',
targetEmitter: this.eventEmitter,
Expand All @@ -219,7 +223,10 @@ export class Operator {
}
}
initContractEventGateway({
sourceName: 'ReviewRequest',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('ReviewRequest')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'reviewRequested',
targetEmitter: this.eventEmitter,
Expand Down
11 changes: 6 additions & 5 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
toUserId,
until
} from '@streamr/utils'
import { ContractTransactionResponse } from 'ethers'
import { ContractTransactionResponse, Interface } from 'ethers'
import { intersection } from 'lodash'
import { Lifecycle, inject, scoped } from 'tsyringe'
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
Expand Down Expand Up @@ -134,6 +134,7 @@ export class StreamRegistry {
constructor(
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource,
chainEventPoller: ChainEventPoller,
theGraphClient: TheGraphClient,
streamIdBuilder: StreamIDBuilder,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>,
Expand All @@ -154,11 +155,11 @@ export class StreamRegistry {
this.rpcProviderSource.getProvider(),
'streamRegistry'
)
const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => {
return contractFactory.createEventContract(toEthereumAddress(this.config.contracts.streamRegistryChainAddress), StreamRegistryArtifact, p)
}), config.contracts.pollInterval)
initContractEventGateway({
sourceName: 'StreamCreated',
sourceDefinition: {
contractInterfaceFragment: new Interface(StreamRegistryArtifact).getEvent('StreamCreated')!,
contractAddress: toEthereumAddress(this.config.contracts.streamRegistryChainAddress)
},
sourceEmitter: chainEventPoller,
targetName: 'streamCreated',
targetEmitter: eventEmitter,
Expand Down
24 changes: 13 additions & 11 deletions packages/sdk/src/contracts/StreamStorageRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EthereumAddress, Logger, StreamID, TheGraphClient, collect, toEthereumAddress, toStreamID } from '@streamr/utils'
import { Interface } from 'ethers'
import min from 'lodash/min'
import { Lifecycle, inject, scoped } from 'tsyringe'
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
Expand All @@ -11,10 +12,10 @@ import StreamStorageRegistryArtifact from '../ethereumArtifacts/StreamStorageReg
import { getEthersOverrides } from '../ethereumUtils'
import { StreamrClientEventEmitter } from '../events'
import { LoggerFactory } from '../utils/LoggerFactory'
import { Mapping, createCacheMap } from '../utils/Mapping'
import { ChainEventPoller } from './ChainEventPoller'
import { ContractFactory } from './ContractFactory'
import { initContractEventGateway, waitForTx } from './contract'
import { createCacheMap, Mapping } from '../utils/Mapping'

export interface StorageNodeAssignmentEvent {
readonly streamId: StreamID
Expand Down Expand Up @@ -51,6 +52,7 @@ export class StreamStorageRegistry {
streamIdBuilder: StreamIDBuilder,
contractFactory: ContractFactory,
rpcProviderSource: RpcProviderSource,
chainEventPoller: ChainEventPoller,
theGraphClient: TheGraphClient,
@inject(ConfigInjectionToken) config: Pick<StrictStreamrClientConfig, 'contracts' | 'cache' | '_timeouts'>,
@inject(AuthenticationInjectionToken) authentication: Authentication,
Expand All @@ -70,13 +72,6 @@ export class StreamStorageRegistry {
rpcProviderSource.getProvider(),
'streamStorageRegistry'
) as StreamStorageRegistryContract
const chainEventPoller = new ChainEventPoller(this.rpcProviderSource.getSubProviders().map((p) => {
return contractFactory.createEventContract(
toEthereumAddress(this.config.contracts.streamStorageRegistryChainAddress),
StreamStorageRegistryArtifact,
p
)
}), config.contracts.pollInterval)
this.initStreamAssignmentEventListeners(eventEmitter, chainEventPoller, loggerFactory)
this.storageNodesCache = createCacheMap({
valueFactory: (query) => {
Expand All @@ -86,7 +81,6 @@ export class StreamStorageRegistry {
})
}

// eslint-disable-next-line class-methods-use-this
private initStreamAssignmentEventListeners(
eventEmitter: StreamrClientEventEmitter,
chainEventPoller: ChainEventPoller,
Expand All @@ -97,16 +91,24 @@ export class StreamStorageRegistry {
nodeAddress: toEthereumAddress(nodeAddress),
blockNumber
})
const contractAddress = toEthereumAddress(this.config.contracts.streamStorageRegistryChainAddress)
const contractInterface = new Interface(StreamStorageRegistryArtifact)
initContractEventGateway({
sourceName: 'Added',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Added')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'streamAddedToStorageNode',
targetEmitter: eventEmitter,
transformation,
loggerFactory
})
initContractEventGateway({
sourceName: 'Removed',
sourceDefinition: {
contractInterfaceFragment: contractInterface.getEvent('Removed')!,
contractAddress
},
sourceEmitter: chainEventPoller,
targetName: 'streamRemovedFromFromStorageNode',
targetEmitter: eventEmitter,
Expand Down
Loading
Loading