Skip to content

Commit cc860ec

Browse files
committed
fix pooling
1 parent 65f9c03 commit cc860ec

File tree

2 files changed

+72
-26
lines changed

2 files changed

+72
-26
lines changed
Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import { dirname, join } from 'path'
2-
import { fileURLToPath } from 'url'
31
import { Worker } from 'worker_threads'
42

53
interface SignatureTask {
@@ -16,54 +14,73 @@ interface SignatureResult {
1614

1715
export class SignatureWorkerPool {
1816
private workers: Worker[] = []
19-
private taskQueue: SignatureTask[] = []
2017
private results: Map<number, SignatureResult> = new Map()
2118
private nextTaskId = 0
19+
private pendingResults = 0
2220

2321
constructor(numWorkers: number = 4) {
24-
const __filename = fileURLToPath(import.meta.url)
25-
const __dirname = dirname(__filename)
26-
const workerPath = join(__dirname, 'signatureWorker.ts')
22+
const workerCode = `
23+
const { parentPort } = require('worker_threads')
24+
const { ecrecover } = require('@ethereumjs/util')
2725
26+
parentPort.on('message', (data) => {
27+
const { tasks, taskId } = data
28+
const results = tasks.map(task => {
29+
const publicKey = ecrecover(task.msgHash, task.v, task.r, task.s, task.chainId)
30+
return { publicKey }
31+
})
32+
parentPort.postMessage({ results, taskId })
33+
})
34+
`
35+
36+
// Initialize workers
2837
for (let i = 0; i < numWorkers; i++) {
29-
const worker = new Worker(workerPath, {
30-
// Use tsx to run TypeScript files
31-
execArgv: ['-r', 'tsx/register'],
32-
})
38+
const worker = new Worker(workerCode, { eval: true })
3339

34-
worker.on('message', (results: SignatureResult[]) => {
40+
worker.on('message', (data: { results: SignatureResult[]; taskId: number }) => {
41+
const { results, taskId } = data
3542
results.forEach((result, index) => {
36-
this.results.set(this.nextTaskId - results.length + index, result)
43+
this.results.set(taskId + index, result)
3744
})
45+
this.pendingResults--
3846
})
47+
3948
this.workers.push(worker)
4049
}
4150
}
4251

4352
async processBatch(tasks: SignatureTask[]): Promise<Map<number, SignatureResult>> {
53+
// Clear previous results
54+
this.results.clear()
55+
this.nextTaskId = 0
56+
this.pendingResults = 0
57+
58+
// Calculate batch size per worker
4459
const batchSize = Math.ceil(tasks.length / this.workers.length)
45-
const promises: Promise<void>[] = []
4660

61+
// Process tasks in parallel
4762
for (let i = 0; i < this.workers.length; i++) {
4863
const start = i * batchSize
4964
const end = Math.min(start + batchSize, tasks.length)
65+
if (start >= tasks.length) break
66+
5067
const batch = tasks.slice(start, end)
68+
this.pendingResults++
69+
this.workers[i].postMessage({ tasks: batch, taskId: this.nextTaskId })
70+
this.nextTaskId += batch.length
71+
}
5172

52-
if (batch.length > 0) {
53-
promises.push(
54-
new Promise((resolve) => {
55-
this.workers[i].postMessage(batch)
56-
resolve()
57-
}),
58-
)
59-
}
73+
// Wait for all results
74+
while (this.pendingResults > 0) {
75+
await new Promise((resolve) => setTimeout(resolve, 10))
6076
}
6177

62-
await Promise.all(promises)
6378
return this.results
6479
}
6580

6681
terminate() {
67-
this.workers.forEach((worker) => worker.terminate())
82+
for (const worker of this.workers) {
83+
worker.terminate()
84+
}
6885
}
6986
}

packages/vm/test/benchmark/signatureVerification.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import { LegacyTx } from '@ethereumjs/tx'
2-
import { bigIntToUnpaddedBytes, ecrecover, equalsBytes, randomBytes } from '@ethereumjs/util'
2+
import {
3+
bigIntToUnpaddedBytes,
4+
bytesToHex,
5+
ecrecover,
6+
equalsBytes,
7+
randomBytes,
8+
} from '@ethereumjs/util'
39
import { SignatureWorkerPool } from '../../src/worker/signatureWorkerPool.ts'
410

511
async function runBenchmark() {
@@ -66,18 +72,41 @@ async function runBenchmark() {
6672
const speedup = (endSeq - startSeq) / (endPar - startPar)
6773
console.log(`Speedup: ${speedup.toFixed(2)}x`)
6874

75+
// Print result counts
76+
console.log(
77+
`\nResults count - Sequential: ${sequentialResults.size}, Parallel: ${parallelResults.size}`,
78+
)
79+
6980
// Verify results match by comparing the stored sequential results with parallel results
7081
console.log('\nVerifying results...')
7182
let allMatch = true
83+
let missingResults = 0
84+
let mismatchedResults = 0
85+
7286
for (let i = 0; i < signedTxs.length; i++) {
7387
const parallelResult = parallelResults.get(i)
7488
const sequentialResult = sequentialResults.get(i)
75-
if (parallelResult && !equalsBytes(parallelResult.publicKey, sequentialResult)) {
89+
90+
if (!parallelResult) {
91+
console.log(`Missing parallel result at index ${i}`)
92+
missingResults++
93+
allMatch = false
94+
continue
95+
}
96+
97+
if (!equalsBytes(parallelResult.publicKey, sequentialResult)) {
7698
console.log(`Mismatch at index ${i}`)
99+
console.log(`Sequential: ${sequentialResult}`)
100+
console.log(`Parallel: ${parallelResult.publicKey}`)
101+
mismatchedResults++
77102
allMatch = false
78103
}
79104
}
80-
console.log(`All results match: ${allMatch}`)
105+
106+
console.log(`\nVerification Summary:`)
107+
console.log(`- All results match: ${allMatch}`)
108+
console.log(`- Missing results: ${missingResults}`)
109+
console.log(`- Mismatched results: ${mismatchedResults}`)
81110
}
82111

83112
runBenchmark().catch(console.error)

0 commit comments

Comments
 (0)