From 65f9c03b54c4bbf8f6776b4c538ffcaf62d99ed4 Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Tue, 20 May 2025 13:49:14 -0400 Subject: [PATCH 1/6] sig verification pool test --- packages/vm/src/worker/signatureWorker.ts | 25 ++++++ packages/vm/src/worker/signatureWorkerPool.ts | 69 +++++++++++++++ .../test/benchmark/signatureVerification.ts | 83 +++++++++++++++++++ 3 files changed, 177 insertions(+) create mode 100644 packages/vm/src/worker/signatureWorker.ts create mode 100644 packages/vm/src/worker/signatureWorkerPool.ts create mode 100644 packages/vm/test/benchmark/signatureVerification.ts diff --git a/packages/vm/src/worker/signatureWorker.ts b/packages/vm/src/worker/signatureWorker.ts new file mode 100644 index 00000000000..b3f4cea5464 --- /dev/null +++ b/packages/vm/src/worker/signatureWorker.ts @@ -0,0 +1,25 @@ +import { ecrecover } from '@ethereumjs/util' +import { parentPort } from 'worker_threads' + +interface SignatureTask { + msgHash: Uint8Array + v: bigint + r: Uint8Array + s: Uint8Array + chainId?: bigint +} + +interface SignatureResult { + publicKey: Uint8Array +} + +function processSignature(task: SignatureTask): SignatureResult { + const publicKey = ecrecover(task.msgHash, task.v, task.r, task.s, task.chainId) + return { publicKey } +} + +// Listen for messages from the main thread +parentPort?.on('message', (tasks: SignatureTask[]) => { + const results = tasks.map(processSignature) + parentPort?.postMessage(results) +}) diff --git a/packages/vm/src/worker/signatureWorkerPool.ts b/packages/vm/src/worker/signatureWorkerPool.ts new file mode 100644 index 00000000000..dec662c6db8 --- /dev/null +++ b/packages/vm/src/worker/signatureWorkerPool.ts @@ -0,0 +1,69 @@ +import { dirname, join } from 'path' +import { fileURLToPath } from 'url' +import { Worker } from 'worker_threads' + +interface SignatureTask { + msgHash: Uint8Array + v: bigint + r: Uint8Array + s: Uint8Array + chainId?: bigint +} + +interface SignatureResult { + publicKey: Uint8Array +} + +export class SignatureWorkerPool { + private workers: Worker[] = [] + private taskQueue: SignatureTask[] = [] + private results: Map = new Map() + private nextTaskId = 0 + + constructor(numWorkers: number = 4) { + const __filename = fileURLToPath(import.meta.url) + const __dirname = dirname(__filename) + const workerPath = join(__dirname, 'signatureWorker.ts') + + for (let i = 0; i < numWorkers; i++) { + const worker = new Worker(workerPath, { + // Use tsx to run TypeScript files + execArgv: ['-r', 'tsx/register'], + }) + + worker.on('message', (results: SignatureResult[]) => { + results.forEach((result, index) => { + this.results.set(this.nextTaskId - results.length + index, result) + }) + }) + this.workers.push(worker) + } + } + + async processBatch(tasks: SignatureTask[]): Promise> { + const batchSize = Math.ceil(tasks.length / this.workers.length) + const promises: Promise[] = [] + + for (let i = 0; i < this.workers.length; i++) { + const start = i * batchSize + const end = Math.min(start + batchSize, tasks.length) + const batch = tasks.slice(start, end) + + if (batch.length > 0) { + promises.push( + new Promise((resolve) => { + this.workers[i].postMessage(batch) + resolve() + }), + ) + } + } + + await Promise.all(promises) + return this.results + } + + terminate() { + this.workers.forEach((worker) => worker.terminate()) + } +} diff --git a/packages/vm/test/benchmark/signatureVerification.ts b/packages/vm/test/benchmark/signatureVerification.ts new file mode 100644 index 00000000000..624c28b8ebc --- /dev/null +++ b/packages/vm/test/benchmark/signatureVerification.ts @@ -0,0 +1,83 @@ +import { LegacyTx } from '@ethereumjs/tx' +import { bigIntToUnpaddedBytes, ecrecover, equalsBytes, randomBytes } from '@ethereumjs/util' +import { SignatureWorkerPool } from '../../src/worker/signatureWorkerPool.ts' + +async function runBenchmark() { + // Create 10 test transactions + const transactions = Array.from({ length: 1000 }, (_, i) => { + return new LegacyTx({ + nonce: i, + gasPrice: 1000000000n, + gasLimit: 21000n, + to: '0x0000000000000000000000000000000000000000', + value: 1000000000000000000n, + data: '0x', + }) + }) + + // Sign all transactions + const signedTxs = transactions.map((tx) => tx.sign(randomBytes(32))) + + // Precompute all hashes and signature values + const msgHashes = signedTxs.map((tx) => tx.getMessageToVerifySignature()) + const vs = signedTxs.map((tx) => tx.v!) + const rs = signedTxs.map((tx) => bigIntToUnpaddedBytes(tx.r!)) + const ss = signedTxs.map((tx) => bigIntToUnpaddedBytes(tx.s!)) + const chainIds = signedTxs.map((tx) => tx.common.chainId()) + + console.log('Running benchmark...\n') + + // Sequential verification + console.log('Sequential verification:') + const startSeq = performance.now() + + // Store sequential results + const sequentialResults = new Map() + for (let i = 0; i < signedTxs.length; i++) { + const publicKey = ecrecover(msgHashes[i], vs[i], rs[i], ss[i], 1n) + sequentialResults.set(i, publicKey) + } + + const endSeq = performance.now() + console.log(`Time taken: ${endSeq - startSeq}ms\n`) + + // Parallel verification using worker pool + console.log('Parallel verification using worker pool:') + const startPar = performance.now() + + const signatureTasks = msgHashes.map((msgHash, i) => { + return { + msgHash, + v: vs[i], + r: rs[i], + s: ss[i], + chainId: chainIds[i], + } + }) + + const workerPool = new SignatureWorkerPool(4) // Use 4 workers + const parallelResults = await workerPool.processBatch(signatureTasks) + await workerPool.terminate() + + const endPar = performance.now() + console.log(`Time taken: ${endPar - startPar}ms\n`) + + // Print speedup + const speedup = (endSeq - startSeq) / (endPar - startPar) + console.log(`Speedup: ${speedup.toFixed(2)}x`) + + // Verify results match by comparing the stored sequential results with parallel results + console.log('\nVerifying results...') + let allMatch = true + for (let i = 0; i < signedTxs.length; i++) { + const parallelResult = parallelResults.get(i) + const sequentialResult = sequentialResults.get(i) + if (parallelResult && !equalsBytes(parallelResult.publicKey, sequentialResult)) { + console.log(`Mismatch at index ${i}`) + allMatch = false + } + } + console.log(`All results match: ${allMatch}`) +} + +runBenchmark().catch(console.error) From cc860ecf489c07ae3e35973e4f95849811842137 Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Tue, 20 May 2025 20:26:44 -0400 Subject: [PATCH 2/6] fix pooling --- packages/vm/src/worker/signatureWorkerPool.ts | 63 ++++++++++++------- .../test/benchmark/signatureVerification.ts | 35 ++++++++++- 2 files changed, 72 insertions(+), 26 deletions(-) diff --git a/packages/vm/src/worker/signatureWorkerPool.ts b/packages/vm/src/worker/signatureWorkerPool.ts index dec662c6db8..2214a01106e 100644 --- a/packages/vm/src/worker/signatureWorkerPool.ts +++ b/packages/vm/src/worker/signatureWorkerPool.ts @@ -1,5 +1,3 @@ -import { dirname, join } from 'path' -import { fileURLToPath } from 'url' import { Worker } from 'worker_threads' interface SignatureTask { @@ -16,54 +14,73 @@ interface SignatureResult { export class SignatureWorkerPool { private workers: Worker[] = [] - private taskQueue: SignatureTask[] = [] private results: Map = new Map() private nextTaskId = 0 + private pendingResults = 0 constructor(numWorkers: number = 4) { - const __filename = fileURLToPath(import.meta.url) - const __dirname = dirname(__filename) - const workerPath = join(__dirname, 'signatureWorker.ts') + const workerCode = ` + const { parentPort } = require('worker_threads') + const { ecrecover } = require('@ethereumjs/util') + parentPort.on('message', (data) => { + const { tasks, taskId } = data + const results = tasks.map(task => { + const publicKey = ecrecover(task.msgHash, task.v, task.r, task.s, task.chainId) + return { publicKey } + }) + parentPort.postMessage({ results, taskId }) + }) + ` + + // Initialize workers for (let i = 0; i < numWorkers; i++) { - const worker = new Worker(workerPath, { - // Use tsx to run TypeScript files - execArgv: ['-r', 'tsx/register'], - }) + const worker = new Worker(workerCode, { eval: true }) - worker.on('message', (results: SignatureResult[]) => { + worker.on('message', (data: { results: SignatureResult[]; taskId: number }) => { + const { results, taskId } = data results.forEach((result, index) => { - this.results.set(this.nextTaskId - results.length + index, result) + this.results.set(taskId + index, result) }) + this.pendingResults-- }) + this.workers.push(worker) } } async processBatch(tasks: SignatureTask[]): Promise> { + // Clear previous results + this.results.clear() + this.nextTaskId = 0 + this.pendingResults = 0 + + // Calculate batch size per worker const batchSize = Math.ceil(tasks.length / this.workers.length) - const promises: Promise[] = [] + // Process tasks in parallel for (let i = 0; i < this.workers.length; i++) { const start = i * batchSize const end = Math.min(start + batchSize, tasks.length) + if (start >= tasks.length) break + const batch = tasks.slice(start, end) + this.pendingResults++ + this.workers[i].postMessage({ tasks: batch, taskId: this.nextTaskId }) + this.nextTaskId += batch.length + } - if (batch.length > 0) { - promises.push( - new Promise((resolve) => { - this.workers[i].postMessage(batch) - resolve() - }), - ) - } + // Wait for all results + while (this.pendingResults > 0) { + await new Promise((resolve) => setTimeout(resolve, 10)) } - await Promise.all(promises) return this.results } terminate() { - this.workers.forEach((worker) => worker.terminate()) + for (const worker of this.workers) { + worker.terminate() + } } } diff --git a/packages/vm/test/benchmark/signatureVerification.ts b/packages/vm/test/benchmark/signatureVerification.ts index 624c28b8ebc..421b6492dc5 100644 --- a/packages/vm/test/benchmark/signatureVerification.ts +++ b/packages/vm/test/benchmark/signatureVerification.ts @@ -1,5 +1,11 @@ import { LegacyTx } from '@ethereumjs/tx' -import { bigIntToUnpaddedBytes, ecrecover, equalsBytes, randomBytes } from '@ethereumjs/util' +import { + bigIntToUnpaddedBytes, + bytesToHex, + ecrecover, + equalsBytes, + randomBytes, +} from '@ethereumjs/util' import { SignatureWorkerPool } from '../../src/worker/signatureWorkerPool.ts' async function runBenchmark() { @@ -66,18 +72,41 @@ async function runBenchmark() { const speedup = (endSeq - startSeq) / (endPar - startPar) console.log(`Speedup: ${speedup.toFixed(2)}x`) + // Print result counts + console.log( + `\nResults count - Sequential: ${sequentialResults.size}, Parallel: ${parallelResults.size}`, + ) + // Verify results match by comparing the stored sequential results with parallel results console.log('\nVerifying results...') let allMatch = true + let missingResults = 0 + let mismatchedResults = 0 + for (let i = 0; i < signedTxs.length; i++) { const parallelResult = parallelResults.get(i) const sequentialResult = sequentialResults.get(i) - if (parallelResult && !equalsBytes(parallelResult.publicKey, sequentialResult)) { + + if (!parallelResult) { + console.log(`Missing parallel result at index ${i}`) + missingResults++ + allMatch = false + continue + } + + if (!equalsBytes(parallelResult.publicKey, sequentialResult)) { console.log(`Mismatch at index ${i}`) + console.log(`Sequential: ${sequentialResult}`) + console.log(`Parallel: ${parallelResult.publicKey}`) + mismatchedResults++ allMatch = false } } - console.log(`All results match: ${allMatch}`) + + console.log(`\nVerification Summary:`) + console.log(`- All results match: ${allMatch}`) + console.log(`- Missing results: ${missingResults}`) + console.log(`- Mismatched results: ${mismatchedResults}`) } runBenchmark().catch(console.error) From 6d424074305c0a6c0566b1d9190eec6d875660cc Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Tue, 20 May 2025 20:29:35 -0400 Subject: [PATCH 3/6] use esm --- packages/vm/src/worker/signatureWorkerPool.ts | 4 ++-- packages/vm/test/benchmark/signatureVerification.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/vm/src/worker/signatureWorkerPool.ts b/packages/vm/src/worker/signatureWorkerPool.ts index 2214a01106e..b617f2ebbf8 100644 --- a/packages/vm/src/worker/signatureWorkerPool.ts +++ b/packages/vm/src/worker/signatureWorkerPool.ts @@ -20,8 +20,8 @@ export class SignatureWorkerPool { constructor(numWorkers: number = 4) { const workerCode = ` - const { parentPort } = require('worker_threads') - const { ecrecover } = require('@ethereumjs/util') + import { parentPort } from 'worker_threads' + import { ecrecover } from '@ethereumjs/util' parentPort.on('message', (data) => { const { tasks, taskId } = data diff --git a/packages/vm/test/benchmark/signatureVerification.ts b/packages/vm/test/benchmark/signatureVerification.ts index 421b6492dc5..4bf2bf27247 100644 --- a/packages/vm/test/benchmark/signatureVerification.ts +++ b/packages/vm/test/benchmark/signatureVerification.ts @@ -10,7 +10,7 @@ import { SignatureWorkerPool } from '../../src/worker/signatureWorkerPool.ts' async function runBenchmark() { // Create 10 test transactions - const transactions = Array.from({ length: 1000 }, (_, i) => { + const transactions = Array.from({ length: 2000 }, (_, i) => { return new LegacyTx({ nonce: i, gasPrice: 1000000000n, From 033f6526298cf57526eabe6ec1638ea7ae1f1a00 Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Tue, 20 May 2025 20:33:36 -0400 Subject: [PATCH 4/6] benchmark --- packages/vm/test/benchmark/signatureVerification.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/vm/test/benchmark/signatureVerification.ts b/packages/vm/test/benchmark/signatureVerification.ts index 4bf2bf27247..8e626410d08 100644 --- a/packages/vm/test/benchmark/signatureVerification.ts +++ b/packages/vm/test/benchmark/signatureVerification.ts @@ -1,11 +1,5 @@ import { LegacyTx } from '@ethereumjs/tx' -import { - bigIntToUnpaddedBytes, - bytesToHex, - ecrecover, - equalsBytes, - randomBytes, -} from '@ethereumjs/util' +import { bigIntToUnpaddedBytes, ecrecover, equalsBytes, randomBytes } from '@ethereumjs/util' import { SignatureWorkerPool } from '../../src/worker/signatureWorkerPool.ts' async function runBenchmark() { From a6b2eb2a0dc9ac8033808d786d465c7b646c3326 Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Thu, 22 May 2025 11:02:52 -0400 Subject: [PATCH 5/6] delete unused worker script --- packages/vm/src/worker/signatureWorker.ts | 25 ------------------- packages/vm/src/worker/signatureWorkerPool.ts | 4 +-- 2 files changed, 2 insertions(+), 27 deletions(-) delete mode 100644 packages/vm/src/worker/signatureWorker.ts diff --git a/packages/vm/src/worker/signatureWorker.ts b/packages/vm/src/worker/signatureWorker.ts deleted file mode 100644 index b3f4cea5464..00000000000 --- a/packages/vm/src/worker/signatureWorker.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { ecrecover } from '@ethereumjs/util' -import { parentPort } from 'worker_threads' - -interface SignatureTask { - msgHash: Uint8Array - v: bigint - r: Uint8Array - s: Uint8Array - chainId?: bigint -} - -interface SignatureResult { - publicKey: Uint8Array -} - -function processSignature(task: SignatureTask): SignatureResult { - const publicKey = ecrecover(task.msgHash, task.v, task.r, task.s, task.chainId) - return { publicKey } -} - -// Listen for messages from the main thread -parentPort?.on('message', (tasks: SignatureTask[]) => { - const results = tasks.map(processSignature) - parentPort?.postMessage(results) -}) diff --git a/packages/vm/src/worker/signatureWorkerPool.ts b/packages/vm/src/worker/signatureWorkerPool.ts index b617f2ebbf8..2214a01106e 100644 --- a/packages/vm/src/worker/signatureWorkerPool.ts +++ b/packages/vm/src/worker/signatureWorkerPool.ts @@ -20,8 +20,8 @@ export class SignatureWorkerPool { constructor(numWorkers: number = 4) { const workerCode = ` - import { parentPort } from 'worker_threads' - import { ecrecover } from '@ethereumjs/util' + const { parentPort } = require('worker_threads') + const { ecrecover } = require('@ethereumjs/util') parentPort.on('message', (data) => { const { tasks, taskId } = data From 8888d904e8c7e51b734d98e83dadba66867467ff Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Thu, 22 May 2025 11:10:04 -0400 Subject: [PATCH 6/6] relocate files and add readme --- packages/vm/test/worker/README.md | 13 +++++++++++++ .../ecrecover/signatureVerificationBenchmark.ts} | 2 +- .../worker/ecrecover}/signatureWorkerPool.ts | 0 3 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 packages/vm/test/worker/README.md rename packages/vm/test/{benchmark/signatureVerification.ts => worker/ecrecover/signatureVerificationBenchmark.ts} (97%) rename packages/vm/{src/worker => test/worker/ecrecover}/signatureWorkerPool.ts (100%) diff --git a/packages/vm/test/worker/README.md b/packages/vm/test/worker/README.md new file mode 100644 index 00000000000..b318795a42b --- /dev/null +++ b/packages/vm/test/worker/README.md @@ -0,0 +1,13 @@ +# Worker Thread Experiments + +This folder holds various worker thread experiments exploring possible performance improvements that can be gained from paralleization of heavy computation or else multiple long-lived threads for related but interdependent workflows. + +## `ecRecover` Worker Pool + +The `ecrecover` directory contains an implementation of a worker pool for parallel ECDSA signature recovery operations. + +- [`signatureWorkerPool.ts`](./ecrecover//signatureWorkerPool.ts): Implements a worker pool that distributes ECDSA signature recovery operations across multiple Node.js worker threads. +- [`signatureVerificationBenchmark.ts`](./ecrecover//signatureVerificationBenchmark.ts): Benchmark script that compares sequential vs. parallel signature verification performance. + + + diff --git a/packages/vm/test/benchmark/signatureVerification.ts b/packages/vm/test/worker/ecrecover/signatureVerificationBenchmark.ts similarity index 97% rename from packages/vm/test/benchmark/signatureVerification.ts rename to packages/vm/test/worker/ecrecover/signatureVerificationBenchmark.ts index 8e626410d08..2b42f63913c 100644 --- a/packages/vm/test/benchmark/signatureVerification.ts +++ b/packages/vm/test/worker/ecrecover/signatureVerificationBenchmark.ts @@ -1,6 +1,6 @@ import { LegacyTx } from '@ethereumjs/tx' import { bigIntToUnpaddedBytes, ecrecover, equalsBytes, randomBytes } from '@ethereumjs/util' -import { SignatureWorkerPool } from '../../src/worker/signatureWorkerPool.ts' +import { SignatureWorkerPool } from './signatureWorkerPool.ts' async function runBenchmark() { // Create 10 test transactions diff --git a/packages/vm/src/worker/signatureWorkerPool.ts b/packages/vm/test/worker/ecrecover/signatureWorkerPool.ts similarity index 100% rename from packages/vm/src/worker/signatureWorkerPool.ts rename to packages/vm/test/worker/ecrecover/signatureWorkerPool.ts