Skip to content

Commit 89f837f

Browse files
authored
fix(builder): resolve v8 serialization crash in cluster worker pool (#230)
1 parent 8adf2e4 commit 89f837f

File tree

2 files changed

+38
-24
lines changed

2 files changed

+38
-24
lines changed

packages/builder/src/runAsWorker.ts

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import { join } from 'node:path'
12
import process from 'node:process'
3+
import { fileURLToPath } from 'node:url'
24
import { deserialize } from 'node:v8'
35

46
import type { PhotoManifestItem } from '@afilmory/typing'
57

68
import type { BuilderOptions } from './builder/builder.js'
79
import { AfilmoryBuilder } from './builder/builder.js'
10+
import { loadBuilderConfig } from './config/index.js'
811
import type { PluginRunState } from './plugins/manager.js'
912
import type { StorageObject } from './storage/interfaces'
1013
import type { BuilderConfig } from './types/config.js'
@@ -40,6 +43,17 @@ export async function runAsWorker() {
4043
let builder: AfilmoryBuilder
4144
let pluginRunState: PluginRunState
4245

46+
// 安全发送消息到主进程(防止 EPIPE 错误)
47+
const safeSend = (message: unknown) => {
48+
try {
49+
if (process.send && process.connected) {
50+
process.send(message)
51+
}
52+
} catch {
53+
// 主进程已关闭 IPC 通道,静默忽略
54+
}
55+
}
56+
4357
// 初始化函数,从主进程接收共享数据
4458
const initializeWorker = async (serializedData: WorkerInitMessage['sharedData']) => {
4559
if (isInitialized) return
@@ -52,7 +66,18 @@ export async function runAsWorker() {
5266
imageObjects = sharedData.imageObjects
5367
existingManifestMap = sharedData.existingManifestMap
5468
livePhotoMap = sharedData.livePhotoMap
55-
builder = new AfilmoryBuilder(sharedData.builderConfig)
69+
70+
// 主进程序列化时移除了 plugins(函数无法序列化),
71+
// 从配置文件重新加载以获取完整的插件配置
72+
const fullConfig = await loadBuilderConfig({
73+
cwd: join(fileURLToPath(import.meta.url), '../../../..'),
74+
})
75+
const builderConfig: BuilderConfig = {
76+
...sharedData.builderConfig,
77+
plugins: fullConfig.plugins,
78+
}
79+
80+
builder = new AfilmoryBuilder(builderConfig)
5681
await builder.ensurePluginsReady()
5782
pluginRunState = builder.createPluginRunState()
5883

@@ -133,9 +158,7 @@ export async function runAsWorker() {
133158
result,
134159
}
135160

136-
if (process.send) {
137-
process.send(response)
138-
}
161+
safeSend(response)
139162
} catch (error) {
140163
// 发送错误回主进程
141164
const response: TaskResult = {
@@ -144,9 +167,7 @@ export async function runAsWorker() {
144167
error: error instanceof Error ? error.message : String(error),
145168
}
146169

147-
if (process.send) {
148-
process.send(response)
149-
}
170+
safeSend(response)
150171
}
151172
}
152173

@@ -242,9 +263,7 @@ export async function runAsWorker() {
242263
results,
243264
}
244265

245-
if (process.send) {
246-
process.send(response)
247-
}
266+
safeSend(response)
248267
} catch (error) {
249268
// 如果批量处理失败,为每个任务发送错误结果
250269
const results: TaskResult[] = message.tasks.map((task) => ({
@@ -258,9 +277,7 @@ export async function runAsWorker() {
258277
results,
259278
}
260279

261-
if (process.send) {
262-
process.send(response)
263-
}
280+
safeSend(response)
264281
}
265282
}
266283

@@ -275,19 +292,15 @@ export async function runAsWorker() {
275292

276293
if (message.type === 'ping') {
277294
// 响应主进程的 ping,表示 worker 已准备好
278-
if (process.send) {
279-
process.send({ type: 'pong', workerId })
280-
}
295+
safeSend({ type: 'pong', workerId })
281296
return
282297
}
283298

284299
if (message.type === 'init') {
285300
// 处理初始化消息
286301
try {
287302
await initializeWorker(message.sharedData)
288-
if (process.send) {
289-
process.send({ type: 'init-complete', workerId })
290-
}
303+
safeSend({ type: 'init-complete', workerId })
291304
} catch (error) {
292305
console.error('Worker initialization failed', error)
293306
process.exit(1)
@@ -323,7 +336,5 @@ export async function runAsWorker() {
323336
})
324337

325338
// 告知主进程 worker 已准备好
326-
if (process.send) {
327-
process.send({ type: 'ready', workerId })
328-
}
339+
safeSend({ type: 'ready', workerId })
329340
}

packages/builder/src/worker/cluster-pool.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,14 @@ export class ClusterPool<T> extends EventEmitter {
267267
// 首次准备就绪时发送初始化数据,但不立即标记为 ready
268268
if (this.sharedData) {
269269
// 使用 v8.serialize 序列化数据以保持类型完整性
270+
// 注意:plugins 可能包含函数(如 hook 回调),v8.serialize 无法序列化函数
271+
// 因此在序列化时移除 plugins,worker 进程会从配置文件重新加载
272+
const { plugins: _plugins, ...serializableConfig } = this.sharedData.builderConfig
270273
const serializedBuffer = serialize({
271274
existingManifestMap: this.sharedData.existingManifestMap,
272275
livePhotoMap: this.sharedData.livePhotoMap,
273276
imageObjects: this.sharedData.imageObjects,
274-
builderConfig: this.sharedData.builderConfig,
277+
builderConfig: { ...serializableConfig, plugins: [] },
275278
})
276279

277280
// 将 Buffer 转换为数组以通过 IPC 传输
@@ -515,7 +518,7 @@ export class ClusterPool<T> extends EventEmitter {
515518
const timeout = setTimeout(() => {
516519
worker.kill('SIGKILL')
517520
resolve()
518-
}, 0)
521+
}, 5000)
519522

520523
worker.on('exit', () => {
521524
clearTimeout(timeout)

0 commit comments

Comments
 (0)