diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index bd30bdb2a8c..aefe938591d 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -138,10 +138,6 @@ export class EthereumClient { this.config.logger?.info('Setup networking and services.') await this.service.start() - this.config.server && (await this.config.server.start()) - // Only call bootstrap if servers are actually started - this.config.server && this.config.server.started && (await this.config.server.bootstrap()) - this.started = true } @@ -154,15 +150,14 @@ export class EthereumClient { } this.config.events.emit(Event.CLIENT_SHUTDOWN) await this.service.stop() - this.config.server && this.config.server.started && (await this.config.server.stop()) + await this.config.networkWorker?.stop() this.started = false } /** - * - * @returns the RLPx server (if it exists) + * @returns the network worker (if it exists) */ - server() { - return this.config.server + networkWorker() { + return this.config.networkWorker } } diff --git a/packages/client/src/config.ts b/packages/client/src/config.ts index a99dbfe9802..b7329d81d2f 100644 --- a/packages/client/src/config.ts +++ b/packages/client/src/config.ts @@ -4,7 +4,6 @@ import { type Address, BIGINT_0, BIGINT_1, BIGINT_2, BIGINT_256 } from '@ethereu import { EventEmitter } from 'eventemitter3' import { Level } from 'level' -import { RlpxServer } from './net/server/index.ts' import { Event } from './types.ts' import { isBrowser, short } from './util/index.ts' @@ -12,7 +11,8 @@ import type { BlockHeader } from '@ethereumjs/block' import type { VM, VMProfilerOpts } from '@ethereumjs/vm' import type { Multiaddr } from '@multiformats/multiaddr' import type { Logger } from './logging.ts' -import type { EventParams, MultiaddrLike, PrometheusMetrics } from './types.ts' +import { NetworkWorker } from './net/server/networkworker.ts' +import type { EventParams, PrometheusMetrics } from './types.ts' export type DataDirectory = (typeof DataDirectory)[keyof typeof DataDirectory] @@ -109,12 +109,6 @@ export interface ConfigOptions { */ multiaddrs?: Multiaddr[] - /** - * Transport servers (RLPx) - * Only used for testing purposes - */ - server?: RlpxServer - /** * Save tx receipts and logs in the meta db (default: false) */ @@ -345,6 +339,12 @@ export interface ConfigOptions { * Enables Prometheus Metrics that can be collected for monitoring client health */ prometheusMetrics?: PrometheusMetrics + /* Transport servers (RLPx) */ + networkWorker?: NetworkWorker + + clientFilter?: string[] + + refreshInterval?: number } export class Config { @@ -469,10 +469,16 @@ export class Config { public readonly chainCommon: Common public readonly execCommon: Common - public readonly server: RlpxServer | undefined = undefined - public readonly metrics: PrometheusMetrics | undefined + public readonly networkWorker: NetworkWorker | undefined = undefined + + public readonly clientFilter?: string[] + + public readonly refreshInterval: number + + public readonly dnsNetworks: string[] = [] + constructor(options: ConfigOptions = {}) { this.events = new EventEmitter() @@ -569,20 +575,20 @@ export class Config { this.logger?.info(`Sync Mode ${this.syncmode}`) if (this.syncmode !== SyncMode.None) { - if (options.server !== undefined) { - this.server = options.server - } else if (isBrowser() !== true) { - // Otherwise start server - const bootnodes: MultiaddrLike = - this.bootnodes ?? (this.chainCommon.bootstrapNodes() as any) - const dnsNetworks = options.dnsNetworks ?? this.chainCommon.dnsNetworks() - this.server = new RlpxServer({ config: this, bootnodes, dnsNetworks }) + if (isBrowser() !== true) { + // Initialize network worker + this.dnsNetworks = options.dnsNetworks ?? this.chainCommon.dnsNetworks() + this.networkWorker = options.networkWorker ?? new NetworkWorker(this) + void this.networkWorker.start(this, this.bootnodes ?? [], this.dnsNetworks) } } this.events.once(Event.CLIENT_SHUTDOWN, () => { this.shutdown = true }) + + this.clientFilter = options.clientFilter + this.refreshInterval = options.refreshInterval ?? 30000 } /** diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 13a27d74f09..ca0bb8eb2ab 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -1,2 +1,3 @@ export { EthereumClient } from './client.ts' export * from './config.ts' +export { RlpxServer } from './net/server/rlpxserver.ts' diff --git a/packages/client/src/net/peerpool.ts b/packages/client/src/net/peerpool.ts index 30727277d44..f4a6a1bc331 100644 --- a/packages/client/src/net/peerpool.ts +++ b/packages/client/src/net/peerpool.ts @@ -191,10 +191,10 @@ export class PeerPool { * @emits {@link Event.POOL_PEER_BANNED} */ ban(peer: Peer, maxAge: number = 60000) { - if (!peer.server) { + if (!peer.id) { return } - peer.server.ban(peer.id, maxAge) + this.config.networkWorker?.ban(peer.id, maxAge) this.remove(peer) this.config.events.emit(Event.POOL_PEER_BANNED, peer) @@ -246,17 +246,20 @@ export class PeerPool { this.noPeerPeriods += 1 if (this.noPeerPeriods >= NO_PEER_PERIOD_COUNT) { this.noPeerPeriods = 0 - if (this.config.server !== undefined) { - this.config.logger?.info('Restarting RLPx server') - await this.config.server.stop() - await this.config.server.start() - this.config.logger?.info('Reinitiating server bootstrap') - await this.config.server.bootstrap() + if (this.config.networkWorker !== undefined) { + this.config.logger?.info('Restarting network worker') + await this.config.networkWorker.stop() + await this.config.networkWorker.start( + this.config, + this.config.bootnodes ?? [], + this.config.dnsNetworks ?? [], + ) + this.config.logger?.info('Reinitiating worker bootstrap') } } else { let tablesize: number | undefined = 0 - if (this.config.server !== undefined && this.config.server.discovery) { - tablesize = this.config.server.dpt?.getPeers().length + if (this.config.networkWorker !== undefined && this.config.networkWorker.discovery) { + tablesize = this.config.networkWorker.dpt?.getPeers().length this.config.logger?.info(`Looking for suited peers: peertablesize=${tablesize}`) } } @@ -277,4 +280,33 @@ export class PeerPool { } } } + + async restart() { + if (this.config.networkWorker !== undefined) { + this.config.logger?.info('Restarting network worker') + await this.config.networkWorker.stop() + await this.config.networkWorker.start( + this.config, + this.config.bootnodes ?? [], + this.config.dnsNetworks ?? [], + ) + this.config.logger?.info('Reinitiating worker bootstrap') + } + } + + getPeers() { + return Array.from(this.pool.values()) + } + + getPeer(id: string) { + return this.pool.get(id) + } + + addPeer(peer: Peer) { + this.pool.set(peer.id, peer) + } + + removePeer(peer: Peer) { + this.pool.delete(peer.id) + } } diff --git a/packages/client/src/net/protocol/protocol.ts b/packages/client/src/net/protocol/protocol.ts index 14bc314fed8..67f27349936 100644 --- a/packages/client/src/net/protocol/protocol.ts +++ b/packages/client/src/net/protocol/protocol.ts @@ -144,4 +144,13 @@ export class Protocol { } return payload } + + /** + * Handle incoming message + * @param message message to handle + * @param peer peer that sent the message + */ + async handle(_message: any, _peer: any): Promise { + throw EthereumJSErrorWithoutCode('Unimplemented') + } } diff --git a/packages/client/src/net/server/networkworker.js b/packages/client/src/net/server/networkworker.js new file mode 100644 index 00000000000..41083f9cf00 --- /dev/null +++ b/packages/client/src/net/server/networkworker.js @@ -0,0 +1,87 @@ +parentPort.postMessage({ type: 'LOG', event: 'Worker starting' }) +import { Config } from '@ethereumjs/client/dist/esm/src/config.js' +import { RlpxServer } from '@ethereumjs/client/dist/esm/src/net/server/rlpxserver.js' +import { parentPort } from 'worker_threads' +let server = null + +parentPort.on('message', async (data) => { + parentPort.postMessage({ type: 'LOG', event: 'Worker received message', data }) + switch (data.type) { + case 'INIT': { + const { maxPeers, bootnodes, dnsNetworks, port, extIP } = data + // Create a minimal config object with only the necessary data + const config = new Config({ + maxPeers, + port, + extIP, + events: { + emit: (event, ...args) => { + parentPort.postMessage({ type: 'EVENT', event, args }) + }, + }, + }) + server = new RlpxServer({ config, bootnodes, dnsNetworks }) + await server.start() + await server.bootstrap() + parentPort.postMessage({ type: 'INIT_COMPLETE' }) + break + } + case 'STOP': { + if (server !== null) { + parentPort.postMessage({ type: 'LOG', event: 'Worker stopping server...' }) + await server.stop() + server = null + parentPort.postMessage({ type: 'LOG', event: 'Worker server stopped' }) + } + parentPort.postMessage({ type: 'STOP_COMPLETE' }) + break + } + case 'GET_NETWORK_INFO': { + if (server !== null) { + const info = server.getRlpxInfo() + parentPort.postMessage({ type: 'NETWORK_INFO', info }) + } + break + } + case 'ADD_PEER': { + if (server !== null) { + try { + const peerInfo = await server.dpt.addPeer(data.peer) + parentPort.postMessage({ type: 'PEER_ADDED', peerInfo }) + } catch (error) { + parentPort.postMessage({ type: 'PEER_ADD_ERROR', error: error.message }) + } + } + break + } + case 'BAN_PEER': { + if (server !== null) { + try { + await server.ban(data.peerId, data.maxAge) + parentPort.postMessage({ type: 'PEER_BANNED', peerId: data.peerId }) + } catch (error) { + parentPort.postMessage({ type: 'PEER_BAN_ERROR', error: error.message }) + } + } + break + } + } +}) + +// Forward protocol messages to main thread +if (server !== null) { + server.config.events.on('PROTOCOL_MESSAGE', (message, protocol, peer) => { + parentPort.postMessage({ + type: 'EVENT', + event: 'PROTOCOL_MESSAGE', + args: [message, protocol, peer], + }) + }) + server.config.events.on('SERVER_ERROR', (error, _server) => { + parentPort.postMessage({ + type: 'EVENT', + event: 'SERVER_ERROR', + args: [error, 'rlpx'], + }) + }) +} diff --git a/packages/client/src/net/server/networkworker.ts b/packages/client/src/net/server/networkworker.ts new file mode 100644 index 00000000000..4a6424e20e8 --- /dev/null +++ b/packages/client/src/net/server/networkworker.ts @@ -0,0 +1,152 @@ +import { dirname } from 'path' +import { fileURLToPath } from 'url' +import { Worker } from 'worker_threads' +import type { Config } from '../../config.ts' +import { Event } from '../../types.ts' +import type { MultiaddrLike } from '../../types.ts' +import type { Peer } from '../peer/peer.ts' + +const __filename = fileURLToPath(import.meta.url) +const __dirname = dirname(__filename) + +export class NetworkWorker { + private worker: Worker + private messageHandler?: (message: any, protocol: string, peer: Peer) => Promise + private config: Config + public discovery: boolean = true + public dpt: { getPeers: () => any[] } = { getPeers: () => [] } + + constructor(config: Config) { + this.config = config + this.worker = new Worker(`${__dirname}/networkworker.js`, { + workerData: { __dirname }, + }) + + // Set up message handling from worker + this.worker.on( + 'message', + (data: { + type: + | 'PROTOCOL_MESSAGE' + | 'INIT_COMPLETE' + | 'STOP_COMPLETE' + | 'NETWORK_INFO' + | 'PEER_ADDED' + | 'PEER_ADD_ERROR' + | 'EVENT' + | 'SERVER_ERROR' + | 'LOG' + message?: any + protocol?: string + peerId?: string + info?: any + peerInfo?: any + error?: string + event?: string + args?: any[] + }) => { + if ( + data.type === 'EVENT' && + data.event === 'PROTOCOL_MESSAGE' && + Array.isArray(data.args) && + data.args.length === 3 + ) { + // Forward events from worker to main thread + this.config.events.emit(Event.PROTOCOL_MESSAGE, data.args[0], data.args[1], data.args[2]) + } else if ( + data.type === 'PROTOCOL_MESSAGE' && + typeof data.message !== 'undefined' && + typeof data.protocol === 'string' && + data.protocol.length > 0 && + typeof data.peerId === 'string' && + data.peerId.length > 0 + ) { + const peer = { id: data.peerId } as Peer + if (this.messageHandler) { + void this.messageHandler(data.message, data.protocol, peer) + } + } else if (data.type === 'LOG') { + console.log(`[DEVP2P] [${data.event}]`) + } + }, + ) + } + + async start(config: Config, bootnodes: MultiaddrLike, dnsNetworks: string[]) { + console.log('starting worker', bootnodes, dnsNetworks) + this.worker.postMessage({ + type: 'INIT', + maxPeers: this.config.maxPeers, + bootnodes, + dnsNetworks, + port: config.port, + extIP: config.extIP, + }) + } + + async stop() { + this.worker.postMessage({ type: 'STOP' }) + } + + registerHandler(handler: (message: any, protocol: string, peer: Peer) => Promise) { + this.messageHandler = handler + } + + sendToPeer(protocol: string, peerId: string, messageName: string, args: any) { + this.worker.postMessage({ + type: 'SEND_MESSAGE', + protocol, + peerId, + messageName, + args, + }) + } + + async getNetworkInfo(): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('Timeout waiting for network info')) + }, 5000) + + const handler = (data: any) => { + if (data.type === 'NETWORK_INFO') { + clearTimeout(timeout) + this.worker.removeListener('message', handler) + resolve(data.info) + } + } + this.worker.on('message', handler) + this.worker.postMessage({ type: 'GET_NETWORK_INFO' }) + }) + } + + async addPeer(peer: any): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('Timeout waiting for peer add response')) + }, 5000) + + const handler = (data: any) => { + if (data.type === 'PEER_ADDED') { + clearTimeout(timeout) + this.worker.removeListener('message', handler) + resolve(data.peerInfo) + } else if (data.type === 'PEER_ADD_ERROR') { + clearTimeout(timeout) + this.worker.removeListener('message', handler) + reject(new Error(data.error)) + } + } + this.worker.on('message', handler) + this.worker.postMessage({ type: 'ADD_PEER', peer }) + }) + } + + ban(peerId: string, maxAge: number) { + this.worker.postMessage({ + type: 'BAN_PEER', + peerId, + maxAge, + }) + } +} diff --git a/packages/client/src/rpc/modules/admin.ts b/packages/client/src/rpc/modules/admin.ts index 093158637dc..ec5914cf0d2 100644 --- a/packages/client/src/rpc/modules/admin.ts +++ b/packages/client/src/rpc/modules/admin.ts @@ -9,7 +9,6 @@ import { middleware, validators } from '../validation.ts' import type { Chain } from '../../blockchain/index.ts' import type { EthereumClient } from '../../client.ts' -import type { RlpxServer } from '../../net/server/rlpxserver.ts' import type { FullEthereumService } from '../../service/index.ts' /** @@ -49,8 +48,16 @@ export class Admin { * see for reference: https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-admin#admin_peers */ async nodeInfo() { - const rlpxInfo = this._client.config.server!.getRlpxInfo() - const { enode, id, ip, listenAddr, ports } = rlpxInfo + const networkWorker = this._client.config.networkWorker + if (networkWorker === undefined) { + throw { + code: INTERNAL_ERROR, + message: 'Network worker not initialized', + } + } + + const networkInfo = await networkWorker.getNetworkInfo() + const { enode, id, ip, listenAddr, ports } = networkInfo const { discovery, listener } = ports const clientName = getClientVersion() @@ -112,18 +119,23 @@ export class Admin { } /** - * Attempts to add a peer to client service peer pool using the RLPx server address and port + * Attempts to add a peer to client service peer pool using the network worker * e.g. `.admin_addPeer [{"address": "127.0.0.1", "tcpPort": 30303, "udpPort": 30303}]` * @param params An object containing an address, tcpPort, and udpPort for target server to connect to */ async addPeer(params: [object]) { - const service = this._client.service as any as FullEthereumService - const server = service.pool.config.server as RlpxServer - const dpt = server.dpt + const service = this._client.service as FullEthereumService + const networkWorker = this._client.config.networkWorker + + if (networkWorker === undefined) { + throw { + code: INTERNAL_ERROR, + message: 'Network worker not initialized', + } + } - let peerInfo try { - peerInfo = await dpt!.addPeer(params[0]) + const peerInfo = await networkWorker.addPeer(params[0]) const rlpxPeer = new RlpxPeer({ config: new Config(), id: bytesToHex(peerInfo.id!), @@ -131,6 +143,7 @@ export class Admin { port: peerInfo.tcpPort as number, }) service.pool.add(rlpxPeer) + return true } catch (err: any) { throw { code: INTERNAL_ERROR, @@ -138,7 +151,5 @@ export class Admin { stack: err?.stack, } } - - return peerInfo !== undefined } } diff --git a/packages/client/src/service/service.ts b/packages/client/src/service/service.ts index 8217ef0b47e..431f6778605 100644 --- a/packages/client/src/service/service.ts +++ b/packages/client/src/service/service.ts @@ -91,7 +91,7 @@ export class Service { }) //@ts-expect-error TODO replace with async create constructor - this.chain = options.chain ?? new Chain(options) + this.chain = options.chain ?? new Chain(options.config) this.interval = options.interval ?? 8000 this.timeout = options.timeout ?? 6000 this.opened = false @@ -120,7 +120,12 @@ export class Service { return false } const protocols = this.protocols - this.config.server && this.config.server.addProtocols(protocols) + this.config.networkWorker?.registerHandler(async (message, protocol, peer) => { + const protocolHandler = protocols.find((p) => p.name === protocol) + if (protocolHandler) { + await protocolHandler.handle(message, peer) + } + }) this.config.events.on(Event.POOL_PEER_BANNED, (peer) => this.config.logger?.debug(`Peer banned: ${peer}`), @@ -214,4 +219,16 @@ export class Service { * @param peer peer */ async handle(_message: any, _protocol: string, _peer: Peer): Promise {} + + /** + * Add protocols to the service + */ + addProtocols(protocols: Protocol[]) { + this.config.networkWorker?.registerHandler(async (message, protocol, peer) => { + const protocolHandler = protocols.find((p) => p.name === protocol) + if (protocolHandler) { + await protocolHandler.handle(message, peer) + } + }) + } } diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index c6e4f6f083f..ef1f1c6b8c9 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -3,9 +3,9 @@ import type { MerkleStateManager } from '@ethereumjs/statemanager' import type { Address } from '@ethereumjs/util' import type { Multiaddr } from '@multiformats/multiaddr' import type * as promClient from 'prom-client' -import type { SyncMode } from './index.ts' +import type { RlpxServer, SyncMode } from './index.ts' import type { Peer } from './net/peer/index.ts' -import type { Server } from './net/server/index.ts' +import type { NetworkWorker } from './net/server/networkworker.ts' export type Event = (typeof Event)[keyof typeof Event] /** @@ -47,7 +47,7 @@ export interface EventParams { [Event.PEER_DISCONNECTED]: [disconnectedPeer: Peer] [Event.PEER_ERROR]: [error: Error, peerCausingError: Peer] [Event.SERVER_LISTENING]: [{ transport: string; url: string }] - [Event.SERVER_ERROR]: [serverError: Error, serverCausingError: Server] + [Event.SERVER_ERROR]: [serverError: Error, serverCausingError: NetworkWorker | RlpxServer] [Event.POOL_PEER_ADDED]: [addedPeer: Peer] [Event.POOL_PEER_REMOVED]: [removedPeer: Peer] [Event.POOL_PEER_BANNED]: [bannedPeer: Peer] diff --git a/packages/client/test/rpc/admin/addPeer.spec.ts b/packages/client/test/rpc/admin/addPeer.spec.ts index 7bd4b57c10d..58d2504a97b 100644 --- a/packages/client/test/rpc/admin/addPeer.spec.ts +++ b/packages/client/test/rpc/admin/addPeer.spec.ts @@ -1,56 +1,42 @@ -import { DPT } from '@ethereumjs/devp2p' -import { hexToBytes } from '@ethereumjs/util' import { assert, describe, it } from 'vitest' -import { Config } from '../../../src/index.ts' +import { Config, SyncMode } from '../../../src/index.ts' import { PeerPool } from '../../../src/net/peerpool.ts' -import { RlpxServer } from '../../../src/net/server/index.ts' import { createClient, createManager, getRPCClient, startRPC } from '../helpers.ts' const method = 'admin_addPeer' -const localEndpointInfo = { address: '0.0.0.0', tcpPort: 30303 } const peerPort = 30304 -// NOTE: the `privateKey` currently cannot be 0x-prefixed in `./net/server/server.ts` -const privateKey = 'dc6457099f127cf0bac78de8b297df04951281909db4f58b43def7c7151e765d' -const privateKeyBytes = hexToBytes(`0x${privateKey}`) - describe(method, () => { it('works', async () => { const localPeerClient = await createClient({ opened: true, noPeers: true }) - const remotePeerClient = await createClient({ opened: true, noPeers: true }) const rpc = getRPCClient(startRPC(createManager(localPeerClient).getMethods())) - const dpt = new DPT(privateKeyBytes, { - endpoint: localEndpointInfo, - }) + const localConfig = new Config({ + accountCache: 10000, + storageCache: 1000, + syncmode: SyncMode.Full, + }) localPeerClient.service.pool = new PeerPool({ - config: new Config({ accountCache: 10000, storageCache: 1000 }), + config: localConfig, }) - //@ts-expect-error -- Assigning to a read-only property - localPeerClient.service.pool.config.server.dpt = dpt - const remoteConfig = new Config({ accountCache: 10000, storageCache: 1000, port: peerPort }) - const remoteServer = new RlpxServer({ - config: remoteConfig, - key: privateKey, + const remoteConfig = new Config({ + accountCache: 10000, + storageCache: 1000, + port: peerPort, + syncmode: SyncMode.Full, }) - await remoteServer.start() - ;(remotePeerClient.service.pool.config as any) = { - server: remoteServer, - } + await remoteConfig.networkWorker!.start(remoteConfig, [], []) + + await new Promise((resolve) => setTimeout(resolve, 1000)) const addPeerResponse = await rpc.request('admin_addPeer', [ { address: '0.0.0.0', tcpPort: peerPort, udpPort: peerPort }, ]) assert.strictEqual(addPeerResponse.result, true, 'admin_addPeer successfully adds peer') const peersResponse = await rpc.request('admin_peers', []) - assert.strictEqual(peersResponse.result.length, 1, 'added peer is visible') - assert.strictEqual( - localPeerClient.service.pool.peers.length, - 1, - 'added peer is visible in peer pool', - ) + assert.strictEqual(peersResponse.result.length, 1, 'admin_peers returns one peer') }) -}) +}, 10000) diff --git a/packages/client/test/service/fullethereumservice.spec.ts b/packages/client/test/service/fullethereumservice.spec.ts index 2f252457516..0b99651a068 100644 --- a/packages/client/test/service/fullethereumservice.spec.ts +++ b/packages/client/test/service/fullethereumservice.spec.ts @@ -6,7 +6,7 @@ import { assert, describe, expect, it, vi } from 'vitest' import { Chain } from '../../src/blockchain/index.ts' import { Config, SyncMode } from '../../src/config.ts' -import { RlpxServer } from '../../src/net/server/index.ts' +import { NetworkWorker } from '../../src/net/server/networkworker.ts' import { Event } from '../../src/types.ts' import type { Log } from '@ethereumjs/evm' @@ -52,6 +52,7 @@ vi.mock('../../src/sync/beaconsync.ts', () => { }) vi.mock('../../src/net/server/index.ts') +vi.mock('../../src/net/server/networkworker.ts') vi.mock('../../src/execution/index.ts') const { FullEthereumService } = await import('../../src/service/fullethereumservice.ts') @@ -87,8 +88,7 @@ describe('initialize', async () => { }) describe('should open', async () => { - const server = new RlpxServer({} as any) - const config = new Config({ server, accountCache: 10000, storageCache: 1000 }) + const config = new Config({ accountCache: 10000, storageCache: 1000, syncmode: SyncMode.Full }) const chain = await Chain.create({ config }) chain.open = vi.fn() @@ -96,7 +96,8 @@ describe('should open', async () => { await service.open() expect(service.synchronizer!.open).toBeCalled() - expect(server.addProtocols).toBeCalled() + vi.mocked(config.networkWorker!.start).mockResolvedValue(undefined) + expect(config.networkWorker!.start).toBeCalled() it('should synchronize', async () => { assert.isTrue( @@ -131,7 +132,7 @@ describe('should open', async () => { service.config.events.on(Event.SERVER_ERROR, (err) => { resolve(err.message) }) - service.config.events.emit(Event.SERVER_ERROR, new Error('error1'), server) + service.config.events.emit(Event.SERVER_ERROR, new Error('error1'), config.networkWorker!) resolve(false) }), 'error1', @@ -143,8 +144,8 @@ describe('should open', async () => { }) describe('should start/stop', async () => { - const server = new RlpxServer({} as any) - const config = new Config({ server, accountCache: 10000, storageCache: 1000 }) + const networkWorker = new NetworkWorker(new Config()) + const config = new Config({ networkWorker, accountCache: 10000, storageCache: 1000 }) const chain = await Chain.create({ config }) const service = new FullEthereumService({ config, chain })