Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions packages/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ export class EthereumClient {
this.config.logger?.info('Setup networking and services.')

await this.service.start()
this.config.server && (await this.config.server.start())
// Only call bootstrap if servers are actually started
this.config.server && this.config.server.started && (await this.config.server.bootstrap())

this.started = true
}

Expand All @@ -154,15 +150,14 @@ export class EthereumClient {
}
this.config.events.emit(Event.CLIENT_SHUTDOWN)
await this.service.stop()
this.config.server && this.config.server.started && (await this.config.server.stop())
await this.config.networkWorker?.stop()
this.started = false
}

/**
*
* @returns the RLPx server (if it exists)
* @returns the network worker (if it exists)
*/
server() {
return this.config.server
networkWorker() {
return this.config.networkWorker
}
}
42 changes: 24 additions & 18 deletions packages/client/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import { type Address, BIGINT_0, BIGINT_1, BIGINT_2, BIGINT_256 } from '@ethereu
import { EventEmitter } from 'eventemitter3'
import { Level } from 'level'

import { RlpxServer } from './net/server/index.ts'
import { Event } from './types.ts'
import { isBrowser, short } from './util/index.ts'

import type { BlockHeader } from '@ethereumjs/block'
import type { VM, VMProfilerOpts } from '@ethereumjs/vm'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Logger } from './logging.ts'
import type { EventParams, MultiaddrLike, PrometheusMetrics } from './types.ts'
import { NetworkWorker } from './net/server/networkworker.ts'
import type { EventParams, PrometheusMetrics } from './types.ts'

export type DataDirectory = (typeof DataDirectory)[keyof typeof DataDirectory]

