Skip to content

Commit a185ab9

Browse files
authored
fix: cluster pool dead lock (#99)
1 parent 41b0d7a commit a185ab9

File tree

1 file changed

+19
-53
lines changed

1 file changed

+19
-53
lines changed

packages/builder/src/worker/cluster-pool.ts

Lines changed: 19 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,6 @@ export class ClusterPool<T> extends EventEmitter {
119119
// 启动 worker 进程
120120
await this.startWorkers()
121121

122-
// 等待所有 worker 准备好
123-
await this.waitForWorkersReady()
124-
125122
// 等待所有任务完成
126123
return new Promise((resolve, reject) => {
127124
this.on('allTasksCompleted', () => {
@@ -134,9 +131,6 @@ export class ClusterPool<T> extends EventEmitter {
134131
})
135132

136133
this.on('error', reject)
137-
138-
// 开始分发任务
139-
this.distributeInitialTasks()
140134
})
141135
}
142136

@@ -200,25 +194,25 @@ export class ClusterPool<T> extends EventEmitter {
200194
| { type: 'init-complete'; workerId: number },
201195
) => {
202196
switch (message.type) {
203-
case 'ready':
204-
case 'pong': {
205-
this.handleWorkerReady(workerId, message as WorkerReadyMessage)
206-
207-
break;
208-
}
209-
case 'init-complete': {
210-
this.handleWorkerInitComplete(workerId)
211-
212-
break;
213-
}
214-
case 'batch-result': {
215-
this.handleWorkerBatchResult(workerId, message as BatchTaskResult)
216-
217-
break;
218-
}
219-
default: {
220-
this.handleWorkerMessage(workerId, message as TaskResult)
221-
}
197+
case 'ready':
198+
case 'pong': {
199+
this.handleWorkerReady(workerId, message as WorkerReadyMessage)
200+
201+
break
202+
}
203+
case 'init-complete': {
204+
this.handleWorkerInitComplete(workerId)
205+
206+
break
207+
}
208+
case 'batch-result': {
209+
this.handleWorkerBatchResult(workerId, message as BatchTaskResult)
210+
211+
break
212+
}
213+
default: {
214+
this.handleWorkerMessage(workerId, message as TaskResult)
215+
}
222216
}
223217
},
224218
)
@@ -305,13 +299,6 @@ export class ClusterPool<T> extends EventEmitter {
305299
}
306300
}
307301

308-
private distributeInitialTasks(): void {
309-
// 为每个 worker 分配初始任务批次
310-
for (const [workerId] of this.workers) {
311-
this.assignBatchTasksToWorker(workerId)
312-
}
313-
}
314-
315302
private assignBatchTasksToWorker(workerId: number): void {
316303
if (this.taskQueue.length === 0) return
317304

@@ -521,27 +508,6 @@ export class ClusterPool<T> extends EventEmitter {
521508
this.workerStats.clear()
522509
}
523510

524-
private async waitForWorkersReady(): Promise<void> {
525-
return new Promise((resolve) => {
526-
const requiredWorkers = Math.ceil(
527-
this.totalTasks / this.workerConcurrency,
528-
)
529-
const expectedWorkers = Math.min(this.concurrency, requiredWorkers)
530-
531-
const checkReady = () => {
532-
if (this.readyWorkers.size >= expectedWorkers) {
533-
this.logger.main.info(
534-
`所有 ${expectedWorkers} 个 worker 进程已准备就绪`,
535-
)
536-
resolve()
537-
}
538-
}
539-
540-
this.on('workerReady', checkReady)
541-
checkReady() // 立即检查一次
542-
})
543-
}
544-
545511
// 获取 worker 统计信息
546512
getWorkerStats(): WorkerStats[] {
547513
return Array.from(this.workerStats.values())

0 commit comments

Comments
 (0)