Expand Down Expand Up @@ -109,12 +109,6 @@ export interface ConfigOptions {
*/
multiaddrs?: Multiaddr[]

/**
* Transport servers (RLPx)
* Only used for testing purposes
*/
server?: RlpxServer

/**
* Save tx receipts and logs in the meta db (default: false)
*/
Expand Down Expand Up @@ -345,6 +339,12 @@ export interface ConfigOptions {
* Enables Prometheus Metrics that can be collected for monitoring client health
*/
prometheusMetrics?: PrometheusMetrics
/* Transport servers (RLPx) */
networkWorker?: NetworkWorker

clientFilter?: string[]

refreshInterval?: number
}

export class Config {
Expand Down Expand Up @@ -469,10 +469,16 @@ export class Config {
public readonly chainCommon: Common
public readonly execCommon: Common

public readonly server: RlpxServer | undefined = undefined

public readonly metrics: PrometheusMetrics | undefined

public readonly networkWorker: NetworkWorker | undefined = undefined

public readonly clientFilter?: string[]

public readonly refreshInterval: number

public readonly dnsNetworks: string[] = []

constructor(options: ConfigOptions = {}) {
this.events = new EventEmitter<EventParams>()

Expand Down Expand Up @@ -569,20 +575,20 @@ export class Config {

this.logger?.info(`Sync Mode ${this.syncmode}`)
if (this.syncmode !== SyncMode.None) {
if (options.server !== undefined) {
this.server = options.server
} else if (isBrowser() !== true) {
// Otherwise start server
const bootnodes: MultiaddrLike =
this.bootnodes ?? (this.chainCommon.bootstrapNodes() as any)
const dnsNetworks = options.dnsNetworks ?? this.chainCommon.dnsNetworks()
this.server = new RlpxServer({ config: this, bootnodes, dnsNetworks })
if (isBrowser() !== true) {
// Initialize network worker
this.dnsNetworks = options.dnsNetworks ?? this.chainCommon.dnsNetworks()
this.networkWorker = options.networkWorker ?? new NetworkWorker(this)
void this.networkWorker.start(this, this.bootnodes ?? [], this.dnsNetworks)
}
}

this.events.once(Event.CLIENT_SHUTDOWN, () => {
this.shutdown = true
})

this.clientFilter = options.clientFilter
this.refreshInterval = options.refreshInterval ?? 30000
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { EthereumClient } from './client.ts'
export * from './config.ts'
export { RlpxServer } from './net/server/rlpxserver.ts'
52 changes: 42 additions & 10 deletions packages/client/src/net/peerpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ export class PeerPool {
* @emits {@link Event.POOL_PEER_BANNED}
*/
ban(peer: Peer, maxAge: number = 60000) {
if (!peer.server) {
if (!peer.id) {
return
}
peer.server.ban(peer.id, maxAge)
this.config.networkWorker?.ban(peer.id, maxAge)
this.remove(peer)
this.config.events.emit(Event.POOL_PEER_BANNED, peer)

Expand Down Expand Up @@ -246,17 +246,20 @@ export class PeerPool {
this.noPeerPeriods += 1
if (this.noPeerPeriods >= NO_PEER_PERIOD_COUNT) {
this.noPeerPeriods = 0
if (this.config.server !== undefined) {
this.config.logger?.info('Restarting RLPx server')
await this.config.server.stop()
await this.config.server.start()
this.config.logger?.info('Reinitiating server bootstrap')
await this.config.server.bootstrap()
if (this.config.networkWorker !== undefined) {
this.config.logger?.info('Restarting network worker')
await this.config.networkWorker.stop()
await this.config.networkWorker.start(
this.config,
this.config.bootnodes ?? [],
this.config.dnsNetworks ?? [],
)
this.config.logger?.info('Reinitiating worker bootstrap')
}
} else {
let tablesize: number | undefined = 0
if (this.config.server !== undefined && this.config.server.discovery) {
tablesize = this.config.server.dpt?.getPeers().length
if (this.config.networkWorker !== undefined && this.config.networkWorker.discovery) {
tablesize = this.config.networkWorker.dpt?.getPeers().length
this.config.logger?.info(`Looking for suited peers: peertablesize=${tablesize}`)
}
}
Expand All @@ -277,4 +280,33 @@ export class PeerPool {
}
}
}

async restart() {
if (this.config.networkWorker !== undefined) {
this.config.logger?.info('Restarting network worker')
await this.config.networkWorker.stop()
await this.config.networkWorker.start(
this.config,
this.config.bootnodes ?? [],
this.config.dnsNetworks ?? [],
)
this.config.logger?.info('Reinitiating worker bootstrap')
}
}

getPeers() {
return Array.from(this.pool.values())
}

getPeer(id: string) {
return this.pool.get(id)
}

addPeer(peer: Peer) {
this.pool.set(peer.id, peer)
}

removePeer(peer: Peer) {
this.pool.delete(peer.id)
}
}
9 changes: 9 additions & 0 deletions packages/client/src/net/protocol/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,13 @@ export class Protocol {
}
return payload
}

/**
* Handle incoming message
* @param message message to handle
* @param peer peer that sent the message
*/
async handle(_message: any, _peer: any): Promise<any> {
throw EthereumJSErrorWithoutCode('Unimplemented')
}
}
87 changes: 87 additions & 0 deletions packages/client/src/net/server/networkworker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
parentPort.postMessage({ type: 'LOG', event: 'Worker starting' })
import { Config } from '@ethereumjs/client/dist/esm/src/config.js'
import { RlpxServer } from '@ethereumjs/client/dist/esm/src/net/server/rlpxserver.js'
import { parentPort } from 'worker_threads'
let server = null

parentPort.on('message', async (data) => {
parentPort.postMessage({ type: 'LOG', event: 'Worker received message', data })
switch (data.type) {
case 'INIT': {
const { maxPeers, bootnodes, dnsNetworks, port, extIP } = data
// Create a minimal config object with only the necessary data
const config = new Config({
maxPeers,
port,
extIP,
events: {
emit: (event, ...args) => {
parentPort.postMessage({ type: 'EVENT', event, args })
},
},
})
server = new RlpxServer({ config, bootnodes, dnsNetworks })
await server.start()
await server.bootstrap()
parentPort.postMessage({ type: 'INIT_COMPLETE' })
break
}
case 'STOP': {
if (server !== null) {
parentPort.postMessage({ type: 'LOG', event: 'Worker stopping server...' })
await server.stop()
server = null
parentPort.postMessage({ type: 'LOG', event: 'Worker server stopped' })
}
parentPort.postMessage({ type: 'STOP_COMPLETE' })
break
}
case 'GET_NETWORK_INFO': {
if (server !== null) {
const info = server.getRlpxInfo()
parentPort.postMessage({ type: 'NETWORK_INFO', info })
}
break
}
case 'ADD_PEER': {
if (server !== null) {
try {
const peerInfo = await server.dpt.addPeer(data.peer)
parentPort.postMessage({ type: 'PEER_ADDED', peerInfo })
} catch (error) {
parentPort.postMessage({ type: 'PEER_ADD_ERROR', error: error.message })
}
}
break
}
case 'BAN_PEER': {
if (server !== null) {
try {
await server.ban(data.peerId, data.maxAge)
parentPort.postMessage({ type: 'PEER_BANNED', peerId: data.peerId })
} catch (error) {
parentPort.postMessage({ type: 'PEER_BAN_ERROR', error: error.message })
}
}
break
}
}
})

// Forward protocol messages to main thread
if (server !== null) {
server.config.events.on('PROTOCOL_MESSAGE', (message, protocol, peer) => {
parentPort.postMessage({
type: 'EVENT',
event: 'PROTOCOL_MESSAGE',
args: [message, protocol, peer],
})
})
server.config.events.on('SERVER_ERROR', (error, _server) => {
parentPort.postMessage({
type: 'EVENT',
event: 'SERVER_ERROR',
args: [error, 'rlpx'],
})
})
}
Loading
Loading